System environment requirements
- JDK installed, with environment variables configured
- SSH, to make starting the daemons convenient
- Python installed; Storm's control scripts depend on it at the lowest level
Installing Zookeeper
Download Zookeeper from the official site and unpack it. For installation, see a Zookeeper installation and configuration guide, or the official documentation.
Installing and starting Storm
Local mode
Local mode simulates all the functionality of a Storm cluster with threads inside a single process, and is meant for local development and debugging. Running a Topology in local mode is similar to running it on a cluster, except that the Topology is submitted on the local machine.
Simply instantiate the LocalCluster class to create an in-process cluster. For example:
```java
LocalCluster cluster = new LocalCluster();
```
In local mode, pay attention to the following parameters:
- Config.TOPOLOGY_MAX_TASK_PARALLELISM: the maximum number of threads a single component may use
- Config.TOPOLOGY_DEBUG: when enabled, Storm logs every tuple emitted by every spout and bolt
* Reference: Local mode, http://storm.apache.org/releases/1.1.0/Local-mode.html
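As a sketch only (assuming the Storm 1.x `org.apache.storm` API; not runnable without the storm-core dependency on the classpath), both parameters have convenience setters on Config:

```java
import org.apache.storm.Config;

public class LocalModeConf {
    public static Config build() {
        Config conf = new Config();
        // Cap any single component at 2 threads while debugging locally:
        conf.setMaxTaskParallelism(2); // sets Config.TOPOLOGY_MAX_TASK_PARALLELISM
        // Log every emitted tuple (verbose; local debugging only):
        conf.setDebug(true);           // sets Config.TOPOLOGY_DEBUG
        return conf;
    }
}
```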
Installing and deploying a cluster
Download Storm from the official site and unpack it.
Edit the conf/storm.yaml configuration file; settings in storm.yaml override those in defaults.yaml.
The minimal configuration is as follows:
```yaml
storm.zookeeper.servers:
  - "zk1"
  - "zk2"
  - "zk3"
storm.local.dir: "/var/storm"
nimbus.host: "192.168.0.100"
```
Starting the Storm cluster
```shell
# start Nimbus
bin/storm nimbus >/dev/null 2>&1 &
# start a Supervisor
bin/storm supervisor >/dev/null 2>&1 &
# start the web UI
bin/storm ui >/dev/null 2>&1 &
```
Storm example
A word-count Topology
WordCountTopology.java
```java
import org.apache.storm.Config;
import org.apache.storm.LocalCluster;
import org.apache.storm.topology.TopologyBuilder;
import org.apache.storm.tuple.Fields;

public class WordCountTopology {
    public static void main(String[] args) throws InterruptedException {
        TopologyBuilder builder = new TopologyBuilder();
        // one spout reading lines, two splitting bolts, two counting bolts
        builder.setSpout("word-spout", new WordSpout(), 1);
        builder.setBolt("word-split", new WordSplitSentenceBolt(), 2).shuffleGrouping("word-spout");
        builder.setBolt("word-count", new WordCountBolt(), 2).fieldsGrouping("word-split", new Fields("word"));

        Config conf = new Config();
        conf.setDebug(true);
        conf.put("wordsFile", "D:\\tmp\\word.txt");
        conf.put(Config.TOPOLOGY_MAX_SPOUT_PENDING, 1);

        LocalCluster cluster = new LocalCluster();
        try {
            cluster.submitTopology("Getting-Started-Toplogie", conf, builder.createTopology());
        } catch (Exception e) {
            e.printStackTrace();
            System.exit(0);
        }
        // let the topology run for 10 seconds, then tear the local cluster down
        Thread.sleep(10000);
        cluster.shutdown();
    }
}
```
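The topology above relies on fieldsGrouping("word-split", new Fields("word")) so that every occurrence of the same word reaches the same counting task. Conceptually (this is an illustration only, not Storm's actual implementation), the routing behaves like a deterministic hash of the grouped field modulo the number of target tasks:

```java
// Illustrative helper: why a fields grouping keeps the same word on the same task.
public class FieldsGroupingSketch {
    static int taskFor(String word, int numTasks) {
        // Same word -> same hash -> same task index every time.
        return Math.abs(word.hashCode() % numTasks);
    }

    public static void main(String[] args) {
        int tasks = 2; // matches the two word-count executors above
        // The same word is always routed to the same task index:
        System.out.println(taskFor("storm", tasks) == taskFor("storm", tasks));
    }
}
```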
Spout input
WordSpout.java
```java
import java.io.BufferedReader;
import java.io.FileNotFoundException;
import java.io.FileReader;
import java.util.Map;

import org.apache.storm.spout.SpoutOutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseRichSpout;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Values;

public class WordSpout extends BaseRichSpout {
    private SpoutOutputCollector collector;
    private FileReader fileReader;
    private boolean completed = false;

    @Override
    public void ack(Object msgId) {
        System.out.println("OK:" + msgId);
    }

    @Override
    public void close() {}

    @Override
    public void fail(Object msgId) {
        System.out.println("FAIL:" + msgId);
    }

    @Override
    public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
        try {
            this.fileReader = new FileReader(conf.get("wordsFile").toString());
        } catch (FileNotFoundException e) {
            e.printStackTrace();
        }
        this.collector = collector;
    }

    @Override
    public void nextTuple() {
        // The file is read only once; once done, just idle.
        if (completed) {
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
            }
            return;
        }
        String str;
        BufferedReader reader = new BufferedReader(fileReader);
        try {
            while ((str = reader.readLine()) != null) {
                this.collector.emit(new Values(str));
            }
        } catch (Exception e) {
            throw new RuntimeException("Error reading tuple", e);
        } finally {
            completed = true;
        }
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("sentence"));
    }
}
```
Splitting sentences
WordSplitSentenceBolt.java
```java
import org.apache.storm.topology.BasicOutputCollector;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseBasicBolt;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.tuple.Values;

public class WordSplitSentenceBolt extends BaseBasicBolt {
    @Override
    public void execute(Tuple input, BasicOutputCollector collector) {
        String sentence = input.getString(0);
        for (String word : sentence.split(" ")) {
            word = word.trim();
            if (!word.isEmpty()) {
                collector.emit(new Values(word));
            }
        }
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("word"));
    }
}
```
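The split logic can be exercised in plain Java. Note that splitting on a single space can produce empty tokens (e.g. for consecutive spaces), which is why the bolt trims and filters before emitting (SplitCheck is a hypothetical helper for illustration):

```java
import java.util.ArrayList;
import java.util.List;

public class SplitCheck {
    // Same tokenization as WordSplitSentenceBolt#execute, minus the emit.
    static List<String> splitWords(String sentence) {
        List<String> words = new ArrayList<>();
        for (String word : sentence.split(" ")) {
            word = word.trim();
            if (!word.isEmpty()) {
                words.add(word);
            }
        }
        return words;
    }

    public static void main(String[] args) {
        // The double space yields an empty token, which gets filtered out.
        System.out.println(splitWords("hello  storm world")); // [hello, storm, world]
    }
}
```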
Counting words
WordCountBolt.java
```java
import java.util.HashMap;
import java.util.Map;

import org.apache.storm.topology.BasicOutputCollector;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseBasicBolt;
import org.apache.storm.tuple.Tuple;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class WordCountBolt extends BaseBasicBolt {
    private final static Logger logger = LoggerFactory.getLogger(WordCountBolt.class);
    private Map<String, Integer> counts = new HashMap<>();

    @Override
    public void cleanup() {
        // Dump the final counts when the local cluster shuts down.
        counts.forEach((key, value) -> {
            System.out.println("=======================================");
            logger.error("{}: {}", key, value);
        });
    }

    @Override
    public void execute(Tuple input, BasicOutputCollector collector) {
        String word = input.getString(0);
        Integer count = counts.get(word);
        if (count == null) {
            count = 0;
        }
        count++;
        counts.put(word, count);
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
    }
}
```
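The null-check-and-increment in execute can also be exercised in plain Java; Map.merge is an equivalent, more idiomatic form of the same counting step (CountCheck is an illustrative helper, not part of the original code):

```java
import java.util.HashMap;
import java.util.Map;

public class CountCheck {
    static Map<String, Integer> count(String[] words) {
        Map<String, Integer> counts = new HashMap<>();
        for (String word : words) {
            // Same effect as: if (count == null) count = 0; count++; put(word, count);
            counts.merge(word, 1, Integer::sum);
        }
        return counts;
    }

    public static void main(String[] args) {
        System.out.println(count(new String[]{"a", "b", "a"})); // {a=2, b=1}
    }
}
```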
For other examples, see the official Storm examples.
Submitting a Topology to the cluster
Package the project as a jar, then run from the Storm installation directory:
```shell
bin/storm jar demo.jar com.whh.WordCountTopology
```
To stop a Topology, run from the Storm directory:
```shell
bin/storm kill {topology-name}
```
where {topology-name} is the name the Topology was given when it was submitted to the Storm cluster.