Real-Time Data Analysis with Storm
Apache Storm is a free and open source distributed realtime computation system.
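Data Streams and Real-Time Analysis
- Data stream: data that is generated continuously over time
- Batch analysis vs. real-time analysis
- Where a real-time response matters, as in marketing, the data stream is analyzed in real time
- Example: fraud detection, where anomalous transactions are caught through event detection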
Storm Features
- Integrates
  - Storm integrates with any queueing system and any database system.
- Simple API
- Scalable
  - Storm was benchmarked at processing one million 100 byte messages per second per node on hardware with the following specs:
    - Processor: 2x Intel E5645 @ 2.4 GHz
    - Memory: 24 GB
- Fault tolerant
- Guarantees data processing
  - Using Trident, a higher level abstraction over Storm’s basic abstractions, you can achieve exactly-once processing semantics.
- Use with any language
- Easy to deploy and operate
- Free and open source
  - Apache Storm is a free and open source project licensed under the Apache License, Version 2.0.
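The exactly-once point above can be illustrated with a small plain-Java sketch of the idea described for Trident's transactional state: keep the batch's transaction id next to the stored value and skip the update when a replayed batch carries the same id. The class and method names are hypothetical and no Storm API is used. The sketch also assumes batches are applied in transaction-id order.

```java
// Sketch only: a counter that stays correct when a batch is replayed.
class TxCounterSketch {
    private long count = 0;      // current value of the counter
    private long lastTxId = -1;  // id of the last batch applied (-1 = none yet)

    // Applies a batch's increment unless that batch was already applied.
    void applyBatch(long txId, long increment) {
        if (txId == lastTxId) {
            return; // replay of the batch that was applied last: skip it
        }
        count += increment;
        lastTxId = txId;
    }

    long getCount() {
        return count;
    }

    public static void main(String[] args) {
        TxCounterSketch counter = new TxCounterSketch();
        counter.applyBatch(1, 3);
        counter.applyBatch(2, 2);
        counter.applyBatch(2, 2); // batch 2 replayed after a failure
        System.out.println(counter.getCount()); // prints 5
    }
}
```

Without the transaction-id check, the replayed batch would be counted twice, which is the at-least-once behavior of plain spouts and bolts.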
Storm Architecture
- Nimbus -> Master Node
- Supervisor -> Worker Node
Storm Components
- Spout: A spout is the entry point of a Storm topology.
- Bolt: A bolt contains the actual processing logic.
- Topology: A topology is a network of spouts and bolts.
- Stream: A stream is an unbounded sequence of tuples.
- Tuple: A tuple is the most basic data structure in Storm. It is a named list of values.
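To make "a named list of values" concrete, here is a minimal plain-Java sketch of the idea. `TupleSketch` and its methods are hypothetical names for illustration; it is not Storm's `Tuple` class, only the same shape: values stored in order, addressable by position or by field name.

```java
import java.util.Arrays;
import java.util.List;

// Sketch only: a tuple as a list of values addressed by field name.
class TupleSketch {
    private final List<String> fields; // field names, as declared by the emitter
    private final List<Object> values; // values, in the same order as the fields

    TupleSketch(List<String> fields, List<Object> values) {
        if (fields.size() != values.size()) {
            throw new IllegalArgumentException("one value per field is required");
        }
        this.fields = fields;
        this.values = values;
    }

    // Looks a value up by position.
    Object getValue(int index) {
        return values.get(index);
    }

    // Looks a value up by the name of its field.
    Object getValueByField(String field) {
        int index = fields.indexOf(field);
        if (index < 0) {
            throw new IllegalArgumentException("unknown field: " + field);
        }
        return values.get(index);
    }

    public static void main(String[] args) {
        TupleSketch tuple = new TupleSketch(
                Arrays.asList("say"), Arrays.asList((Object) "Hello World!"));
        System.out.println(tuple.getValueByField("say")); // prints Hello World!
    }
}
```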
Storm Parallelism
- Node
  - A physical server on which a Nimbus or Supervisor process runs
- Worker
  - A Java process started on a node that runs a Supervisor
- Executor
  - A Java thread that runs inside a Worker
- Task
  - An instance of a Bolt or Spout
Example of a running topology
Config conf = new Config();
conf.setNumWorkers(2); // use two worker processes

TopologyBuilder topologyBuilder = new TopologyBuilder();

topologyBuilder.setSpout("blue-spout", new BlueSpout(), 2); // set parallelism hint to 2

topologyBuilder.setBolt("green-bolt", new GreenBolt(), 2) // two executors
               .setNumTasks(4)                            // four tasks, so two per executor
               .shuffleGrouping("blue-spout");

topologyBuilder.setBolt("yellow-bolt", new YellowBolt(), 6) // six executors
               .shuffleGrouping("green-bolt");

StormSubmitter.submitTopology(
    "mytopology",
    conf,
    topologyBuilder.createTopology()
);
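The numbers in this configuration can be worked through with a few lines of arithmetic. The sketch below is plain Java with hypothetical names. It assumes the executors divide evenly among the workers, as they do here, and it ignores any system components (such as ackers) that Storm may add on its own.

```java
// Sketch only: the arithmetic behind the example topology's parallelism.
class ParallelismSketch {
    // Number of executors (threads) each worker process runs.
    static int executorsPerWorker(int totalExecutors, int workers) {
        return totalExecutors / workers;
    }

    // Number of tasks each executor of one component runs.
    static int tasksPerExecutor(int tasks, int executors) {
        return tasks / executors;
    }

    public static void main(String[] args) {
        int workers = 2;          // conf.setNumWorkers(2)
        int blueExecutors = 2;    // parallelism hint of blue-spout
        int greenExecutors = 2;   // parallelism hint of green-bolt
        int yellowExecutors = 6;  // parallelism hint of yellow-bolt
        int greenTasks = 4;       // setNumTasks(4) on green-bolt

        int totalExecutors = blueExecutors + greenExecutors + yellowExecutors;
        System.out.println(totalExecutors);                               // prints 10
        System.out.println(executorsPerWorker(totalExecutors, workers));  // prints 5
        System.out.println(tasksPerExecutor(greenTasks, greenExecutors)); // prints 2
    }
}
```

The blue spout and yellow bolt do not call `setNumTasks`, so each of their executors runs one task, for 2 + 4 + 6 = 12 tasks in total.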
Stream groupings
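A stream grouping defines how a stream is partitioned among a bolt's tasks.
Storm has eight built-in stream groupings, and you can define your own by implementing the CustomStreamGrouping interface.
- Shuffle grouping: Tuples are distributed randomly across the bolt's tasks so that each task receives an equal number of tuples.
- Fields grouping: The stream is partitioned by the fields specified in the grouping. For example, if the stream is grouped by the "user-id" field, tuples with the same "user-id" always go to the same task, while tuples with different "user-id" values may go to different tasks.
- All grouping: The stream is replicated to all of the bolt's tasks.
- Global grouping: The entire stream goes to a single one of the bolt's tasks, specifically the task with the lowest ID.
- Direct grouping: The producer of a tuple decides which task of the consumer receives it.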
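The groupings above can be pictured as different ways of choosing a target task index. The following plain-Java sketch is a simplified illustration with hypothetical names, not Storm's internal implementation. It models shuffle grouping as a walk over a shuffled task list, fields grouping as a hash of the field value, global grouping as the lowest index, and all grouping as every index.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Random;

// Sketch only: choosing target tasks (indices 0..numTasks-1) for a tuple.
class GroupingSketch {
    private final List<Integer> order = new ArrayList<>(); // task indices in shuffled order
    private final Random random;
    private int cursor = 0;

    GroupingSketch(int numTasks, Random random) {
        for (int i = 0; i < numTasks; i++) {
            order.add(i);
        }
        this.random = random;
        Collections.shuffle(order, random);
    }

    // Shuffle grouping: walk a shuffled list so every task gets one tuple per round.
    int shuffle() {
        if (cursor == order.size()) {
            Collections.shuffle(order, random);
            cursor = 0;
        }
        return order.get(cursor++);
    }

    // Fields grouping: equal field values always map to the same task.
    static int fields(Object fieldValue, int numTasks) {
        return Math.floorMod(fieldValue.hashCode(), numTasks);
    }

    // Global grouping: the whole stream goes to the lowest task index.
    static int global() {
        return 0;
    }

    // All grouping: every task receives a copy of the tuple.
    static List<Integer> all(int numTasks) {
        List<Integer> targets = new ArrayList<>();
        for (int i = 0; i < numTasks; i++) {
            targets.add(i);
        }
        return targets;
    }

    public static void main(String[] args) {
        GroupingSketch grouping = new GroupingSketch(4, new Random());
        int[] received = new int[4];
        for (int i = 0; i < 8; i++) {
            received[grouping.shuffle()]++;
        }
        System.out.println(Arrays.toString(received));                  // prints [2, 2, 2, 2]
        System.out.println(fields("user-1", 4) == fields("user-1", 4)); // prints true
        System.out.println(all(3));                                     // prints [0, 1, 2]
    }
}
```

Fields grouping is what makes per-key state, such as a counter kept inside a bolt, safe: every tuple for a given key reaches the same task.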
Storm Example
1. HelloTopology Example
pom.xml
<dependency>
    <groupId>org.apache.storm</groupId>
    <artifactId>storm-core</artifactId>
    <version>1.1.1</version>
    <scope>provided</scope>
</dependency>
HelloSpout.java
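import java.util.Map;
import org.apache.storm.spout.SpoutOutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseRichSpout;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Values;

public class HelloSpout extends BaseRichSpout {
    private SpoutOutputCollector collector;
    @Override
    public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
        this.collector = collector;
    }
    @Override
    public void nextTuple() {
        this.collector.emit(new Values("Hello World!"));
    }
    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("say"));
    }
}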
HelloBolt.java
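import org.apache.storm.topology.BasicOutputCollector;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseBasicBolt;
import org.apache.storm.tuple.Tuple;

public class HelloBolt extends BaseBasicBolt {
    private static final long serialVersionUID = 1L;
    @Override
    public void execute(Tuple tuple, BasicOutputCollector collector) {
        String value = tuple.getStringByField("say");
        System.out.println("Tuple value is " + value);
    }
    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        // this bolt emits nothing, so there are no output fields to declare
    }
}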
HelloTopology.java
import org.apache.storm.Config;
import org.apache.storm.LocalCluster;
import org.apache.storm.StormSubmitter;
import org.apache.storm.topology.TopologyBuilder;
import org.apache.storm.utils.Utils;

public class HelloTopology {
    public static void main(String[] args) throws Exception {
        TopologyBuilder builder = new TopologyBuilder();
        builder.setSpout("HelloSpout", new HelloSpout(), 2);
        builder.setBolt("HelloBolt", new HelloBolt(), 4).shuffleGrouping("HelloSpout");
        Config conf = new Config();
        conf.setDebug(true);
        // If there are arguments, we are running on a cluster
        if (args != null && args.length > 0) {
            // number of worker processes to use for this topology
            conf.setNumWorkers(3);
            // submit the topology under the name given as the first argument
            StormSubmitter.submitTopology(args[0], conf, builder.createTopology());
        }
        // Otherwise, we are running locally
        else {
            LocalCluster cluster = new LocalCluster();
            cluster.submitTopology("HelloTopologyLocal", conf, builder.createTopology());
            // let the topology run for 10 seconds
            Utils.sleep(10000);
            // kill the HelloTopologyLocal topology
            cluster.killTopology("HelloTopologyLocal");
            // shut down the local test cluster
            cluster.shutdown();
        }
    }
}
2. Build and Deploy
$ mvn clean package -DskipTests=true
$ storm jar storm-0.0.1-SNAPSHOT.jar hello.HelloTopology HelloTopology
Kafka-Storm Example
pom.xml
<dependency>
    <groupId>org.apache.storm</groupId>
    <artifactId>storm-kafka</artifactId>
    <version>1.1.1</version>
</dependency>
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>0.10.0.1</version>
</dependency>
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka_2.11</artifactId>
    <version>0.10.0.1</version>
</dependency>
CutLogBolt.java
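import org.apache.storm.topology.BasicOutputCollector;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseBasicBolt;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.tuple.Values;

public class CutLogBolt extends BaseBasicBolt {
    private static final long serialVersionUID = 1L;
    @Override
    public void execute(Tuple input, BasicOutputCollector collector) {
        // split the raw log line into ';'-separated segments
        String[] splitArray = input.getString(0).split(";");
        String key = "";
        String doctype = "";
        // keep the segments that mention "key" and "doctype"
        for (int i = 0; i < splitArray.length; i++) {
            if (splitArray[i].contains("key"))
                key = splitArray[i];
            if (splitArray[i].contains("doctype"))
                doctype = splitArray[i];
        }
        collector.emit(new Values(key, doctype));
    }
    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("key", "doctype"));
    }
}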
ClassifyKeyBolt.java
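import org.apache.storm.topology.BasicOutputCollector;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseBasicBolt;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.tuple.Values;

public class ClassifyKeyBolt extends BaseBasicBolt {
    private static final long serialVersionUID = 1L;
    @Override
    public void execute(Tuple input, BasicOutputCollector collector) {
        // each segment is expected to look like "name:value"
        String[] splitdoctype = input.getStringByField("doctype").split(":");
        String[] splitkey = input.getStringByField("key").split(":");
        // malformed segments are dropped without emitting anything
        if (splitkey.length == 2 && splitdoctype.length == 2) {
            String doctype = splitdoctype[1].trim();
            String key = splitkey[1].trim();
            // System.err.println(key + ":" + doctype);
            collector.emit(new Values(key + ":" + doctype));
        }
    }
    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("subdoctype"));
    }
}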
DoctypeCountBolt.java
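import java.util.HashMap;
import java.util.Map;
import org.apache.storm.topology.BasicOutputCollector;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseBasicBolt;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.tuple.Values;

public class DoctypeCountBolt extends BaseBasicBolt {
    private static final long serialVersionUID = 1L;
    // running count per "key:doctype" value seen by this task
    Map<String, Integer> docMap = new HashMap<String, Integer>();
    @Override
    public void execute(Tuple input, BasicOutputCollector collector) {
        String doctype = input.getStringByField("subdoctype");
        Integer count = docMap.get(doctype);
        if (count == null)
            count = 0;
        count++;
        docMap.put(doctype, count);
        System.out.println(docMap);
        // emit a snapshot so that later updates do not change the emitted value
        collector.emit(new Values(new HashMap<String, Integer>(docMap)));
    }
    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("docmap"));
    }
}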
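To see what the three bolts compute together, the sketch below runs the same split, classify, and count steps as plain Java, without Storm. The class name and the sample log lines are assumptions for illustration. The input format `key: ...; doctype: ...` is inferred from the `;` and `:` splits in the bolts and is not confirmed by the source.

```java
import java.util.HashMap;
import java.util.Map;

// Sketch only: the CutLogBolt -> ClassifyKeyBolt -> DoctypeCountBolt logic in one class.
class LogPipelineSketch {
    private final Map<String, Integer> counts = new HashMap<>();

    // Mirrors CutLogBolt and ClassifyKeyBolt: returns "key:doctype", or null if malformed.
    static String classify(String line) {
        String key = "";
        String doctype = "";
        for (String part : line.split(";")) {
            if (part.contains("key")) {
                key = part;
            }
            if (part.contains("doctype")) {
                doctype = part;
            }
        }
        String[] keyParts = key.split(":");
        String[] doctypeParts = doctype.split(":");
        if (keyParts.length != 2 || doctypeParts.length != 2) {
            return null; // the bolt emits nothing in this case
        }
        return keyParts[1].trim() + ":" + doctypeParts[1].trim();
    }

    // Mirrors DoctypeCountBolt: increments the count for the classified value.
    void count(String line) {
        String classified = classify(line);
        if (classified != null) {
            counts.merge(classified, 1, Integer::sum);
        }
    }

    Map<String, Integer> getCounts() {
        return counts;
    }

    public static void main(String[] args) {
        LogPipelineSketch sketch = new LogPipelineSketch();
        sketch.count("key: search; doctype: pdf"); // hypothetical log line
        sketch.count("key: search; doctype: pdf");
        sketch.count("no fields here");            // malformed, ignored
        System.out.println(sketch.getCounts());    // prints {search:pdf=2}
    }
}
```

In the running topology each counting task keeps its own map, so the counts only stay consistent if tuples with the same "subdoctype" value are always routed to the same task.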
MonitorTopology.java
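import java.util.ArrayList;
import java.util.List;
import java.util.UUID;
import org.apache.storm.Config;
import org.apache.storm.StormSubmitter;
import org.apache.storm.kafka.KafkaSpout;
import org.apache.storm.kafka.SpoutConfig;
import org.apache.storm.kafka.StringScheme;
import org.apache.storm.kafka.ZkHosts;
import org.apache.storm.spout.SchemeAsMultiScheme;
import org.apache.storm.topology.TopologyBuilder;
import org.apache.storm.tuple.Fields;

public class MonitorTopology {
    public static void main(String[] args) throws Exception {
        String zkUrl = "demo:2181";
        String brokerUrl = "demo:9092";
        System.out.println("Using Kafka zookeeper url: " + zkUrl + " broker url: " + brokerUrl);
        // read the "onlytest" topic, keeping consumer offsets under /onlytest in ZooKeeper
        ZkHosts hosts = new ZkHosts(zkUrl);
        SpoutConfig spoutConfig = new SpoutConfig(hosts, "onlytest", "/onlytest", UUID.randomUUID().toString());
        // deserialize each Kafka message as a single string field
        spoutConfig.scheme = new SchemeAsMultiScheme(new StringScheme());
        KafkaSpout kafkaSpout = new KafkaSpout(spoutConfig);
        TopologyBuilder builder = new TopologyBuilder();
        builder.setSpout("spout", kafkaSpout, 1);
        builder.setBolt("cutbolt", new CutLogBolt(), 2).shuffleGrouping("spout");
        builder.setBolt("classifybolt", new ClassifyKeyBolt(), 2).fieldsGrouping("cutbolt", new Fields("key", "doctype"));
        // group by "subdoctype" so that each value is always counted by the same task
        builder.setBolt("docbolt", new DoctypeCountBolt(), 2).fieldsGrouping("classifybolt", new Fields("subdoctype"));
        Config conf = new Config();
        conf.setDebug(true);
        // placeholder Nimbus seed list; it is not applied to conf in this example
        List<String> nimbus_seeds = new ArrayList<String>();
        nimbus_seeds.add("nimbus url");
        if (args != null && args.length > 0) {
            // submit under the name given as the first argument
            conf.setNumWorkers(3);
            StormSubmitter.submitTopologyWithProgressBar(args[0], conf, builder.createTopology());
        } else {
            // no name given: submit to the cluster under the default name "onlytest"
            conf.setNumWorkers(3);
            StormSubmitter.submitTopology("onlytest", conf, builder.createTopology());
        }
    }
}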
Build and Deploy
$ mvn clean package -DskipTests=true
$ storm jar storm-0.0.1-SNAPSHOT.jar monitor.MonitorTopology MonitorTopology