Apache Storm - 工作示例

  • 简述

    我们已经了解了 Apache Storm 的核心技术细节,现在是时候编写一些简单的场景了。
  • 场景 - 移动通话记录分析器

    移动呼叫及其持续时间将作为 Apache Storm 的输入,Storm 将处理和分组同一呼叫者和接收者之间的呼叫以及它们的呼叫总数。
  • spout 创建

    Spout 是一个用于数据生成的组件。基本上,一个 spout 将实现一个 IRichSpout 接口。“IRichSpout”接口具有以下重要方法 -
    • open− 为 spout 提供执行环境。执行程序将运行此方法来初始化 spout。
    • nextTuple− 通过收集器发出生成的数据。
    • close− 当一个 spout 将要关闭时调用此方法。
    • declareOutputFields- 声明元组的输出模式。
    • ack− 确认处理了特定的元组
    • fail- 指定一个特定的元组不被处理并且不被重新处理。

    open

    open方法如下 -
    
    open(Map conf, TopologyContext context, SpoutOutputCollector collector)
    
    • conf− 为这个 spout 提供风暴配置。
    • context− 提供有关 toplogy内的 spout 位置、其任务 ID、输入和输出信息的完整信息。
    • collector− 使我们能够发出将由bolt 处理的元组。

    nextTuple

    的签名nextTuple方法如下 -
    
    nextTuple()
    
    nextTuple() 从与 ack() 和 fail() 方法相同的循环中定期调用。当没有工作要做时,它必须释放对线程的控制,以便其他方法有机会被调用。所以 nextTuple 的第一行检查处理是否完成。如果是这样,它应该在返回之前至少休眠一毫秒以减少处理器上的负载。

    close

    close方法如下 -
    
    close()
    

    declareOutputFields

    的签名declareOutputFields方法如下 -
    
    declareOutputFields(OutputFieldsDeclarer declarer)
    
    declarer− 用于声明输出流id、输出字段等。
    此方法用于指定元组的输出模式。

    ack

    ack方法如下 -
    
    ack(Object msgId)
    
    此方法确认已处理特定元组。

    fail

    nextTuple方法如下 -
    
    ack(Object msgId)
    
    此方法通知特定元组尚未完全处理。Storm 将重新处理特定的元组。

    FakeCallLogReaderSpout

    在我们的场景中,我们需要收集通话记录详细信息。通话记录的信息包含。
    • 来电号码
    • 收货人号码
    • 期间
    由于我们没有实时的通话记录信息,我们会生成虚假的通话记录。虚假信息将使用 Random 类创建。完整的程序代码如下。

    编码 - FakeCallLogReaderSpout.java

    
    import java.util.*;
    //import storm tuple packages
    import backtype.storm.tuple.Fields;
    import backtype.storm.tuple.Values;
    //import Spout interface packages
    import backtype.storm.topology.IRichSpout;
    import backtype.storm.topology.OutputFieldsDeclarer;
    import backtype.storm.spout.SpoutOutputCollector;
    import backtype.storm.task.TopologyContext;
    //Create a class FakeLogReaderSpout which implement IRichSpout interface 
       to access functionalities
        
    public class FakeCallLogReaderSpout implements IRichSpout {
       //Create instance for SpoutOutputCollector which passes tuples to bolt.
       private SpoutOutputCollector collector;
       private boolean completed = false;
        
       //Create instance for TopologyContext which contains topology data.
       private TopologyContext context;
        
       //Create instance for Random class.
       private Random randomGenerator = new Random();
       private Integer idx = 0;
       @Override
       public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
          this.context = context;
          this.collector = collector;
       }
       @Override
       public void nextTuple() {
          if(this.idx <= 1000) {
             List<String> mobileNumbers = new ArrayList<String>();
             mobileNumbers.add("1234123401");
             mobileNumbers.add("1234123402");
             mobileNumbers.add("1234123403");
             mobileNumbers.add("1234123404");
             Integer localIdx = 0;
             while(localIdx++ < 100 && this.idx++ < 1000) {
                String fromMobileNumber = mobileNumbers.get(randomGenerator.nextInt(4));
                String toMobileNumber = mobileNumbers.get(randomGenerator.nextInt(4));
                    
                while(fromMobileNumber == toMobileNumber) {
                   toMobileNumber = mobileNumbers.get(randomGenerator.nextInt(4));
                }
                    
                Integer duration = randomGenerator.nextInt(60);
                this.collector.emit(new Values(fromMobileNumber, toMobileNumber, duration));
             }
          }
       }
       @Override
       public void declareOutputFields(OutputFieldsDeclarer declarer) {
          declarer.declare(new Fields("from", "to", "duration"));
       }
       //Override all the interface methods
       @Override
       public void close() {}
       public boolean isDistributed() {
          return false;
       }
       @Override
       public void activate() {}
       @Override 
       public void deactivate() {}
       @Override
       public void ack(Object msgId) {}
       @Override
       public void fail(Object msgId) {}
       @Override
       public Map<String, Object> getComponentConfiguration() {
          return null;
       }
    }
    
  • bolt 创建

    Bolt 是一个将元组作为输入,处理元组,并产生新元组作为输出的组件。bolt 将实施IRichBolt界面。在这个程序中,两个bolt 类CallLogCreatorBoltCallLogCounterBolt用于执行操作。
    IRichBolt 接口有以下方法 -
    • prepare− 为bolt 提供执行环境。执行程序将运行此方法来初始化 spout。
    • execute− 处理单个输入元组。
    • cleanup− 当bolt 要关闭时调用。
    • declareOutputFields- 声明元组的输出模式。

    prepare

    prepare方法如下 -
    
    prepare(Map conf, TopologyContext context, OutputCollector collector)
    
    • conf− 为该bolt 提供 Storm 配置。
    • context− 提供有关 toplogy内bolt 位置、其任务 ID、输入和输出信息等的完整信息。
    • collector− 使我们能够发出已处理的元组。

    execute

    execute方法如下 -
    
    execute(Tuple tuple)
    
    这里tuple是要处理的输入元组。
    execute方法一次处理一个元组。元组数据可以通过 Tuple 类的 getValue 方法访问。没有必要立即处理输入元组。可以处理多个元组并将其作为单个输出元组输出。可以使用 OutputCollector 类发出处理后的元组。

    cleanup

    cleanup方法如下 -
    
    cleanup()
    

    declareOutputFields

    declareOutputFields方法如下 -
    
    declareOutputFields(OutputFieldsDeclarer declarer)
    
    这里的参数declarer用于声明输出流ID、输出字段等。
    此方法用于指定元组的输出模式
  • 通话记录创建者bolt

    通话记录创建者 Bolt 接收通话记录元组。呼叫日志元组具有呼叫者号码、接收者号码和呼叫持续时间。这个bolt 通过组合呼叫者号码和接收者号码来简单地创建一个新值。新值的格式为“Caller number - Receiver number”,命名为新字段“call”。完整的代码如下。

    编码 - CallLogCreatorBolt.java

    
    //import util packages
    import java.util.HashMap;
    import java.util.Map;
    import backtype.storm.tuple.Fields;
    import backtype.storm.tuple.Values;
    import backtype.storm.task.OutputCollector;
    import backtype.storm.task.TopologyContext;
    //import Storm IRichBolt package
    import backtype.storm.topology.IRichBolt;
    import backtype.storm.topology.OutputFieldsDeclarer;
    import backtype.storm.tuple.Tuple;
    //Create a class CallLogCreatorBolt which implement IRichBolt interface
    public class CallLogCreatorBolt implements IRichBolt {
       //Create instance for OutputCollector which collects and emits tuples to produce output
       private OutputCollector collector;
       @Override
       public void prepare(Map conf, TopologyContext context, OutputCollector collector) {
          this.collector = collector;
       }
       @Override
       public void execute(Tuple tuple) {
          String from = tuple.getString(0);
          String to = tuple.getString(1);
          Integer duration = tuple.getInteger(2);
          collector.emit(new Values(from + " - " + to, duration));
       }
       @Override
       public void cleanup() {}
       @Override
       public void declareOutputFields(OutputFieldsDeclarer declarer) {
          declarer.declare(new Fields("call", "duration"));
       }
        
       @Override
       public Map<String, Object> getComponentConfiguration() {
          return null;
       }
    }
    
  • 通话记录计数器bolt

    呼叫日志计数器 Bolt 以元组形式接收呼叫及其持续时间。这个bolt在prepare方法中初始化了一个字典(Map)对象。在execute方法,它检查元组并在字典对象中为元组中的每个新“调用”值创建一个新条目,并在字典对象中设置值 1。对于字典中已经可用的条目,它只是增加它的值。简单来说,这个bolt 将调用及其计数保存在字典对象中。除了将调用及其计数保存在字典中之外,我们还可以将其保存到数据源中。完整的程序代码如下 -

    编码 - CallLogCounterBolt.java

    
    import java.util.HashMap;
    import java.util.Map;
    import backtype.storm.tuple.Fields;
    import backtype.storm.tuple.Values;
    import backtype.storm.task.OutputCollector;
    import backtype.storm.task.TopologyContext;
    import backtype.storm.topology.IRichBolt;
    import backtype.storm.topology.OutputFieldsDeclarer;
    import backtype.storm.tuple.Tuple;
    public class CallLogCounterBolt implements IRichBolt {
       Map<String, Integer> counterMap;
       private OutputCollector collector;
       @Override
       public void prepare(Map conf, TopologyContext context, OutputCollector collector) {
          this.counterMap = new HashMap<String, Integer>();
          this.collector = collector;
       }
       @Override
       public void execute(Tuple tuple) {
          String call = tuple.getString(0);
          Integer duration = tuple.getInteger(1);
            
          if(!counterMap.containsKey(call)){
             counterMap.put(call, 1);
          }else{
             Integer c = counterMap.get(call) + 1;
             counterMap.put(call, c);
          }
            
          collector.ack(tuple);
       }
       @Override
       public void cleanup() {
          for(Map.Entry<String, Integer> entry:counterMap.entrySet()){
             System.out.println(entry.getKey()+" : " + entry.getValue());
          }
       }
       @Override
       public void declareOutputFields(OutputFieldsDeclarer declarer) {
          declarer.declare(new Fields("call"));
       }
        
       @Override
       public Map<String, Object> getComponentConfiguration() {
          return null;
       }
        
    }
    
  • 创建 toplogy

    Storm toplogy基本上是一个 Thrift 结构。TopologyBuilder 类提供了简单易用的方法来创建复杂的 toplogy。TopologyBuilder 类具有设置 spout 的方法(setSpout)并设置bolt (setBolt). 最后,TopologyBuilder 有 createTopology 来创建 toplogy。使用以下代码片段创建 toplogy -
    
    TopologyBuilder builder = new TopologyBuilder();
    builder.setSpout("call-log-reader-spout", new FakeCallLogReaderSpout());
    builder.setBolt("call-log-creator-bolt", new CallLogCreatorBolt())
       .shuffleGrouping("call-log-reader-spout");
    builder.setBolt("call-log-counter-bolt", new CallLogCounterBolt())
       .fieldsGrouping("call-log-creator-bolt", new Fields("call"));
    
    shuffleGroupingfieldsGrouping方法有助于为 spout 和 bolts 设置流分组。
  • 本地集群

    出于开发目的,我们可以使用“LocalCluster”对象创建一个本地集群,然后使用“LocalCluster”类的“submitTopology”方法提交 toplogy。“submitTopology”的参数之一是“Config”类的一个实例。“Config”类用于在提交 toplogy之前设置配置选项。此配置选项将在运行时与集群配置合并,并使用 prepare 方法发送到所有任务(spout 和 bolt)。将 toplogy提交到集群后,我们将等待 10 秒让集群计算提交的 toplogy,然后使用“LocalCluster”的“关闭”方法关闭集群。完整的程序代码如下 -

    编码 - LogAnalyserStorm.java

    
    import backtype.storm.tuple.Fields;
    import backtype.storm.tuple.Values;
    //import storm configuration packages
    import backtype.storm.Config;
    import backtype.storm.LocalCluster;
    import backtype.storm.topology.TopologyBuilder;
    //Create main class LogAnalyserStorm submit topology.
    public class LogAnalyserStorm {
       public static void main(String[] args) throws Exception{
          //Create Config instance for cluster configuration
          Config config = new Config();
          config.setDebug(true);
            
          //
          TopologyBuilder builder = new TopologyBuilder();
          builder.setSpout("call-log-reader-spout", new FakeCallLogReaderSpout());
          builder.setBolt("call-log-creator-bolt", new CallLogCreatorBolt())
             .shuffleGrouping("call-log-reader-spout");
          builder.setBolt("call-log-counter-bolt", new CallLogCounterBolt())
             .fieldsGrouping("call-log-creator-bolt", new Fields("call"));
                
          LocalCluster cluster = new LocalCluster();
          cluster.submitTopology("LogAnalyserStorm", config, builder.createTopology());
          Thread.sleep(10000);
            
          //Stop the topology
            
          cluster.shutdown();
       }
    }
    
  • 构建和运行应用程序

    完整的应用程序有四个 Java 代码。他们是 -
    • FakeCallLogReaderSpout.java
    • CallLogCreaterBolt.java
    • CallLogCounterBolt.java
    • LogAnalyerStorm.java
    可以使用以下命令构建应用程序 -
    
    javac -cp “/path/to/storm/apache-storm-0.9.5/lib/*” *.java
    
    该应用程序可以使用以下命令运行 -
    
    java -cp “/path/to/storm/apache-storm-0.9.5/lib/*”:. LogAnalyserStorm
    

    输出

    一旦应用程序启动,它将输出有关集群启动过程、spout 和 bolt 处理,最后是集群关闭过程的完整详细信息。在“CallLogCounterBolt”中,我们打印了呼叫及其计数详细信息。此信息将显示在控制台上,如下所示 -
    
    1234123402 - 1234123401 : 78
    1234123402 - 1234123404 : 88
    1234123402 - 1234123403 : 105
    1234123401 - 1234123404 : 74
    1234123401 - 1234123403 : 81
    1234123401 - 1234123402 : 81
    1234123403 - 1234123404 : 86
    1234123404 - 1234123401 : 63
    1234123404 - 1234123402 : 82
    1234123403 - 1234123402 : 83
    1234123404 - 1234123403 : 86
    1234123403 - 1234123401 : 93
    
  • 非 JVM 语言

    Storm toplogy由 Thrift 接口实现,这使得以任何语言提交 toplogy变得容易。Storm 支持 Ruby、Python 和许多其他语言。让我们看一下python绑定。

    Python 绑定

    Python 是一种通用的解释型、交互式、面向对象的高级编程语言。Storm 支持 Python 实现其 toplogy。Python 支持发射、锚定、确认和记录操作。
    如您所知,bolt 可以用任何语言定义。用另一种语言编写的 Bolt 作为子进程执行,Storm 通过 stdin/stdout 使用 JSON 消息与这些子进程通信。首先取一个支持python绑定的示例bolt WordCount。
    
    public static class WordCount implements IRichBolt {
       public WordSplit() {
          super("python", "splitword.py");
       }
        
       public void declareOutputFields(OutputFieldsDeclarer declarer) {
          declarer.declare(new Fields("word"));
       }
    }
    
    这里的类WordCount实现IRichBolt接口并与python实现一起运行指定的超级方法参数“splitword.py”。现在创建一个名为“splitword.py”的 python 实现。
    
    import storm
       class WordCountBolt(storm.BasicBolt):
          def process(self, tup):
             words = tup.values[0].split(" ")
             for word in words:
             storm.emit([word])
    WordCountBolt().run()
    
    这是 Python 的示例实现,用于计算给定句子中的单词。同样,您也可以绑定其他支持语言。