Real-Time Data Analysis with Storm

Apache Storm is a free and open source distributed realtime computation system.

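Data Streams and Real-Time Analysis

  • A data stream is data generated continuously along the time axis
  • Batch analysis vs. real-time analysis
  • Where a real-time response matters, as in marketing, the data stream is analyzed in real time
  • Example: fraud detection through event detection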

Storm Features

  • Integrates
    • Storm integrates with any queueing system and any database system.
  • Simple API
  • Scalable
    • Storm was benchmarked at processing one million 100-byte messages per second per node on hardware with the following specs:
      • Processor: 2x Intel E5645 @ 2.4 GHz
      • Memory: 24 GB
  • Fault tolerant
  • Guarantees data processing
    • Using Trident, a higher level abstraction over Storm’s basic abstractions, you can achieve exactly-once processing semantics.
  • Use with any language
  • Easy to deploy and operate
  • Free and open source
    • Apache Storm is a free and open source project licensed under the Apache License, Version 2.0

Storm Architecture

  1. Nimbus -> Master Node
  2. Supervisor -> Worker Node

Storm Components

  1. Spout: A spout is the entry point of a Storm topology.
  2. Bolt: A bolt contains the actual processing logic.
  3. Topology: A topology is a network of spouts and bolts.
  4. Stream: A stream is an unbounded sequence of tuples.
  5. Tuple: A tuple is the most basic data structure in Storm. It is a named list of values.
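
To make "a named list of values" concrete, here is a minimal plain-Java sketch. The class `NamedTuple` and its methods are hypothetical stand-ins made up for illustration; they are not Storm's `Tuple` API. The sketch only shows how field names and values line up by position, which is the idea behind declaring fields in a spout and reading them by name in a bolt.

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical stand-in for illustration; this is not Storm's Tuple class.
final class NamedTuple {
    private final List<String> fields; // declared field names, in order
    private final List<Object> values; // emitted values, in the same order

    NamedTuple(List<String> fields, List<Object> values) {
        if (fields.size() != values.size()) {
            throw new IllegalArgumentException("fields and values must have the same length");
        }
        this.fields = fields;
        this.values = values;
    }

    // Look a value up by the name of its field.
    Object getValueByField(String field) {
        int index = fields.indexOf(field);
        if (index < 0) {
            throw new IllegalArgumentException("unknown field: " + field);
        }
        return values.get(index);
    }

    // Look a value up by its position in the list.
    Object getValue(int index) {
        return values.get(index);
    }

    public static void main(String[] args) {
        NamedTuple tuple = new NamedTuple(
                Arrays.asList("key", "doctype"),
                Arrays.<Object>asList("user1", "pdf"));
        System.out.println(tuple.getValueByField("doctype")); // lookup by name
        System.out.println(tuple.getValue(0));                // lookup by position
    }
}
```

Lookup by name and lookup by position return the same underlying values; the names only add a readable index on top of the list.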

Storm Parallelism

  • Node
    • A physical server on which a Nimbus or Supervisor process runs
  • Worker
    • A Java process started on a node that runs a Supervisor
  • Executor
    • A Java thread running inside a Worker
  • Task
    • A Bolt or Spout instance

Example of a running topology

TopologyBuilder topologyBuilder = new TopologyBuilder();

Config conf = new Config();
conf.setNumWorkers(2); // use two worker processes

topologyBuilder.setSpout("blue-spout", new BlueSpout(), 2); // set parallelism hint to 2

topologyBuilder.setBolt("green-bolt", new GreenBolt(), 2)
               .setNumTasks(4)
               .shuffleGrouping("blue-spout");

topologyBuilder.setBolt("yellow-bolt", new YellowBolt(), 6)
               .shuffleGrouping("green-bolt");

StormSubmitter.submitTopology(
        "mytopology",
        conf,
        topologyBuilder.createTopology()
    );
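
The numbers in this topology can be worked through by hand. The sketch below is plain Java arithmetic, not Storm code; the class and method names are made up for illustration. It assumes one task per executor wherever `setNumTasks` is not called, and it leaves out any acker executors Storm may add on its own.

```java
// Plain-Java arithmetic only; no Storm classes are used here.
final class ParallelismSketch {

    // The total number of executors (threads) is the sum of the parallelism hints.
    static int totalExecutors(int... parallelismHints) {
        int sum = 0;
        for (int hint : parallelismHints) {
            sum += hint;
        }
        return sum;
    }

    public static void main(String[] args) {
        int workers = 2;
        int blueExecutors = 2, greenExecutors = 2, yellowExecutors = 6;
        int greenTasks = 4; // from setNumTasks(4)

        int executors = totalExecutors(blueExecutors, greenExecutors, yellowExecutors);
        System.out.println("executors in total: " + executors);                         // 10
        System.out.println("executors per worker: " + executors / workers);             // 5
        System.out.println("green tasks per executor: " + greenTasks / greenExecutors); // 2

        // Blue and yellow keep one task per executor; green has 4 tasks.
        int tasks = blueExecutors + greenTasks + yellowExecutors;
        System.out.println("tasks in total: " + tasks); // 12
    }
}
```

So each of the two worker processes runs five threads, and each green-bolt thread serves two tasks.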

Stream groupings

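A stream grouping defines how a stream is partitioned among a bolt's tasks.

Storm has eight built-in stream groupings, and you can implement a custom grouping by implementing the CustomStreamGrouping interface.

  • Shuffle grouping: Tuples are distributed randomly across the bolt's tasks so that each task receives an equal number of tuples.
  • Fields grouping: The stream is partitioned by the fields specified in the grouping. For example, if the stream is grouped by the "user-id" field, tuples with the same "user-id" always go to the same task, while tuples with different "user-id" values may go to different tasks.
  • All grouping: The stream is replicated to all of the bolt's tasks.
  • Global grouping: The entire stream goes to a single one of the bolt's tasks, specifically the task with the lowest ID.
  • Direct grouping: The producer of a tuple decides which task of the consumer receives it.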
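
The routing rules above can be sketched as task-selection functions. This is a simplified plain-Java illustration, not Storm's internal implementation: the class `GroupingSketch`, its method names, and the use of `Objects.hash` are assumptions chosen only to show the behavior each grouping promises.

```java
import java.util.Objects;
import java.util.concurrent.ThreadLocalRandom;

// Simplified illustration of task selection; not Storm's internal code.
final class GroupingSketch {

    // Fields grouping: equal field values always map to the same task index.
    static int fieldsGrouping(int numTasks, Object... fieldValues) {
        return Math.floorMod(Objects.hash(fieldValues), numTasks);
    }

    // Shuffle grouping: pick a task index at random.
    static int shuffleGrouping(int numTasks) {
        return ThreadLocalRandom.current().nextInt(numTasks);
    }

    // Global grouping: every tuple goes to the task with the lowest ID.
    static int globalGrouping(int[] taskIds) {
        int lowest = taskIds[0];
        for (int id : taskIds) {
            lowest = Math.min(lowest, id);
        }
        return lowest;
    }

    public static void main(String[] args) {
        int numTasks = 4;
        // The same "user-id" value is routed to the same task every time.
        System.out.println(fieldsGrouping(numTasks, "user-42") == fieldsGrouping(numTasks, "user-42")); // true
        System.out.println(globalGrouping(new int[] {7, 3, 5})); // 3
    }
}
```

The property that matters for stateful bolts is the one fields grouping gives: because a given key always reaches the same task, that task can keep per-key state such as a counter.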

Storm Example

1. HelloTopology Example

pom.xml

<dependency>
  <groupId>org.apache.storm</groupId>
  <artifactId>storm-core</artifactId>
  <version>1.1.1</version>
  <scope>provided</scope>
</dependency>

HelloSpout.java

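import java.util.Map;

import org.apache.storm.spout.SpoutOutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseRichSpout;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Values;

public class HelloSpout extends BaseRichSpout {
	private SpoutOutputCollector collector;

	// Called once when the spout task starts; keep the collector for later emits.
	@Override
	public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
		this.collector = collector;
	}

	// Called repeatedly by Storm; each call emits one single-field tuple.
	@Override
	public void nextTuple() {
		this.collector.emit(new Values("Hello World!"));
	}

	// Names the single output field so that bolts can read it by name.
	@Override
	public void declareOutputFields(OutputFieldsDeclarer declarer) {
		declarer.declare(new Fields("say"));
	}
}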

HelloBolt.java

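import org.apache.storm.topology.BasicOutputCollector;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseBasicBolt;
import org.apache.storm.tuple.Tuple;

public class HelloBolt extends BaseBasicBolt {

	private static final long serialVersionUID = 1L;

	// Reads the "say" field declared by HelloSpout and prints it.
	@Override
	public void execute(Tuple tuple, BasicOutputCollector collector) {
		String value = tuple.getStringByField("say");
		System.out.println("Tuple value is " + value);
	}

	// This bolt emits nothing, so it declares no output fields.
	@Override
	public void declareOutputFields(OutputFieldsDeclarer declarer) {
	}
}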

HelloTopology.java

import org.apache.storm.Config;
import org.apache.storm.LocalCluster;
import org.apache.storm.StormSubmitter;
import org.apache.storm.topology.TopologyBuilder;
import org.apache.storm.utils.Utils;

public class HelloTopology {
	public static void main(String args[]) throws Exception {
		TopologyBuilder builder = new TopologyBuilder();
		builder.setSpout("HelloSpout", new HelloSpout(), 2);
		builder.setBolt("HelloBolt", new HelloBolt(), 4).shuffleGrouping("HelloSpout");

		Config conf = new Config();
		conf.setDebug(true);

		// If there are arguments, we are running on a cluster
		if (args != null && args.length > 0) {
			// set the number of worker processes for the topology
			conf.setNumWorkers(3);
			// submit the topology
			StormSubmitter.submitTopology(args[0], conf, builder.createTopology());
		}
		// Otherwise, we are running locally
		else {
			LocalCluster cluster = new LocalCluster();

			cluster.submitTopology("HelloTopologyLocal", conf, builder.createTopology());
			Utils.sleep(10000);

			// kill the HelloTopologyLocal topology
			cluster.killTopology("HelloTopologyLocal");
			// shutdown the storm test cluster
			cluster.shutdown();
		}
	}
}

2. Build and Deploy

$ mvn clean package -DskipTests=true
$ storm jar storm-0.0.1-SNAPSHOT.jar hello.HelloTopology HelloTopology

Kafka-Storm Example

pom.xml

<dependency>
	<groupId>org.apache.storm</groupId>
	<artifactId>storm-kafka</artifactId>
	<version>1.1.1</version>
</dependency>
<dependency>
	<groupId>org.apache.kafka</groupId>
	<artifactId>kafka-clients</artifactId>
	<version>0.10.0.1</version>
</dependency>
<dependency>
	<groupId>org.apache.kafka</groupId>
	<artifactId>kafka_2.11</artifactId>
	<version>0.10.0.1</version>
</dependency>

CutLogBolt.java

import org.apache.storm.topology.BasicOutputCollector;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseBasicBolt;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.tuple.Values;

public class CutLogBolt extends BaseBasicBolt {

	private static final long serialVersionUID = 1L;

	@Override
	public void execute(Tuple input, BasicOutputCollector collector) {

		String[] splitArray = input.getString(0).split(";");
		String key = "";
		String doctype = "";

		for (int i = 0; i < splitArray.length; i++) {
			if (splitArray[i].contains("key"))
				key = splitArray[i];
			if (splitArray[i].contains("doctype"))
				doctype = splitArray[i];
		}
		collector.emit(new Values(key, doctype));
	}

	@Override
	public void declareOutputFields(OutputFieldsDeclarer declarer) {
		declarer.declare(new Fields("key", "doctype"));
	}
}

ClassifyKeyBolt.java

import org.apache.storm.topology.BasicOutputCollector;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseBasicBolt;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.tuple.Values;

public class ClassifyKeyBolt extends BaseBasicBolt {

	private static final long serialVersionUID = 1L;

	@Override
	public void execute(Tuple input, BasicOutputCollector collector) {
		String[] splitdoctype = input.getStringByField("doctype").split(":");
		String[] splitkey = input.getStringByField("key").split(":");
		if (splitkey.length == 2 && splitdoctype.length == 2) {
			String doctype = splitdoctype[1].trim();
			String key = splitkey[1].trim();
			// System.err.println(key + ":" + doctype);
			collector.emit(new Values(key + ":" + doctype));
		}
	}

	@Override
	public void declareOutputFields(OutputFieldsDeclarer declarer) {
		declarer.declare(new Fields("subdoctype"));
	}
}
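
To see what CutLogBolt and ClassifyKeyBolt do to a single message, the parsing steps can be replayed in plain Java without a Storm cluster. The class `LogParseSketch` is made up for illustration, and the sample log line is an assumption: the source does not show the real message format, only that segments are separated by ";" and that each segment has the form "name: value".

```java
// Plain-Java replay of the parsing steps; the sample log line is an assumption.
final class LogParseSketch {

    // Mirrors CutLogBolt: keep the segments that mention "key" and "doctype".
    static String[] cut(String line) {
        String key = "";
        String doctype = "";
        for (String part : line.split(";")) {
            if (part.contains("key")) key = part;
            if (part.contains("doctype")) doctype = part;
        }
        return new String[] {key, doctype};
    }

    // Mirrors ClassifyKeyBolt: returns "key:doctype", or null when a segment is malformed.
    static String classify(String key, String doctype) {
        String[] splitKey = key.split(":");
        String[] splitDoctype = doctype.split(":");
        if (splitKey.length == 2 && splitDoctype.length == 2) {
            return splitKey[1].trim() + ":" + splitDoctype[1].trim();
        }
        return null; // the bolt emits nothing in this case
    }

    public static void main(String[] args) {
        String[] fields = cut("key: user1;doctype: pdf;size: 10"); // hypothetical input
        System.out.println(classify(fields[0], fields[1])); // prints "user1:pdf"
    }
}
```

Note that a message lacking either segment produces no output at all, since the length check fails on the empty string.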

DoctypeCountBolt.java

import java.util.HashMap;
import java.util.Map;

import org.apache.storm.topology.BasicOutputCollector;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseBasicBolt;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.tuple.Values;

public class DoctypeCountBolt extends BaseBasicBolt {

	private static final long serialVersionUID = 1L;

	Map<String, Integer> docMap = new HashMap<String, Integer>();

	@Override
	public void execute(Tuple input, BasicOutputCollector collector) {

		String doctype = input.getStringByField("subdoctype");

		Integer count = docMap.get(doctype);
		if (count == null)
			count = 0;

		count++;

		docMap.put(doctype, count);
		System.out.println(docMap);
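		// Emit a copy: the bolt keeps mutating docMap, and emitted values should not change afterwards.
		collector.emit(new Values(new HashMap<String, Integer>(docMap)));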
	}

	@Override
	public void declareOutputFields(OutputFieldsDeclarer declarer) {
		declarer.declare(new Fields("docmap"));
	}
}
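
The counting step in DoctypeCountBolt can be written more compactly with `Map.merge`. The sketch below is plain Java, not a Storm bolt, and the class `DoctypeCounter` is made up for illustration. It returns a copy of the map on each update, on the assumption that handing out the live map would let later updates change results that were already passed downstream.

```java
import java.util.HashMap;
import java.util.Map;

// Plain-Java sketch of the running count; not a Storm bolt.
final class DoctypeCounter {
    private final Map<String, Integer> counts = new HashMap<>();

    // Increment the count for one value and return an independent snapshot.
    Map<String, Integer> add(String subdoctype) {
        counts.merge(subdoctype, 1, Integer::sum);
        return new HashMap<>(counts); // copy, so later updates do not alter it
    }

    public static void main(String[] args) {
        DoctypeCounter counter = new DoctypeCounter();
        counter.add("user1:pdf");
        Map<String, Integer> snapshot = counter.add("user1:pdf");
        counter.add("user2:doc");
        System.out.println(snapshot.get("user1:pdf"));         // 2
        System.out.println(snapshot.containsKey("user2:doc")); // false
    }
}
```

In the topology each bolt task holds its own map, so the counts are per task; grouping the stream by "subdoctype" is what keeps every occurrence of a given value on the same task.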

MonitorTopology.java

import java.util.ArrayList;
import java.util.List;
import java.util.UUID;

import org.apache.storm.Config;
import org.apache.storm.StormSubmitter;
import org.apache.storm.kafka.KafkaSpout;
import org.apache.storm.kafka.SpoutConfig;
import org.apache.storm.kafka.StringScheme;
import org.apache.storm.kafka.ZkHosts;
import org.apache.storm.spout.SchemeAsMultiScheme;
import org.apache.storm.topology.TopologyBuilder;
import org.apache.storm.tuple.Fields;

public class MonitorTopology {

	public static void main(String[] args) throws Exception {

		String zkUrl = "demo:2181";
		String brokerUrl = "demo:9092";

		System.out.println("Using Kafka zookeeper url: " + zkUrl + " broker url: " + brokerUrl);

		ZkHosts hosts = new ZkHosts(zkUrl);

		SpoutConfig spoutConfig = new SpoutConfig(hosts, "onlytest", "/onlytest", UUID.randomUUID().toString());
		spoutConfig.scheme = new SchemeAsMultiScheme(new StringScheme());
		KafkaSpout kafkaSpout = new KafkaSpout(spoutConfig);

		TopologyBuilder builder = new TopologyBuilder();
		builder.setSpout("spout", kafkaSpout, 1);
		builder.setBolt("cutbolt", new CutLogBolt(), 2).shuffleGrouping("spout");
		builder.setBolt("classifybolt", new ClassifyKeyBolt(), 2).fieldsGrouping("cutbolt", new Fields("key", "doctype"));
		builder.setBolt("docbolt", new DoctypeCountBolt(), 2).fieldsGrouping("classifybolt", new Fields("subdoctype"));

		Config conf = new Config();
		conf.setDebug(true);

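		// Placeholder: replace "nimbus url" with the actual Nimbus host name.
		// The list has no effect unless it is placed in the config,
		// e.g. conf.put(Config.NIMBUS_SEEDS, nimbus_seeds).
		List<String> nimbus_seeds = new ArrayList<String>();
		nimbus_seeds.add("nimbus url");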
		if (args != null && args.length > 0) {
			conf.setNumWorkers(3);
			StormSubmitter.submitTopologyWithProgressBar(args[0], conf, builder.createTopology());
		} else {
			conf.setNumWorkers(3);
			StormSubmitter.submitTopology("onlytest", conf, builder.createTopology());
		}
	}
}

Build and Deploy

$ mvn clean package -DskipTests=true
$ storm jar storm-0.0.1-SNAPSHOT.jar monitor.MonitorTopology MonitorTopology

© 2017. by yeopoong.github.io
