摘要：参考林子雨教案安装 Storm，安装成功后显示……；先来回忆一下 Storm 的基本组成；首先更新配置文件 pom.xml；然后是 WordCount.java 文件；最后给出运行结果与运行注意事项。
Storm的下载与安装
 
基于Storm的wordcount应用
实现原理
 
 wordcount的topo结构:
 
代码
<?xml version="1.0" encoding="UTF-8"?> <project xmlns="https://maven.apache.org/POM/4.0.0" xmlns:xsi="https://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="https://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd"> <modelVersion>4.0.0</modelVersion> <groupId>com.zchi</groupId> <artifactId>storm_study</artifactId> <version>1.0-SNAPSHOT</version> <dependencies> <dependency> <groupId>org.apache.storm</groupId> <artifactId>storm-core</artifactId> <version>0.9.7</version> </dependency> </dependencies> </project> import java.util.*; import backtype.storm.spout.SpoutOutputCollector; import backtype.storm.Config; import backtype.storm.LocalCluster; import backtype.storm.task.OutputCollector; import backtype.storm.task.TopologyContext; import backtype.storm.topology.BasicOutputCollector; import backtype.storm.topology.OutputFieldsDeclarer; import backtype.storm.topology.TopologyBuilder; import backtype.storm.topology.base.BaseBasicBolt; import backtype.storm.topology.base.BaseRichBolt; import backtype.storm.topology.base.BaseRichSpout; import backtype.storm.tuple.Fields; import backtype.storm.tuple.Tuple; import backtype.storm.tuple.Values; import backtype.storm.utils.Utils; import java.util.*; /*  ** WordCountTopolopgyAllInJava类(单词计数)  ** @author zhangchi  */ public class wordcount { // 定义一个喷头,用于产生数据。该类继承自BaseRichSpout public static class RandomSentenceSpout extends BaseRichSpout { private SpoutOutputCollector _collector; private Random _rand; private Map<String,Values> pending; @Override public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {             _collector = collector;             _rand = new Random();             pending=new HashMap<String, Values>(); } @Override public void nextTuple() { // 睡眠一段时间后再产生一个数据             Utils.sleep(100); // 句子数组             String[] sentences = new String[]{"the cow jumped over the moon", "an apple a day keeps the doctor away", "four score and seven years ago", "snow 
white and the seven dwarfs", "i am at two with nature"}; // 随机选择一个句子             String sentence = sentences[_rand.nextInt(sentences.length)];             Values tmpValues=new Values(sentence);             String msgID= UUID.randomUUID().toString();             pending.put(msgID,tmpValues); // 发射该句子给Bolt,每个tuple都有一个唯一标识             _collector.emit(tmpValues,msgID); } // 确认函数:成功处理的tuple,其id会从pending列表中删除 @Override public void ack(Object id) {             System.out.println("Msg:"+id+" send successful!");             pending.remove(id); } // 失败处理函数:处理失败的时候重新发送一次tuple, @Override public void fail(Object id) {             System.out.println("Msg:"+id+" send failed,will try again!");             Values failedMsg=pending.get(id);             _collector.emit(failedMsg,id); } @Override public void declareOutputFields(OutputFieldsDeclarer declarer) { // 定义一个字段word             declarer.declare(new Fields("word")); } } // 定义个Bolt,用于将句子切分为单词 public static class SplitSentence extends BaseRichBolt { private OutputCollector collector; @Override public void declareOutputFields(OutputFieldsDeclarer declarer) { // 定义一个字段             declarer.declare(new Fields("word")); } @Override public void prepare(Map map, TopologyContext topologyContext, OutputCollector outputCollector) {             collector=outputCollector; } @Override public void execute(Tuple tuple) { // 接收到一个句子             String sentence = tuple.getString(0); // 把句子切割为单词             StringTokenizer iter = new StringTokenizer(sentence); // 发送每一个单词 while (iter.hasMoreElements()) {                 collector.emit(new Values(iter.nextToken())); } // 确认对数据进行处理             collector.ack(tuple); } } // 定义一个Bolt,用于单词计数 public static class WordCount extends BaseBasicBolt {         Map<String, Long> counts = new HashMap<String, Long>(); @Override public void execute(Tuple tuple, BasicOutputCollector collector) { // 接收一个单词             String word = tuple.getString(0); // 获取该单词对应的计数             Long count = counts.get(word); if (count 
== null)                 count = 0l; // 计数增加             count++; // 将单词和对应的计数加入map中             counts.put(word, count);             collector.emit(new Values(word, count)); } @Override public void declareOutputFields(OutputFieldsDeclarer declarer) { // 定义两个字段word和count             declarer.declare(new Fields("word", "count")); } } //定义全局Bolt,用于统计最终结果以及所有的单词数统计 public static class GlobalWordCount extends BaseBasicBolt{         Map<String,Long> result=new HashMap<String, Long>(); @Override public void execute(Tuple tuple, BasicOutputCollector basicOutputCollector) {             String word=tuple.getStringByField("word");             Long count=tuple.getLongByField("count");             result.put(word,count); } @Override public void cleanup(){             System.out.println("---------------------------------Final Result----------------------------------------------"); long totalCount=0; for (String key:result.keySet()){ long count=result.get(key);                 System.out.println("---------------------------------Word:"+key+"  Count:"+count);                 totalCount+=count; }             System.out.println("---------------------------------TotalCount:"+totalCount); } @Override public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) { } } public static void main(String[] args) throws Exception { // 创建一个拓扑         TopologyBuilder builder = new TopologyBuilder(); // 设置Spout,这个Spout的名字叫做"Spout",设置并行度为5         builder.setSpout("spout", new RandomSentenceSpout(), 2); // 设置slot——“split”,并行度为8,它的数据来源是spout的         builder.setBolt("split", new SplitSentence(), 8).shuffleGrouping("spout"); // 设置slot——“count”,你并行度为12,它的数据来源是split的word字段         builder.setBolt("count", new WordCount(), 12).fieldsGrouping("split", new Fields("word")); //设置slot--------"globalcount" ,数据来源是spout         builder.setBolt("globalcount",new GlobalWordCount()).globalGrouping("count");          Config conf = new Config();         conf.setDebug(false); //if(args != null && 
args.length > 0){ //if(false){ //  conf.setNumWorkers(3); //  StormSubmitter.submitTopology(args[0], conf, builder.createTopology()); //}else{         conf.setMaxTaskParallelism(3); // 本地伪集群模式运行         LocalCluster cluster = new LocalCluster(); // 集群运行 //        StormSubmitter.submitTopology("word-count", conf, builder.createTopology() ); //本地伪集群 提交拓扑(该拓扑的名字叫word-count)         cluster.submitTopology("word-count", conf, builder.createTopology());         Thread.sleep(30000);         cluster.killTopology("word-count");         cluster.shutdown(); //} } } 
 
---------------------------------Final Result----------------------------------------------
---------------------------------Word:away  Count:111
---------------------------------Word:ago  Count:102
---------------------------------Word:jumped  Count:114
---------------------------------Word:seven  Count:199
---------------------------------Word:cow  Count:114
---------------------------------Word:two  Count:106
---------------------------------Word:years  Count:102
---------------------------------Word:dwarfs  Count:97
---------------------------------Word:score  Count:102
---------------------------------Word:apple  Count:111
---------------------------------Word:white  Count:97
---------------------------------Word:four  Count:102
---------------------------------Word:and  Count:199
---------------------------------Word:keeps  Count:111
---------------------------------Word:day  Count:111
---------------------------------Word:over  Count:114
---------------------------------Word:a  Count:111
---------------------------------Word:nature  Count:106
---------------------------------Word:i  Count:106
---------------------------------Word:am  Count:106
---------------------------------Word:an  Count:111
---------------------------------Word:the  Count:436
---------------------------------Word:doctor  Count:111
---------------------------------Word:with  Count:106
---------------------------------Word:moon  Count:114
---------------------------------Word:at  Count:106
---------------------------------Word:snow  Count:97
---------------------------------TotalCount:3402
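cd /usr/local/storm
./bin/storm nimbus
/usr/local/storm/bin/storm supervisor
（注：nimbus 和 supervisor 均在前台运行，需分别在不同终端中启动；它们只在通过 StormSubmitter 向真实集群提交拓扑时才需要。本文示例使用 LocalCluster 在进程内运行，无需启动它们。）
将storm写入HDFS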
本网页所有视频内容由 imoviebox边看边下-网页视频下载, iurlBox网页地址收藏管理器 下载并得到。
ImovieBox网页视频下载器 下载地址: ImovieBox网页视频下载器-最新版本下载
本文章由: imapbox邮箱云存储,邮箱网盘,ImageBox 图片批量下载器,网页图片批量下载专家,网页图片批量下载器,获取到文章图片,imoviebox网页视频批量下载器,下载视频内容,为您提供.
阅读和此文章类似的: 全球云计算
 官方软件产品操作指南 (170)
官方软件产品操作指南 (170)