System environment requirements
- JDK installed, with environment variables configured
- SSH, to make startup operations convenient
- Python installed; Python is an underlying dependency of Storm
ZooKeeper installation
Download ZooKeeper from the official website and extract it; for installation, refer to a ZooKeeper installation and configuration guide or the official documentation.
Installing and starting Storm
Local mode
Local mode simulates all the functionality of a Storm cluster with threads inside a single process, and is used for local development and debugging. Running a topology in local mode is similar to running it on a cluster, except that the topology is submitted on the local machine.
Simply use the LocalCluster class to create an in-process cluster, for example:
```java
LocalCluster cluster = new LocalCluster();
```
In local mode, pay attention to the following parameters (a configuration sketch follows the reference link below):
- Config.TOPOLOGY_MAX_TASK_PARALLELISM: the maximum number of threads for a single component
- Config.TOPOLOGY_DEBUG: when set to true, Storm logs every tuple emitted by spouts and bolts, which is useful for debugging
* Reference: Local mode, http://storm.apache.org/releases/1.1.0/Local-mode.html
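A minimal sketch (not from the original text) of how these two options might be set when running in local mode; the class name, topology name, parallelism value, and sleep time are illustrative, and the builder is assumed to be populated with your own spouts and bolts:

```java
import org.apache.storm.Config;
import org.apache.storm.LocalCluster;
import org.apache.storm.topology.TopologyBuilder;

public class LocalModeSketch {
    public static void main(String[] args) throws Exception {
        TopologyBuilder builder = new TopologyBuilder();
        // ... set your spouts and bolts on the builder here ...

        Config conf = new Config();
        conf.setDebug(true);            // Config.TOPOLOGY_DEBUG: log every emitted tuple
        conf.setMaxTaskParallelism(2);  // Config.TOPOLOGY_MAX_TASK_PARALLELISM: cap threads per component

        // Submit to an in-process cluster, let it run briefly, then shut down
        LocalCluster cluster = new LocalCluster();
        cluster.submitTopology("local-test", conf, builder.createTopology());
        Thread.sleep(10000);
        cluster.killTopology("local-test");
        cluster.shutdown();
    }
}
```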
Installing and deploying a cluster
Download Storm from the official Storm website and extract it.
Edit the conf/storm.yaml configuration file; settings in storm.yaml override those in defaults.yaml.
The most basic configuration is as follows:
```yaml
storm.zookeeper.servers:
  - "zk1"
  - "zk2"
  - "zk3"
storm.local.dir: "/var/storm"
nimbus.host: "192.168.0.100"
```
Starting the Storm cluster
```sh
# Start Nimbus
bin/storm nimbus >/dev/null 2>&1 &
# Start the Supervisor
bin/storm supervisor >/dev/null 2>&1 &
# Start the UI (served on port 8080 by default)
bin/storm ui >/dev/null 2>&1 &
```
Storm example
Word count topology
WordCountTopology.java
```java
import org.apache.storm.Config;
import org.apache.storm.LocalCluster;
import org.apache.storm.topology.TopologyBuilder;
import org.apache.storm.tuple.Fields;

public class WordCountTopology {
    public static void main(String[] args) throws InterruptedException {
        // Wire the topology: spout -> split bolt -> count bolt
        TopologyBuilder builder = new TopologyBuilder();
        builder.setSpout("word-spout", new WordSpout(), 1);
        builder.setBolt("word-split", new WordSplitSentenceBolt(), 2).shuffleGrouping("word-spout");
        builder.setBolt("word-count", new WordCountBolt(), 2).fieldsGrouping("word-split", new Fields("word"));

        Config conf = new Config();
        conf.setDebug(true);
        conf.put("wordsFile", "D:\\tmp\\word.txt");
        conf.put(Config.TOPOLOGY_MAX_SPOUT_PENDING, 1);

        // Run in an in-process local cluster for ten seconds, then shut down
        LocalCluster cluster = new LocalCluster();
        try {
            cluster.submitTopology("Getting-Started-Toplogie", conf, builder.createTopology());
        } catch (Exception e) {
            e.printStackTrace();
            System.exit(0);
        }
        Thread.sleep(10000);
        cluster.shutdown();
    }
}
```
Spout input
WordSpout.java
```java
import java.io.BufferedReader;
import java.io.FileNotFoundException;
import java.io.FileReader;
import java.util.Map;

import org.apache.storm.spout.SpoutOutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseRichSpout;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Values;

public class WordSpout extends BaseRichSpout {
    private SpoutOutputCollector collector;
    private FileReader fileReader;
    private boolean completed = false;

    public void ack(Object msgId) {
        System.out.println("OK:" + msgId);
    }

    public void close() {}

    public void fail(Object msgId) {
        System.out.println("FAIL:" + msgId);
    }

    @Override
    public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
        // The path of the words file is passed in through the topology configuration
        try {
            this.fileReader = new FileReader(conf.get("wordsFile").toString());
        } catch (FileNotFoundException e) {
            e.printStackTrace();
        }
        this.collector = collector;
    }

    @Override
    public void nextTuple() {
        // Emit every line of the file once, then just sleep on subsequent calls
        if (completed) {
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
            }
            return;
        }
        String str;
        BufferedReader reader = new BufferedReader(fileReader);
        try {
            while ((str = reader.readLine()) != null) {
                this.collector.emit(new Values(str));
            }
        } catch (Exception e) {
            throw new RuntimeException("Error reading tuple", e);
        } finally {
            completed = true;
        }
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("sentence"));
    }
}
```
Splitting sentences
WordSplitSentenceBolt.java
```java
import org.apache.storm.topology.BasicOutputCollector;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseBasicBolt;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.tuple.Values;

public class WordSplitSentenceBolt extends BaseBasicBolt {

    @Override
    public void execute(Tuple input, BasicOutputCollector collector) {
        // Split each sentence on spaces and emit one tuple per non-empty word
        String sentence = input.getString(0);
        for (String word : sentence.split(" ")) {
            word = word.trim();
            if (!word.isEmpty()) {
                collector.emit(new Values(word));
            }
        }
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("word"));
    }
}
```
Counting words
WordCountBolt.java
```java
import java.util.HashMap;
import java.util.Map;

import org.apache.storm.topology.BasicOutputCollector;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseBasicBolt;
import org.apache.storm.tuple.Tuple;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

class WordCountBolt extends BaseBasicBolt {
    private final static Logger logger = LoggerFactory.getLogger(WordCountBolt.class);
    private Map<String, Integer> counts = new HashMap<>();

    @Override
    public void cleanup() {
        // cleanup() is called when the local cluster shuts down; it is not
        // guaranteed to run on a real cluster, so this output is for local testing
        counts.forEach((key, value) -> {
            System.out.println("```````````````````````````````````````");
            logger.error("{}: {}", key, value);
        });
    }

    @Override
    public void execute(Tuple input, BasicOutputCollector collector) {
        String word = input.getString(0);
        Integer count = counts.get(word);
        if (count == null) {
            count = 0;
        }
        count++;
        counts.put(word, count);
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        // Terminal bolt: does not emit any fields
    }
}
```
For more examples, see the official Storm examples.
Submitting a topology to the cluster
Package the project as a jar and run the following from the Storm installation directory:
```sh
bin/storm jar demo.jar com.whh.WordCountTopology
```
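The WordCountTopology class above submits to an in-process LocalCluster. To actually run on the cluster, the main method would typically call StormSubmitter instead; the sketch below is not from the original example, and the class name, file path, and worker count are illustrative:

```java
import org.apache.storm.Config;
import org.apache.storm.StormSubmitter;
import org.apache.storm.topology.TopologyBuilder;
import org.apache.storm.tuple.Fields;

public class WordCountClusterTopology {
    public static void main(String[] args) throws Exception {
        // Same wiring as in WordCountTopology above
        TopologyBuilder builder = new TopologyBuilder();
        builder.setSpout("word-spout", new WordSpout(), 1);
        builder.setBolt("word-split", new WordSplitSentenceBolt(), 2).shuffleGrouping("word-spout");
        builder.setBolt("word-count", new WordCountBolt(), 2).fieldsGrouping("word-split", new Fields("word"));

        Config conf = new Config();
        conf.put("wordsFile", "/var/data/word.txt"); // illustrative path; must exist on the worker nodes
        conf.setNumWorkers(2);                       // illustrative worker count

        // args[0] is the topology name, which is also what `storm kill` expects later
        StormSubmitter.submitTopology(args[0], conf, builder.createTopology());
    }
}
```

Such a class would be submitted with the same `storm jar` command, passing the topology name as an extra argument.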
To stop a topology, run the following from the Storm directory:

```sh
bin/storm kill {topology-name}
```

Here {topology-name} is the topology name specified when the topology was submitted to the Storm cluster.