Storm示例 Storm 安装 Storm Trident 我们已经了解了Apache Storm的核心技术细节,现在是编写一些简单场景的时候了。 场景 - 移动呼叫日志分析器 移动电话及其持续时间将作为Apache Storm的输入提供,Storm将处理并分组相同呼叫者和接收者之间的呼叫及其呼叫总数。 接口创建 Spout是用于数据生成的组件。基本上,喷嘴将实现一个IRichSpout接口。“IRichSpout”界面有以下重要方法 open - 为喷口提供执行环境。 执行者将运行此方法来初始化喷口。 nextTuple - 通过收集器发出生成的数据。 close - 喷嘴将要关闭时调用此方法。 declareOutputFields - 声明元组的输出模式。 ack - 确认处理了特定的元组 失败 - 指定一个特定的元组不被处理并且不被重新处理。 打开 open 方法的签名如下 open(Map conf, TopologyContext context, SpoutOutputCollector collector) conf - 为此喷口提供Storm配置。 上下文 - 提供关于拓扑中喷口位置,其任务ID,输入和输出信息的完整信息。 收集器 - 使我们能够发出将由Bolt处理的元组。 nextTuple nextTuple 方法的签名如下 - nextTuple() nextTuple()从与ack()和fail()方法相同的循环周期性地调用。当没有工作要做时,它必须释放对线程的控制,以便其他方法有机会被调用。所以nextTuple的第一行检查处理是否完成。如果是这样,它应该睡眠至少一毫秒,以在返回之前减少处理器上的负载。 close close 方法的签名如下 close() declareOutputFields declareOutputFields 方法的签名如下所示 - declareOutputFields(OutputFieldsDeclarer declarer) 声明器 - 它用于声明输出流ID,输出字段等。 此方法用于指定元组的输出模式。 ACK ack 方法的签名如下 ack(Object msgId) 该方法确认已经处理了特定的元组。 失败 nextTuple 方法的签名如下 - ack(Object msgId) 此方法通知某个特定的元组尚未完全处理。Storm将重新处理特定的元组。 FakeCallLogReaderSpout 在我们的场景中,我们需要收集通话记录详细信息。通话记录的信息包含。 来电号码 接收器号码 持续时间 由于我们没有实时的通话记录信息,我们会生成虚假的通话记录。假信息将使用Random类创建。完整的程序代码如下。 编码 - FakeCallLogReaderSpout.java import java.util.*; //import storm tuple packages import backtype.storm.tuple.Fields; import backtype.storm.tuple.Values; //import Spout interface packages import backtype.storm.topology.IRichSpout; import backtype.storm.topology.OutputFieldsDeclarer; import backtype.storm.spout.SpoutOutputCollector; import backtype.storm.task.TopologyContext; //Create a class FakeLogReaderSpout which implement IRichSpout interface to access functionalities public class FakeCallLogReaderSpout implements IRichSpout { //Create instance for SpoutOutputCollector which passes tuples to bolt. private SpoutOutputCollector collector; private boolean completed = false; //Create instance for TopologyContext which contains topology data. private TopologyContext context; //Create instance for Random class. private Random randomGenerator = new Random(); private Integer idx = 0; @Override public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) { this.context = context; this.collector = collector; } @Override public void nextTuple() { if(this.idx <= 1000) { List<String> mobileNumbers = new ArrayList<String>(); mobileNumbers.add("1234123401"); mobileNumbers.add("1234123402"); mobileNumbers.add("1234123403"); mobileNumbers.add("1234123404"); Integer localIdx = 0; while(localIdx++ < 100 && this.idx++ < 1000) { String fromMobileNumber = mobileNumbers.get(randomGenerator.nextInt(4)); String toMobileNumber = mobileNumbers.get(randomGenerator.nextInt(4)); while(fromMobileNumber == toMobileNumber) { toMobileNumber = mobileNumbers.get(randomGenerator.nextInt(4)); } Integer duration = randomGenerator.nextInt(60); this.collector.emit(new Values(fromMobileNumber, toMobileNumber, duration)); } } } @Override public void declareOutputFields(OutputFieldsDeclarer declarer) { declarer.declare(new Fields("from", "to", "duration")); } //Override all the interface methods @Override public void close() {} public boolean isDistributed() { return false; } @Override public void activate() {} @Override public void deactivate() {} @Override public void ack(Object msgId) {} @Override public void fail(Object msgId) {} @Override public Map<String, Object> getComponentConfiguration() { return null; } } Bolt创建 Bolt是一个将元组作为输入,处理元组并生成新的元组作为输出的组件。Bolts将实施 IRichBolt 界面。在这个程序中,使用两个Bolt类 CallLogCreatorBolt 和 CallLogCounterBolt 来执行操作。 IRichBolt接口有以下方法 - 准备 - 为Bolt提供执行的环境。 执行者将运行此方法来初始化喷口。 执行 - 处理输入的单个元组。 清理 - 当Bolt即将关闭时调用。 declareOutputFields - 声明元组的输出模式。 准备 准备 方法的签名如下 - prepare(Map conf, TopologyContext context, OutputCollector collector) conf - 为此Bolt提供Storm配置。 上下文 - 提供有关拓扑中Bolt位置,其任务ID,输入和输出信息等的完整信息。 收集器 - 使我们能够发出处理过的元组。 执行 执行 方法的签名如下 execute(Tuple tuple) 这里的 元组 是要处理的输入元组。 所述 执行 方法一次处理单元组。元组数据可以通过Tuple类的getValue方法访问。没有必要立即处理输入元组。多元组可以作为单个输出元组进行处理和输出。处理过的元组可以通过使用OutputCollector类发出。 清理 清理 方法的签名如下 cleanup() declareOutputFields declareOutputFields 方法的签名如下所示 - declareOutputFields(OutputFieldsDeclarer declarer) 这里参数 声明 器用于声明输出流ID,输出字段等。 此方法用于指定元组的输出模式 通话记录创建者Bolt 通话记录创建器Bolt接收通话记录元组。通话记录元组具有主叫号码,接收者号码和通话时长。通过组合主叫方号码和接收方号码,此Bolt简单地创建一个新值。新值的格式为“来电号码 接收方号码”,并将其命名为新字段“call”。完整的代码如下。 编码 - CallLogCreatorBolt.java //import util packages import java.util.HashMap; import java.util.Map; import backtype.storm.tuple.Fields; import backtype.storm.tuple.Values; import backtype.storm.task.OutputCollector; import backtype.storm.task.TopologyContext; //import Storm IRichBolt package import backtype.storm.topology.IRichBolt; import backtype.storm.topology.OutputFieldsDeclarer; import backtype.storm.tuple.Tuple; //Create a class CallLogCreatorBolt which implement IRichBolt interface public class CallLogCreatorBolt implements IRichBolt { //Create instance for OutputCollector which collects and emits tuples to produce output private OutputCollector collector; @Override public void prepare(Map conf, TopologyContext context, OutputCollector collector) { this.collector = collector; } @Override public void execute(Tuple tuple) { String from = tuple.getString(0); String to = tuple.getString(1); Integer duration = tuple.getInteger(2); collector.emit(new Values(from + " - " + to, duration)); } @Override public void cleanup() {} @Override public void declareOutputFields(OutputFieldsDeclarer declarer) { declarer.declare(new Fields("call", "duration")); } @Override public Map<String, Object> getComponentConfiguration() { return null; } } 通话记录计数器Bolt 呼叫记录计数器Bolt接收呼叫及其持续时间作为元组。这个Bolt在prepare方法中初始化一个字典(Map)对象。在 execute 方法中,它检查元组并在元组中为每个新的“调用”值在字典对象中创建一个新条目,并在字典对象中设置值1。对于字典中已有的条目,它只是递增其值。简单地说,这个Bolt将调用和它的计数保存在字典对象中。我们可以将它保存到数据源中,而不是将调用和它的计数保存在字典中。完整的程序代码如下所示 编码 - CallLogCounterBolt.java import java.util.HashMap; import java.util.Map; import backtype.storm.tuple.Fields; import backtype.storm.tuple.Values; import backtype.storm.task.OutputCollector; import backtype.storm.task.TopologyContext; import backtype.storm.topology.IRichBolt; import backtype.storm.topology.OutputFieldsDeclarer; import backtype.storm.tuple.Tuple; public class CallLogCounterBolt implements IRichBolt { Map<String, Integer> counterMap; private OutputCollector collector; @Override public void prepare(Map conf, TopologyContext context, OutputCollector collector) { this.counterMap = new HashMap<String, Integer>(); this.collector = collector; } @Override public void execute(Tuple tuple) { String call = tuple.getString(0); Integer duration = tuple.getInteger(1); if(!counterMap.containsKey(call)){ counterMap.put(call, 1); }else{ Integer c = counterMap.get(call) + 1; counterMap.put(call, c); } collector.ack(tuple); } @Override public void cleanup() { for(Map.Entry<String, Integer> entry:counterMap.entrySet()){ System.out.println(entry.getKey()+" : " + entry.getValue()); } } @Override public void declareOutputFields(OutputFieldsDeclarer declarer) { declarer.declare(new Fields("call")); } @Override public Map<String, Object> getComponentConfiguration() { return null; } } 创建拓扑 Storm拓扑基本上是一个Thrift结构。TopologyBuilder类提供了简单而简单的方法来创建复杂的拓扑。TopologyBuilder类具有设置喷口 (setSpout) 和设置Bolt (setBolt)的方法 。最后,TopologyBuilder创建拓扑来创建拓扑。使用下面的代码片段来创建一个拓扑 TopologyBuilder builder = new TopologyBuilder(); builder.setSpout("call-log-reader-spout", new FakeCallLogReaderSpout()); builder.setBolt("call-log-creator-bolt", new CallLogCreatorBolt()) .shuffleGrouping("call-log-reader-spout"); builder.setBolt("call-log-counter-bolt", new CallLogCounterBolt()) .fieldsGrouping("call-log-creator-bolt", new Fields("call")); shuffleGrouping 和 fieldsGrouping 方法有助于设置喷嘴和Bolt的流分组。 本地群集 出于开发目的,我们可以使用“LocalCluster”对象创建本地集群,然后使用“LocalCluster”类的“submitTopology”方法提交拓扑。“submitTopology”的一个参数是“Config”类的一个实例。在提交拓扑之前,“Config”类用于设置配置选项。该配置选项将在运行时与集群配置合并,并通过prepare方法发送到所有任务(spout和bolt)。将拓扑提交到群集后,我们将等待10秒钟,以便群集计算提交的拓扑,然后使用“LocalCluster”的“关闭”方法关闭群集。完整的程序代码如下所示 编码 - LogAnalyserStorm.java import backtype.storm.tuple.Fields; import backtype.storm.tuple.Values; //import storm configuration packages import backtype.storm.Config; import backtype.storm.LocalCluster; import backtype.storm.topology.TopologyBuilder; //Create main class LogAnalyserStorm submit topology. public class LogAnalyserStorm { public static void main(String[] args) throws Exception{ //Create Config instance for cluster configuration Config config = new Config(); config.setDebug(true); // TopologyBuilder builder = new TopologyBuilder(); builder.setSpout("call-log-reader-spout", new FakeCallLogReaderSpout()); builder.setBolt("call-log-creator-bolt", new CallLogCreatorBolt()) .shuffleGrouping("call-log-reader-spout"); builder.setBolt("call-log-counter-bolt", new CallLogCounterBolt()) .fieldsGrouping("call-log-creator-bolt", new Fields("call")); LocalCluster cluster = new LocalCluster(); cluster.submitTopology("LogAnalyserStorm", config, builder.createTopology()); Thread.sleep(10000); //Stop the topology cluster.shutdown(); } } 构建和运行应用程序 完整的应用程序有四个Java代码。他们是 - FakeCallLogReaderSpout.java CallLogCreaterBolt.java CallLogCounterBolt.java LogAnalyerStorm.java 应用程序可以使用以下命令构建 javac -cp “/path/to/storm/apache-storm-0.9.5/lib/*” *.java 应用程序可以使用以下命令运行 - java -cp “/path/to/storm/apache-storm-0.9.5/lib/*”:. LogAnalyserStorm 输出 一旦应用程序启动,它将输出有关集群启动过程,喷出和Bolt处理的完整详细信息,最后还会输出集群关闭过程。在“CallLogCounterBolt”中,我们打印了通话及其计数详细信息。这些信息将如下显示在控制台上 1234123402 - 1234123401 : 78 1234123402 - 1234123404 : 88 1234123402 - 1234123403 : 105 1234123401 - 1234123404 : 74 1234123401 - 1234123403 : 81 1234123401 - 1234123402 : 81 1234123403 - 1234123404 : 86 1234123404 - 1234123401 : 63 1234123404 - 1234123402 : 82 1234123403 - 1234123402 : 83 1234123404 - 1234123403 : 86 1234123403 - 1234123401 : 93 非JVM语言 Storm风格的拓扑结构通过Thrift接口实现,这使得用任何语言提交拓扑变得非常容易。Storm支持Ruby,Python和许多其他语言。我们来看看python绑定。 Python绑定 Python是一种通用的解释型,交互式,面向对象和高级编程语言。Storm支持Python来实现其拓扑。Python支持发射,锚定,确认和记录操作。 如你所知,Bolt可以用任何语言来定义。以另一种语言编写的Bolt作为子流程执行,Storm通过标准输入/标准输出与JSON消息通信。首先拿一个支持python绑定的示例BoltWordCount。 public static class WordCount implements IRichBolt { public WordSplit() { super("python", "splitword.py"); } public void declareOutputFields(OutputFieldsDeclarer declarer) { declarer.declare(new Fields("word")); } } 这里的 WordCount 类实现了 IRichBolt 接口,并使用python实现指定的超级方法参数“splitword.py”运行。现在创建一个名为“splitword.py”的python实现。 import storm class WordCountBolt(storm.BasicBolt): def process(self, tup): words = tup.values[0].split(" ") for word in words: storm.emit([word]) WordCountBolt().run() 这是Python的示例实现,用于计算给定句子中的单词。同样,您也可以使用其他支持语言进行绑定。 Storm 安装 Storm Trident