Storm 拓?fù)?/h1>

2018-09-28 16:06 更新

拓?fù)?/h2>

在這一章,你將學(xué)到如何在同一個(gè) Storm 拓?fù)浣Y(jié)構(gòu)內(nèi)的不同組件之間傳遞元組,以及如何向一個(gè)運(yùn)行中的 Storm 集群發(fā)布一個(gè)拓?fù)洹?/p>

數(shù)據(jù)流組

設(shè)計(jì)一個(gè)拓?fù)鋾r(shí),你要做的最重要的事情之一就是定義如何在各組件之間交換數(shù)據(jù)(數(shù)據(jù)流是如何被 bolts 消費(fèi)的)。一個(gè)據(jù)數(shù)流組指定了每個(gè) bolt 會(huì)消費(fèi)哪些數(shù)據(jù)流,以及如何消費(fèi)它們。

NOTE:一個(gè)節(jié)點(diǎn)能夠發(fā)布一個(gè)以上的數(shù)據(jù)流,一個(gè)數(shù)據(jù)流組允許我們選擇接收哪個(gè)。

數(shù)據(jù)流組在定義拓?fù)鋾r(shí)設(shè)置,就像我們?cè)诘诙驴吹降模?/p>

···
    builder.setBolt("word-normalizer", new WordNormalizer())
           .shuffleGrouping("word-reader");
···  

在前面的代碼塊里,一個(gè) bolt 由 TopologyBuilder 對(duì)象設(shè)定, 然后使用隨機(jī)數(shù)據(jù)流組指定數(shù)據(jù)源。數(shù)據(jù)流組通常將數(shù)據(jù)源組件的 ID 作為參數(shù),取決于數(shù)據(jù)流組的類(lèi)型不同還有其它可選參數(shù)。

NOTE:每個(gè) InputDeclarer 可以有一個(gè)以上的數(shù)據(jù)源,而且每個(gè)數(shù)據(jù)源可以分到不同的組。

隨機(jī)數(shù)據(jù)流組

隨機(jī)流組是最常用的數(shù)據(jù)流組。它只有一個(gè)參數(shù)(數(shù)據(jù)源組件),并且數(shù)據(jù)源會(huì)向隨機(jī)選擇的 bolt 發(fā)送元組,保證每個(gè)消費(fèi)者收到近似數(shù)量的元組。

隨機(jī)數(shù)據(jù)流組用于數(shù)學(xué)計(jì)算這樣的原子操作。然而,如果操作不能被隨機(jī)分配,就像第二章為單詞計(jì)數(shù)的例子,你就要考慮其它分組方式了。

域數(shù)據(jù)流組

域數(shù)據(jù)流組允許你基于元組的一個(gè)或多個(gè)域控制如何把元組發(fā)送給 bolts。 它保證擁有相同域組合的值集發(fā)送給同一個(gè) bolt。 回到單詞計(jì)數(shù)器的例子,如果你用 word 域?yàn)閿?shù)據(jù)流分組,word-normalizer bolt 將只會(huì)把相同單詞的元組發(fā)送給同一個(gè) word-counterbolt 實(shí)例。

···
    builder.setBolt("word-counter", new WordCounter(),2)
           .fieldsGrouping("word-normalizer", new Fields("word"));
···  

NOTE: 在域數(shù)據(jù)流組中的所有域集合必須存在于數(shù)據(jù)源的域聲明中。

全部數(shù)據(jù)流組

全部數(shù)據(jù)流組,為每個(gè)接收數(shù)據(jù)的實(shí)例復(fù)制一份元組副本。這種分組方式用于向 bolts 發(fā)送信號(hào)。比如,你要刷新緩存,你可以向所有的 bolts 發(fā)送一個(gè)刷新緩存信號(hào)。在單詞計(jì)數(shù)器的例子里,你可以使用一個(gè)全部數(shù)據(jù)流組,添加清除計(jì)數(shù)器緩存的功能(見(jiàn)拓?fù)涫纠?/a>)

    public void execute(Tuple input) {
        String str = null;
        try{
            if(input.getSourceStreamId().equals("signals")){
                str = input.getStringByField("action");
                if("refreshCache".equals(str))
                    counters.clear();
            }
        }catch (IllegalArgumentException e){
            //什么也不做
        }
        ···
    }  

我們添加了一個(gè) if 分支,用來(lái)檢查源數(shù)據(jù)流。 Storm 允許我們聲明具名數(shù)據(jù)流(如果你不把元組發(fā)送到一個(gè)具名數(shù)據(jù)流,默認(rèn)發(fā)送到名為 ”default“ 的數(shù)據(jù)流)。這是一個(gè)識(shí)別元組的極好的方式,就像這個(gè)例子中,我們想識(shí)別 signals 一樣。 在拓?fù)涠x中,你要向 word-counter bolt 添加第二個(gè)數(shù)據(jù)流,用來(lái)接收從 signals-spout 數(shù)據(jù)流發(fā)送到所有 bolt 實(shí)例的每一個(gè)元組。

    builder.setBolt("word-counter", new WordCounter(),2)
           .fieldsGroupint("word-normalizer",new Fields("word"))
           .allGrouping("signals-spout","signals");   

signals-spout的實(shí)現(xiàn)請(qǐng)參考git倉(cāng)庫(kù)。

自定義數(shù)據(jù)流組

你可以通過(guò)實(shí)現(xiàn) backtype.storm.grouping.CustormStreamGrouping 接口創(chuàng)建自定義數(shù)據(jù)流組,讓你自己決定哪些 bolt 接收哪些元組。

讓我們修改單詞計(jì)數(shù)器示例,使首字母相同的單詞由同一個(gè) bolt 接收。

    public class ModuleGrouping mplents CustormStreamGrouping, Serializable{
        int numTasks = 0;

        @Override
        public List<Integer> chooseTasks(List<Object> values) {
            List<Integer> boltIds = new ArrayList<Integer>();
            if(values.size()>0){
                String str = values.get(0).toString();
                if(str.isEmpty()){
                    boltIds.add(0);
                }else{
                    boltIds.add(str.charAt(0) % numTasks);
                }
            }
            return boltIds;
        }

        @Override
        public void prepare(TopologyContext context, Fields outFields, List<Integer> targetTasks) {
            numTasks = targetTasks.size();
        }
    }  

這是一個(gè) CustomStreamGrouping 的簡(jiǎn)單實(shí)現(xiàn),在這里我們采用單詞首字母字符的整數(shù)值與任務(wù)數(shù)的余數(shù),決定接收元組的 bolt。

按下述方式 word-normalizer 修改即可使用這個(gè)自定義數(shù)據(jù)流組。

    builder.setBolt("word-normalizer", new WordNormalizer())
           .customGrouping("word-reader", new ModuleGrouping());  

直接數(shù)據(jù)流組

這是一個(gè)特殊的數(shù)據(jù)流組,數(shù)據(jù)源可以用它決定哪個(gè)組件接收元組。與前面的例子類(lèi)似,數(shù)據(jù)源將根據(jù)單詞首字母決定由哪個(gè) bolt 接收元組。要使用直接數(shù)據(jù)流組,在 WordNormalizer bolt 中,使用 emitDirect 方法代替 emit。

    public void execute(Tuple input) {
        ...
        for(String word : words){
            if(!word.isEmpty()){
                ...
                collector.emitDirect(getWordCountIndex(word),new Values(word));
            }
        }
        //對(duì)元組做出應(yīng)答
        collector.ack(input);
    }

    public Integer getWordCountIndex(String word) {
        word = word.trim().toUpperCase();
        if(word.isEmpty()){
            return 0;
        }else{
            return word.charAt(0) % numCounterTasks;
        }
    }  

prepare 方法中計(jì)算任務(wù)數(shù)

    public void prepare(Map stormConf, TopologyContext context, 
                OutputCollector collector) {
        this.collector = collector;
        this.numCounterTasks = context.getComponentTasks("word-counter");
    }  

在拓?fù)涠x中指定數(shù)據(jù)流將被直接分組:

    builder.setBolt("word-counter", new WordCounter(),2)
           .directGrouping("word-normalizer");  

全局?jǐn)?shù)據(jù)流組

全局?jǐn)?shù)據(jù)流組把所有數(shù)據(jù)源創(chuàng)建的元組發(fā)送給單一目標(biāo)實(shí)例(即擁有最低 ID 的任務(wù))。

不分組

寫(xiě)作本書(shū)時(shí)(Stom0.7.1 版),這個(gè)數(shù)據(jù)流組相當(dāng)于隨機(jī)數(shù)據(jù)流組。也就是說(shuō),使用這個(gè)數(shù)據(jù)流組時(shí),并不關(guān)心數(shù)據(jù)流是如何分組的。

LocalCluster VS StormSubmitter

到目前為止,你已經(jīng)用一個(gè)叫做 LocalCluster 的工具在你的本地機(jī)器上運(yùn)行了一個(gè)拓?fù)?。Storm 的基礎(chǔ)工具,使你能夠在自己的計(jì)算機(jī)上方便的運(yùn)行和調(diào)試不同的拓?fù)?。但是你怎么把自己的拓?fù)涮峤唤o運(yùn)行中的 Storm 集群呢?Storm 有一個(gè)有趣的功能,在一個(gè)真實(shí)的集群上運(yùn)行自己的拓?fù)涫呛苋菀椎氖虑?。要?shí)現(xiàn)這一點(diǎn),你需要把 LocalCluster 換成 StormSubmitter 并實(shí)現(xiàn) submitTopology 方法, 它負(fù)責(zé)把拓?fù)浒l(fā)送給集群。

下面是修改后的代碼:

    //LocalCluster cluster = new LocalCluster();
    //cluster.submitTopology("Count-Word-Topology-With-Refresh-Cache", conf, 
    //builder.createTopology());
    StormSubmitter.submitTopology("Count-Word-Topology-With_Refresh-Cache", conf,
            builder.createTopology());
    //Thread.sleep(1000);
    //cluster.shutdown();  

NOTE: 當(dāng)你使用 StormSubmitter 時(shí),你就不能像使用 LocalCluster 時(shí)一樣通過(guò)代碼控制集群了。

接下來(lái),把源碼壓縮成一個(gè) jar 包,運(yùn)行 Storm 客戶(hù)端命令,把拓?fù)涮峤唤o集群。如果你已經(jīng)使用了 Maven, 你只需要在命令行進(jìn)入源碼目錄運(yùn)行:mvn package。

現(xiàn)在你生成了一個(gè) jar 包,使用 storm jar 命令提交拓?fù)洌P(guān)于如何安裝 Storm 客戶(hù)端請(qǐng)參考附錄 A )。命令格式:storm jar allmycode.jar org.me.MyTopology arg1 arg2 arg3

對(duì)于這個(gè)例子,在拓?fù)涔こ棠夸浵旅孢\(yùn)行:

storm jar target/Topologies-0.0.1-SNAPSHOT.jar countword.TopologyMain src/main/resources/words.txt  

通過(guò)這些命令,你就把拓?fù)浒l(fā)布集群上了。

如果想停止或殺死它,運(yùn)行:

storm kill Count-Word-Topology-With-Refresh-Cache  

NOTE:拓?fù)涿Q(chēng)必須保證惟一性。

NOTE:如何安裝Storm客戶(hù)端,參考附錄A

DRPC 拓?fù)?/h2>

有一種特殊的拓?fù)漕?lèi)型叫做分布式遠(yuǎn)程過(guò)程調(diào)用(DRPC),它利用 Storm 的分布式特性執(zhí)行遠(yuǎn)程過(guò)程調(diào)用(RPC)(見(jiàn)下圖)。Storm 提供了一些用來(lái)實(shí)現(xiàn) DRPC 的工具。第一個(gè)是 DRPC 服務(wù)器,它就像是客戶(hù)端和 Storm 拓?fù)渲g的連接器,作為拓?fù)涞?spout 的數(shù)據(jù)源。它接收一個(gè)待執(zhí)行的函數(shù)和函數(shù)參數(shù),然后對(duì)于函數(shù)操作的每一個(gè)數(shù)據(jù)塊,這個(gè)服務(wù)器都會(huì)通過(guò)拓?fù)浞峙湟粋€(gè)請(qǐng)求 ID 用來(lái)識(shí)別 RPC 請(qǐng)求。拓?fù)鋱?zhí)行最后的 bolt 時(shí),它必須分配 RPC 請(qǐng)求 ID 和結(jié)果,使 DRPC 服務(wù)器把結(jié)果返回正確的客戶(hù)端。

NOTE:?jiǎn)螌?shí)例 DRPC 服務(wù)器能夠執(zhí)行許多函數(shù)。每個(gè)函數(shù)由一個(gè)惟一的名稱(chēng)標(biāo)識(shí)。

Storm 提供的第二個(gè)工具(已在例子中用過(guò))是 LineDRPCTopologyBuilder**,一個(gè)輔助構(gòu)建DRPC 拓?fù)涞某橄蟾拍?。生成的拓?fù)鋭?chuàng)建 DRPCSpouts ——它連接到 DRPC 服務(wù)器并向拓?fù)涞钠渌糠址职l(fā)數(shù)據(jù)——并包裝 bolts,使結(jié)果從最后一個(gè) bolt 返回。依次執(zhí)行所有添加到LinearDRPCTopologyBuilder* 對(duì)象的 bolts*。

作為這種類(lèi)型的拓?fù)涞囊粋€(gè)例子,我們創(chuàng)建了一個(gè)執(zhí)行加法運(yùn)算的進(jìn)程。雖然這是一個(gè)簡(jiǎn)單的例子,但是這個(gè)概念可以擴(kuò)展到復(fù)雜的分布式計(jì)算。

bolt 按下面的方式聲明輸出:

    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("id","result"));
    }  

因?yàn)檫@是拓?fù)渲形┮坏?bolt,它必須發(fā)布 RPC ID 和結(jié)果。execute 方法負(fù)責(zé)執(zhí)行加法運(yùn)算。

    public void execute(Tuple input) {
        String[] numbers = input.getString(1).split("\\+");
        Integer added = 0;
        if(numbers.length<2){
            throw new InvalidParameterException("Should be at least 2 numbers");
        }
        for(String num : numbers){
            added += Integer.parseInt(num);
        }
        collector.emit(new Values(input.getValue(0),added));
    }  

包含加法 bolt 的拓?fù)涠x如下:

    public static void main(String[] args) {
        LocalDRPC drpc = new LocalDRPC();

        LinearDRPCTopologyBuilder builder = new LinearDRPCTopologyBuilder("add");
        builder.addBolt(AdderBolt(),2);

        Config conf = new Config();
        conf.setDebug(true);

        LocalCluster cluster = new LocalCluster();
        cluster.submitTopology("drpcder-topology", conf,
            builder.createLocalTopology(drpc));
        String result = drpc.execute("add", "1+-1");
        checkResult(result,0);
        result = drpc.execute("add", "1+1+5+10");
        checkResult(result,17);

        cluster.shutdown();
        drpc.shutdown();
    }  

創(chuàng)建一個(gè) LocalDRPC 對(duì)象在本地運(yùn)行 DRPC 服務(wù)器。接下來(lái),創(chuàng)建一個(gè)拓?fù)錁?gòu)建器(譯者注:LineDRpctopologyBuilder 對(duì)象),把 bolt 添加到拓?fù)?。運(yùn)行 DRPC 對(duì)象(LocalDRPC 對(duì)象)的 execute 方法測(cè)試拓?fù)洹?/p>

NOTE:使用 DRPCClient 類(lèi)連接遠(yuǎn)程 DRPC 服務(wù)器。DRPC 服務(wù)器暴露了 Thrift API,因此可以跨語(yǔ)言編程;并且不論是在本地還是在遠(yuǎn)程運(yùn)行DRPC服務(wù)器,它們的 API 都是相同的。 對(duì)于采用 Storm 配置的 DRPC 配置參數(shù)的 Storm 集群,調(diào)用構(gòu)建器對(duì)象的createRemoteTopology 向 Storm 集群提交一個(gè)拓?fù)洌皇钦{(diào)用 createLocalTopology。

以上內(nèi)容是否對(duì)您有幫助:
在線筆記
App下載
App下載

掃描二維碼

下載編程獅App

公眾號(hào)
微信公眾號(hào)

編程獅公眾號(hào)