Storm 起步

2018-08-27 09:53 更新

準(zhǔn)備開始

準(zhǔn)備開始

在本章,我們要?jiǎng)?chuàng)建一個(gè) Storm 工程和我們的第一個(gè) Storm 拓?fù)浣Y(jié)構(gòu)。

NOTE: 下面假設(shè)你的 JRE 版本在 1.6 以上。我們推薦 Oracle 提供的 JRE。你可以到 http://www.java.com/downloads/ 下載。

操作模式

開始之前,有必要了解一下 Storm 的操作模式。有下面兩種方式。

本地模式

在本地模式下,Storm 拓?fù)浣Y(jié)構(gòu)運(yùn)行在本地計(jì)算機(jī)的單一 JVM 進(jìn)程上。這個(gè)模式用于開發(fā)、測試以及調(diào)試,因?yàn)檫@是觀察所有組件如何協(xié)同工作的最簡單方法。在這種模式下,我們可以調(diào)整參數(shù),觀察我們的拓?fù)浣Y(jié)構(gòu)如何在不同的 Storm 配置環(huán)境下運(yùn)行。要在本地模式下運(yùn)行,我們要下載 Storm 開發(fā)依賴,以便用來開發(fā)并測試我們的拓?fù)浣Y(jié)構(gòu)。我們創(chuàng)建了第一個(gè) Storm 工程以后,很快就會(huì)明白如何使用本地模式了。

NOTE: 在本地模式下,跟在集群環(huán)境運(yùn)行很像。不過很有必要確認(rèn)一下所有組件都是線程安全的,因?yàn)楫?dāng)把它們部署到遠(yuǎn)程模式時(shí)它們可能會(huì)運(yùn)行在不同的 JVM 進(jìn)程甚至不同的物理機(jī)上,這個(gè)時(shí)候它們之間沒有直接的通訊或共享內(nèi)存。

我們要在本地模式運(yùn)行本章的所有例子。

遠(yuǎn)程模式

在遠(yuǎn)程模式下,我們向 Storm 集群提交拓?fù)?,它通常由許多運(yùn)行在不同機(jī)器上的流程組成。遠(yuǎn)程模式不會(huì)出現(xiàn)調(diào)試信息, 因此它也稱作生產(chǎn)模式。不過在單一開發(fā)機(jī)上建立一個(gè) Storm 集群是一個(gè)好主意,可以在部署到生產(chǎn)環(huán)境之前,用來確認(rèn)拓?fù)湓诩涵h(huán)境下沒有任何問題。

你將在第六章學(xué)到更多關(guān)于遠(yuǎn)程模式的內(nèi)容,并在附錄B學(xué)到如何安裝一個(gè) Storm 集群。

Hello World

我們在這個(gè)工程里創(chuàng)建一個(gè)簡單的拓?fù)?,?shù)單詞數(shù)量。我們可以把這個(gè)看作 Storm 的 “Hello World”。不過,這是一個(gè)非常強(qiáng)大的拓?fù)?,因?yàn)樗軌驍U(kuò)展到幾乎無限大的規(guī)模,而且只需要做一些小修改,就能用它構(gòu)建一個(gè)統(tǒng)計(jì)系統(tǒng)。舉個(gè)例子,我們可以修改一下工程用來找出 Twitter 上的熱點(diǎn)話題。

要?jiǎng)?chuàng)建這個(gè)拓?fù)?,我們要用一個(gè) spout 讀取文本,第一個(gè) bolt 用來標(biāo)準(zhǔn)化單詞,第二個(gè) bolt 為單詞計(jì)數(shù),如圖2-1所示。

你可以從這個(gè)網(wǎng)址下載源碼壓縮包, https://github.com/storm-book/examples-ch02-getting_started/zipball/master。

NOTE: 如果你使用 git(一個(gè)分布式版本控制與源碼管理工具),你可以執(zhí)行 git clone git@github.com:storm-book/examples-ch02-getting_started.git,把源碼檢出到你指定的目錄。

Java 安裝檢查

構(gòu)建 Storm 運(yùn)行環(huán)境的第一步是檢查你安裝的 Java 版本。打開一個(gè)控制臺(tái)窗口并執(zhí)行命令:java -version??刂婆_(tái)應(yīng)該會(huì)顯示出類似如下的內(nèi)容:

    java -version

    java version "1.6.0_26"
    Java(TM) SE Runtime Enviroment (build 1.6.0_26-b03)

    Java HotSpot(TM) Server VM (build 20.1-b02, mixed mode)  

如果不是上述內(nèi)容,檢查你的 Java 安裝情況。(參考 http://www.java.com/download/

創(chuàng)建工程

開始之前,先為這個(gè)應(yīng)用建一個(gè)目錄(就像你平常為 Java 應(yīng)用做的那樣)。這個(gè)目錄用來存放工程源碼。

接下來我們要下載 Storm 依賴包,這是一些 jar 包,我們要把它們添加到應(yīng)用類路徑中。你可以采用如下兩種方式之一完成這一步:

  • 下載所有依賴,解壓縮它們,把它 們添加到類路徑
  • 使用 Apache Maven

NOTE: Maven 是一個(gè)軟件項(xiàng)目管理的綜合工具。它可以用來管理項(xiàng)目的開發(fā)周期的許多方面,從包依賴到版本發(fā)布過程。在這本書中,我們將廣泛使用它。如果要檢查是否已經(jīng)安裝了maven,在命令行運(yùn)行 mvn。如果沒有安裝你可以從 http://maven.apache.org/download.html下載。

沒有必要先成為一個(gè) Maven 專家才能使用 Storm,不過了解一下關(guān)于 Maven 工作方式的基礎(chǔ)知識(shí)仍然會(huì)對你有所幫助。你可以在 Apache Maven 的網(wǎng)站上找到更多的信息(http://maven.apache.org/)。

NOTE: Storm 的 Maven 依賴引用了運(yùn)行 Storm 本地模式的所有庫。

要運(yùn)行我們的拓?fù)?,我們可以編寫一個(gè)包含基本組件的 pom.xml 文件。

    <project xmlns="http://maven.apache.org/POM/4.0.0"
             xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
             xsi:schemaLocation="http://maven.apache.org/POM/4.0.0
             http://maven.apache.org/xsd/maven-4.0.0.xsd">
             <modelVersion>4.0.0</modelVersion>
             <groupId>storm.book</groupId>
             <artifactId>Getting-Started</artifactId>
             <version>0.0.1-SNAPSHOT</version>
             <build>
                 <plugins>
                     <plugin>
                         <groupId>org.apache.maven.plugins</groupId>
                         <artifactId>maven-compiler-plugin</artifactId>
                         <version>2.3.2</version>
                         <configuration>
                             <source>1.6</source>
                             <target>1.6</target>
                             <compilerVersion>1.6</compilerVersion>
                         </configuration>
                     </plugin>
                 </plugins>
             </build>
             <repositories>
                 
                 <repository>
                     <id>clojars.org</id>
                     <url>http://clojars.org/repo</url>
                 </repository>
             </repositories>
             <dependencies>
                 
                 <dependency>
                     <groupId>storm</groupId>
                     <artifactId>storm</artifactId>
                     <version>0.6.0</version>
                 </dependency>
             </dependencies>
    </project>  

開頭幾行指定了工程名稱和版本號(hào)。然后我們添加了一個(gè)編譯器插件,告知 Maven 我們的代碼要用 Java1.6 編譯。接下來我們定義了 Maven 倉庫(Maven 支持為同一個(gè)工程指定多個(gè)倉庫)。clojars 是存放 Storm 依賴的倉庫。Maven 會(huì)為運(yùn)行本地模式自動(dòng)下載必要的所有子包依賴。

一個(gè)典型的 Maven Java 工程會(huì)擁有如下結(jié)構(gòu):

我們的應(yīng)用目錄/
         ├── pom.xml
         └── src
               └── main
                  └── java
               |  ├── spouts
               |  └── bolts
               └── resources  

java 目錄下的子目錄包含我們的代碼,我們把要統(tǒng)計(jì)單詞數(shù)的文件保存在 resource 目錄下。

NOTE:命令 mkdir -p 會(huì)創(chuàng)建所有需要的父目錄。

創(chuàng)建我們的第一個(gè) Topology

我們將為運(yùn)行單詞計(jì)數(shù)創(chuàng)建所有必要的類??赡苓@個(gè)例子中的某些部分,現(xiàn)在無法講的很清楚,不過我們會(huì)在隨后的章節(jié)做進(jìn)一步的講解。

Spout

pout WordReader 類實(shí)現(xiàn)了 IRichSpout 接口。我們將在第四章看到更多細(xì)節(jié)。WordReader負(fù)責(zé)從文件按行讀取文本,并把文本行提供給第一個(gè) bolt。

NOTE: 一個(gè) spout 發(fā)布一個(gè)定義域列表。這個(gè)架構(gòu)允許你使用不同的 bolts 從同一個(gè)spout 流讀取數(shù)據(jù),它們的輸出也可作為其它 bolts 的定義域,以此類推。

例2-1包含 WordRead 類的完整代碼(我們將會(huì)分析下述代碼的每一部分)。


       /
           例2-1.src/main/java/spouts/WordReader.java
         /
        package spouts;

        import java.io.BufferedReader;
        import java.io.FileNotFoundException;
        import java.io.FileReader;
        import java.util.Map;
        import backtype.storm.spout.SpoutOutputCollector;
        import backtype.storm.task.TopologyContext;
        import backtype.storm.topology.IRichSpout;
        import backtype.storm.topology.OutputFieldsDeclarer;
        import backtype.storm.tuple.Fields;
        import backtype.storm.tuple.Values;

        public class WordReader implements IRichSpout {
            private SpoutOutputCollector collector;
            private FileReader fileReader;
            private boolean completed = false;
            private TopologyContext context;
            public boolean isDistributed() {return false;}
            public void ack(Object msgId) {
                    System.out.println("OK:"+msgId);
            }
            public void close() {}
            public void fail(Object msgId) {
                 System.out.println("FAIL:"+msgId);
            }
            /
              這個(gè)方法做的惟一一件事情就是分發(fā)文件中的文本行
             /
            public void nextTuple() {
            /
              這個(gè)方法會(huì)不斷的被調(diào)用,直到整個(gè)文件都讀完了,我們將等待并返回。
             /
                 if(completed){
                     try {
                         Thread.sleep(1000);
                     } catch (InterruptedException e) {
                         //什么也不做
                     }
                    return;
                 }
                 String str;
                 //創(chuàng)建reader
                 BufferedReader reader = new BufferedReader(fileReader);
                 try{
                     //讀所有文本行
                    while((str = reader.readLine()) != null){
                     /
                       按行發(fā)布一個(gè)新值
                      /
                         this.collector.emit(new Values(str),str);
                     }
                 }catch(Exception e){
                     throw new RuntimeException("Error reading tuple",e);
                 }finally{
                     completed = true;
                 }
             }
             /
               我們將創(chuàng)建一個(gè)文件并維持一個(gè)collector對象
              /
             public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
                     try {
                         this.context = context;
                         this.fileReader = new FileReader(conf.get("wordsFile").toString());
                     } catch (FileNotFoundException e) {
                         throw new RuntimeException("Error reading file ["+conf.get("wordFile")+"]");
                     }
                     this.collector = collector;
             }
             /
               聲明輸入域"word"
              /
             public void declareOutputFields(OutputFieldsDeclarer declarer) {
                 declarer.declare(new Fields("line"));
             }
        }  

第一個(gè)被調(diào)用的 spout 方法都是 public void open(Map conf, TopologyContext context, SpoutOutputCollector collector)。它接收如下參數(shù):配置對象,在定義topology 對象是創(chuàng)建;TopologyContext 對象,包含所有拓?fù)鋽?shù)據(jù);還有SpoutOutputCollector 對象,它能讓我們發(fā)布交給 bolts 處理的數(shù)據(jù)。下面的代碼主是這個(gè)方法的實(shí)現(xiàn)。

    public void open(Map conf, TopologyContext context,
        SpoutOutputCollector collector) {
        try {
            this.context = context;
            this.fileReader = new FileReader(conf.get("wordsFile").toString());
        } catch (FileNotFoundException e) {
            throw new RuntimeException("Error reading file ["+conf.get("wordFile")+"]");
        }
        this.collector = collector;
    }  

我們在這個(gè)方法里創(chuàng)建了一個(gè) FileReader 對象,用來讀取文件。接下來我們要實(shí)現(xiàn) public void nextTuple(),我們要通過它向 bolts 發(fā)布待處理的數(shù)據(jù)。在這個(gè)例子里,這個(gè)方法要讀取文件并逐行發(fā)布數(shù)據(jù)。

    public void nextTuple() {
        if(completed){
            try {
                Thread.sleep(1);
            } catch (InterruptedException e) {
                //什么也不做
            }
            return;
        }
        String str;
        BufferedReader reader = new BufferedReader(fileReader);
        try{
            while((str = reader.readLine()) != null){
                this.collector.emit(new Values(str));
            }
        }catch(Exception e){
            throw new RuntimeException("Error reading tuple",e);
        }finally{
            completed = true;
        }
    }  

NOTE: Values 是一個(gè) ArrarList 實(shí)現(xiàn),它的元素就是傳入構(gòu)造器的參數(shù)。

nextTuple() 會(huì)在同一個(gè)循環(huán)內(nèi)被 ack()fail() 周期性的調(diào)用。沒有任務(wù)時(shí)它必須釋放對線程的控制,其它方法才有機(jī)會(huì)得以執(zhí)行。因此 nextTuple 的第一行就要檢查是否已處理完成。如果完成了,為了降低處理器負(fù)載,會(huì)在返回前休眠一毫秒。如果任務(wù)完成了,文件中的每一行都已被讀出并分發(fā)了。

NOTE:元組(tuple)是一個(gè)具名值列表,它可以是任意 java 對象(只要它是可序列化的)。默認(rèn)情況,Storm 會(huì)序列化字符串、字節(jié)數(shù)組、ArrayList、HashMap 和 HashSet 等類型。

Bolts

現(xiàn)在我們有了一個(gè) spout,用來按行讀取文件并每行發(fā)布一個(gè)元組,還要?jiǎng)?chuàng)建兩個(gè) bolts,用來處理它們(看圖2-1)。bolts 實(shí)現(xiàn)了接口 backtype.storm.topology.IRichBolt。

bolt最重要的方法是void execute(Tuple input),每次接收到元組時(shí)都會(huì)被調(diào)用一次,還會(huì)再發(fā)布若干個(gè)元組。

NOTE: 只要必要,bolt 或 spout 會(huì)發(fā)布若干元組。當(dāng)調(diào)用 nextTupleexecute 方法時(shí),它們可能會(huì)發(fā)布0個(gè)、1個(gè)或許多個(gè)元組。你將在第五章學(xué)習(xí)更多這方面的內(nèi)容。

第一個(gè) bolt,WordNormalizer,負(fù)責(zé)得到并標(biāo)準(zhǔn)化每行文本。它把文本行切分成單詞,大寫轉(zhuǎn)化成小寫,去掉頭尾空白符。

首先我們要聲明 bolt 的出參:

    public void declareOutputFields(OutputFieldsDeclarer declarer){
        declarer.declare(new Fields("word"));
    }  

這里我們聲明 bolt 將發(fā)布一個(gè)名為 “word” 的域。

下一步我們實(shí)現(xiàn) public void execute(Tuple input),處理傳入的元組:

    public void execute(Tuple input){
        String sentence=input.getString(0);
        String[] words=sentence.split(" ");
        for(String word : words){
            word=word.trim();
            if(!word.isEmpty()){
                word=word.toLowerCase();
                //發(fā)布這個(gè)單詞
                collector.emit(new Values(word));
            }
        }
        //對元組做出應(yīng)答
        collector.ack(input);
    }  

第一行從元組讀取值。值可以按位置或名稱讀取。接下來值被處理并用collector對象發(fā)布。最后,每次都調(diào)用collector 對象的 ack() 方法確認(rèn)已成功處理了一個(gè)元組。

例2-2是這個(gè)類的完整代碼。

    //例2-2 src/main/java/bolts/WordNormalizer.java
    package bolts;
    import java.util.ArrayList;
    import java.util.List;
    import java.util.Map;
    import backtype.storm.task.OutputCollector;
    import backtype.storm.task.TopologyContext;
    import backtype.storm.topology.IRichBolt;
    import backtype.storm.topology.OutputFieldsDeclarer;
    import backtype.storm.tuple.Fields;
    import backtype.storm.tuple.Tuple;
    import backtype.storm.tuple.Values;
    public class WordNormalizer implements IRichBolt{
        private OutputCollector collector;
        public void cleanup(){}
        /
           bolt從單詞文件接收到文本行,并標(biāo)準(zhǔn)化它。
           文本行會(huì)全部轉(zhuǎn)化成小寫,并切分它,從中得到所有單詞。
         /
        public void execute(Tuple input){
            String sentence = input.getString(0);
            String[] words = sentence.split(" ");
            for(String word : words){
                word = word.trim();
                if(!word.isEmpty()){
                    word=word.toLowerCase();
                    //發(fā)布這個(gè)單詞
                    List a = new ArrayList();
                    a.add(input);
                    collector.emit(a,new Values(word));
                }
            }
            //對元組做出應(yīng)答
            collector.ack(input);
        }
        public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
            this.collector=collector;
        }

        /
           這個(gè)bolt只會(huì)發(fā)布“word”域
          /
        public void declareOutputFields(OutputFieldsDeclarer declarer) {
            declarer.declare(new Fields("word"));
        }
    }  

NOTE:通過這個(gè)例子,我們了解了在一次 execute 調(diào)用中發(fā)布多個(gè)元組。如果這個(gè)方法在一次調(diào)用中接收到句子 “This is the Storm book”,它將會(huì)發(fā)布五個(gè)元組。

下一個(gè)bolt,WordCounter,負(fù)責(zé)為單詞計(jì)數(shù)。這個(gè)拓?fù)浣Y(jié)束時(shí)(cleanup() 方法被調(diào)用時(shí)),我們將顯示每個(gè)單詞的數(shù)量。

NOTE: 這個(gè)例子的 bolt 什么也沒發(fā)布,它把數(shù)據(jù)保存在 map 里,但是在真實(shí)的場景中可以把數(shù)據(jù)保存到數(shù)據(jù)庫。

package bolts;

import java.util.HashMap;
import java.util.Map;
import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.IRichBolt;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.tuple.Tuple;

public class WordCounter implements IRichBolt{
    Integer id;
    String name;
    Map counters;
    private OutputCollector collector;

    /
       這個(gè)spout結(jié)束時(shí)(集群關(guān)閉的時(shí)候),我們會(huì)顯示單詞數(shù)量
      /
    @Override
    public void cleanup(){
        System.out.println("-- 單詞數(shù) 【"+name+"-"+id+"】 --");
        for(Map.Entry entry : counters.entrySet()){
            System.out.println(entry.getKey()+": "+entry.getValue());
        }
    }

    /
       為每個(gè)單詞計(jì)數(shù)
     /
@Override
public void execute(Tuple input) {
    String str=input.getString(0);
    /**
      如果單詞尚不存在于map,我們就創(chuàng)建一個(gè),如果已在,我們就為它加1
     /
    if(!counters.containsKey(str)){
        counters.put(str,1);
    }else{
        Integer c = counters.get(str) + 1;
        counters.put(str,c);
    }
    //對元組作為應(yīng)答
    collector.ack(input);
}
/ 初始化 / @Override public void prepare(Map stormConf, TopologyContext context, OutputCollector collector){ this.counters = new HashMap(); this.collector = collector; this.name = context.getThisComponentId(); this.id = context.getThisTaskId(); } @Override public void declareOutputFields(OutputFieldsDeclarer declarer) {}}

execute 方法使用一個(gè) map 收集單詞并計(jì)數(shù)。拓?fù)浣Y(jié)束時(shí),將調(diào)用 clearup() 方法打印計(jì)數(shù)器 map。(雖然這只是一個(gè)例子,但是通常情況下,當(dāng)拓?fù)潢P(guān)閉時(shí),你應(yīng)當(dāng)使用 cleanup() 方法關(guān)閉活動(dòng)的連接和其它資源。)

主類

你可以在主類中創(chuàng)建拓?fù)浜鸵粋€(gè)本地集群對象,以便于在本地測試和調(diào)試。LocalCluster 可以通過 Config 對象,讓你嘗試不同的集群配置。比如,當(dāng)使用不同數(shù)量的工作進(jìn)程測試你的拓?fù)鋾r(shí),如果不小心使用了某個(gè)全局變量或類變量,你就能夠發(fā)現(xiàn)錯(cuò)誤。(更多內(nèi)容請見第三章)

NOTE:所有拓?fù)涔?jié)點(diǎn)的各個(gè)進(jìn)程必須能夠獨(dú)立運(yùn)行,而不依賴共享數(shù)據(jù)(也就是沒有全局變量或類變量),因?yàn)楫?dāng)拓?fù)溥\(yùn)行在真實(shí)的集群環(huán)境時(shí),這些進(jìn)程可能會(huì)運(yùn)行在不同的機(jī)器上。

接下來,TopologyBuilder 將用來創(chuàng)建拓?fù)洌鼪Q定 Storm 如何安排各節(jié)點(diǎn),以及它們交換數(shù)據(jù)的方式。

    TopologyBuilder builder = new TopologyBuilder();
    builder.setSpout("word-reader", new WordReader());
    builder.setBolt("word-normalizer", new WordNormalizer()).shuffleGrouping("word-reader");
    builder.setBolt("word-counter", new WordCounter()).shuffleGrouping("word-normalizer");  

spoutbolts 之間通過 shuffleGrouping 方法連接。這種分組方式?jīng)Q定了 Storm 會(huì)以隨機(jī)分配方式從源節(jié)點(diǎn)向目標(biāo)節(jié)點(diǎn)發(fā)送消息。

下一步,創(chuàng)建一個(gè)包含拓?fù)渑渲玫?Config 對象,它會(huì)在運(yùn)行時(shí)與集群配置合并,并通過prepare 方法發(fā)送給所有節(jié)點(diǎn)。

    Config conf = new Config();
    conf.put("wordsFile", args[0]);
    conf.setDebug(true);  

由 spout 讀取的文件的文件名,賦值給 wordFile 屬性。由于是在開發(fā)階段,設(shè)置 debug 屬性為 true,Strom 會(huì)打印節(jié)點(diǎn)間交換的所有消息,以及其它有助于理解拓?fù)溥\(yùn)行方式的調(diào)試數(shù)據(jù)。

正如之前講過的,你要用一個(gè) LocalCluster 對象運(yùn)行這個(gè)拓?fù)洹T谏a(chǎn)環(huán)境中,拓?fù)鋾?huì)持續(xù)運(yùn)行,不過對于這個(gè)例子而言,你只要運(yùn)行它幾秒鐘就能看到結(jié)果。

    LocalCluster cluster = new LocalCluster();
    cluster.submitTopology("Getting-Started-Topologie", conf, builder.createTopology());
    Thread.sleep(2000);
    cluster.shutdown();  

調(diào)用 createTopologysubmitTopology,運(yùn)行拓?fù)?,休眠兩秒鐘(拓?fù)湓诹硗獾木€程運(yùn)行),然后關(guān)閉集群。

例2-3是完整的代碼

    //例2-3 src/main/java/TopologyMain.java
    import spouts.WordReader;
    import backtype.storm.Config;
    import backtype.storm.LocalCluster;
    import backtype.storm.topology.TopologyBuilder;
    import backtype.storm.tuple.Fields;
    import bolts.WordCounter;
    import bolts.WordNormalizer;

    public class TopologyMain {
        public static void main(String[] args) throws InterruptedException {
        //定義拓?fù)?            TopologyBuilder builder = new TopologyBuilder());
            builder.setSpout("word-reader", new WordReader());
            builder.setBolt("word-normalizer", new WordNormalizer()).shuffleGrouping("word-reader");
            builder.setBolt("word-counter", new WordCounter(),2).fieldsGrouping("word-normalizer", new Fields("word"));

        //配置
            Config conf = new Config();
            conf.put("wordsFile", args[0]);
            conf.setDebug(false);

        //運(yùn)行拓?fù)?             conf.put(Config.TOPOLOGY_MAX_SPOUT_PENDING, 1);
            LocalCluster cluster = new LocalCluster();
            cluster.submitTopology("Getting-Started-Topologie", conf, builder.createTopology();
            Thread.sleep(1000);
            cluster.shutdown();
        }
    }  

觀察運(yùn)行情況

你已經(jīng)為運(yùn)行你的第一個(gè)拓?fù)錅?zhǔn)備好了。在這個(gè)目錄下面創(chuàng)建一個(gè)文件,/src/main/resources/words.txt,一個(gè)單詞一行,然后用下面的命令運(yùn)行這個(gè)拓?fù)洌?strong>mvn exec:java -Dexec.mainClass=”TopologyMain” -Dexec.args=”src/main/resources/words.txt。舉個(gè)例子,如果你的 words.txt 文件有如下內(nèi)容: Storm test are great is an Storm simple application but very powerful really Storm is great 你應(yīng)該會(huì)在日志中看到類似下面的內(nèi)容: is: 2 application: 1 but: 1 great: 1 test: 1 simple: 1 Storm: 3 really: 1 are: 1 great: 1 an: 1 powerful: 1 very: 1 在這個(gè)例子中,每類節(jié)點(diǎn)只有一個(gè)實(shí)例。但是如果你有一個(gè)非常大的日志文件呢?你能夠很輕松的改變系統(tǒng)中的節(jié)點(diǎn)數(shù)量實(shí)現(xiàn)并行工作。這個(gè)時(shí)候,你就要?jiǎng)?chuàng)建兩個(gè) WordCounter* 實(shí)例。

    builder.setBolt("word-counter", new WordCounter(),2).shuffleGrouping("word-normalizer");  

程序返回時(shí),你將看到: — 單詞數(shù) 【word-counter-2】 — application: 1 is: 1 great: 1 are: 1 powerful: 1 Storm: 3 — 單詞數(shù) [word-counter-3] — really: 1 is: 1 but: 1 great: 1 test: 1 simple: 1 an: 1 very: 1 棒極了!修改并行度實(shí)在是太容易了(當(dāng)然對于實(shí)際情況來說,每個(gè)實(shí)例都會(huì)運(yùn)行在單獨(dú)的機(jī)器上)。不過似乎有一個(gè)問題:單詞 is 和 great 分別在每個(gè) WordCounter 各計(jì)數(shù)一次。怎么會(huì)這樣?當(dāng)你調(diào)用shuffleGrouping 時(shí),就決定了 Storm 會(huì)以隨機(jī)分配的方式向你的 bolt 實(shí)例發(fā)送消息。在這個(gè)例子中,理想的做法是相同的單詞問題發(fā)送給同一個(gè) WordCounter 實(shí)例。你把shuffleGrouping(“word-normalizer”) 換成 fieldsGrouping(“word-normalizer”, new Fields(“word”)) 就能達(dá)到目的。試一試,重新運(yùn)行程序,確認(rèn)結(jié)果。 你將在后續(xù)章節(jié)學(xué)習(xí)更多分組方式和消息流類型。

結(jié)論

我們已經(jīng)討論了 Storm 的本地和遠(yuǎn)程操作模式之間的不同,以及 Storm 的強(qiáng)大和易于開發(fā)的特性。你也學(xué)習(xí)了一些 Storm 的基本概念,我們將在后續(xù)章節(jié)深入講解它們。

以上內(nèi)容是否對您有幫助:
在線筆記
App下載
App下載

掃描二維碼

下載編程獅App

公眾號(hào)
微信公眾號(hào)

編程獅公眾號(hào)