Storm 事務(wù)性拓?fù)?/h1>

2018-09-28 16:09 更新

事務(wù)性拓?fù)?/h2>

正如書中之前所提到的,使用 Storm 編程,可以通過調(diào)用 ack 和 fail 方法來確保一條消息的處理成功或失敗。不過當(dāng)元組被重發(fā)時(shí),會(huì)發(fā)生什么呢?你又該如何砍不會(huì)重復(fù)計(jì)算?

Storm0.7.0 實(shí)現(xiàn)了一個(gè)新特性——事務(wù)性拓?fù)洌@一特性使消息在語義上確保你可以安全的方式重發(fā)消息,并保證它們只會(huì)被處理一次。在不支持事務(wù)性拓?fù)涞那闆r下,你無法在準(zhǔn)確性,可擴(kuò)展性,以空錯(cuò)性上得到保證的前提下完成計(jì)算。

NOTE:事務(wù)性拓?fù)涫且粋€(gè)構(gòu)建于標(biāo)準(zhǔn) Storm spout 和 bolt 之上的抽象概念。

設(shè)計(jì)

在事務(wù)性拓?fù)渲?,Storm 以并行和順序處理混合的方式處理元組。spout 并行分批創(chuàng)建供 bolt 處理的元組(譯者注:下文將這種分批創(chuàng)建、分批處理的元組稱做批次)。其中一些 bolt 作為提交者以嚴(yán)格有序的方式提交處理過的批次。這意味著如果你有每批五個(gè)元組的兩個(gè)批次,將有兩個(gè)元組被 bolt 并行處理,但是直到提交者成功提交了第一個(gè)元組之后,才會(huì)提交第二個(gè)元組。
NOTE: 使用事務(wù)性拓?fù)鋾r(shí),數(shù)據(jù)源要能夠重發(fā)批次,有時(shí)候甚至要重復(fù)多次。因此確認(rèn)你的數(shù)據(jù)源——你連接到的那個(gè) spout ——具備這個(gè)能力。 這個(gè)過程可以被描述為兩個(gè)階段: 處理階段 純并行階段,許多批次同時(shí)處理。 提交階段 嚴(yán)格有序階段,直到批次一成功提交之后,才會(huì)提交批次二。 這兩個(gè)階段合起來稱為一個(gè) Storm 事務(wù)。 NOTE: Storm 使用 zookeeper 儲(chǔ)存事務(wù)元數(shù)據(jù),默認(rèn)情況下就是拓?fù)涫褂玫哪莻€(gè) zookeeper。你可以修改以下兩個(gè)配置參數(shù)鍵指定其它的 zookeeper——transactional.zookeeper.servers 和transactional.zookeeper.port。

事務(wù)實(shí)踐

下面我們要?jiǎng)?chuàng)建一個(gè) Twitter 分析工具來了解事務(wù)的工作方式。我們從一個(gè) Redis 數(shù)據(jù)庫讀取tweets,通過幾個(gè) bolt 處理它們,最后把結(jié)果保存在另一個(gè) Redis 數(shù)據(jù)庫的列表中。處理結(jié)果就是所有話題和它們的在 tweets 中出現(xiàn)的次數(shù)列表,所有用戶和他們?cè)?tweets 中出現(xiàn)的次數(shù)列表,還有一個(gè)包含發(fā)起話題和頻率的用戶列表。 這個(gè)工具的拓?fù)鋱D。

圖 拓?fù)涓庞[

正如你看到的,TweetsTransactionalSpout 會(huì)連接你的 tweet 數(shù)據(jù)庫并向拓?fù)浞职l(fā)批次。UserSplitterBoltHashTagSplitterBolt 兩個(gè) bolt,從 spout 接收元組。UserSplitterBolt 解析 tweets 并查找用戶——以 @ 開頭的單詞——然后把這些單詞分發(fā)到名為 users 的自定義數(shù)據(jù)流組。HashtagSplitterBolt 從 tweet 查找 # 開頭的單詞,并把它們分發(fā)到名為 hashtags 的自定義數(shù)據(jù)流組。第三個(gè) bolt,UserHashtagJoinBolt,接收前面提到的兩個(gè)數(shù)據(jù)流組,并計(jì)算具名用戶的一條 tweet 內(nèi)的話題數(shù)量。為了計(jì)數(shù)并分發(fā)計(jì)算結(jié)果,這是個(gè) BaseBatchBolt(稍后有更多介紹)。

最后一個(gè) bolt——RedisCommitterBolt—— 接收以上三個(gè) bolt 的數(shù)據(jù)流組。它為每樣?xùn)|西計(jì)數(shù),并在對(duì)一個(gè)批次完成處理時(shí),把所有結(jié)果保存到 redis。這是一種特殊的 bolt,叫做提交者,在本章后面做更多講解。

TransactionalTopologyBuilder 構(gòu)建拓?fù)?,代碼如下:

01
TransactionalTopologyBuilder builder=
02
    new TransactionalTopologyBuilder("test", "spout", new TweetsTransactionalSpout());
03

04
builder.setBolt("users-splitter", new UserSplitterBolt(), 4).shuffleGrouping("spout");
05
buildeer.setBolt("hashtag-splitter", new HashtagSplitterBolt(), 4).shuffleGrouping("spout");
06

07
builder.setBolt("users-hashtag-manager", new UserHashtagJoinBolt(), r)
08
       .fieldsGrouping("users-splitter", "users", new Fields("tweet_id"))
09
       .fieldsGrouping("hashtag-splitter", "hashtags", new Fields("tweet_id"));
10

11
builder.setBolt("redis-commiter", new RedisCommiterBolt())
12
       .globalGrouping("users-splitter", "users")
13
       .globalGrouping("hashtag-splitter", "hashtags")
14
       .globalGrouping("user-hashtag-merger");  

接下來就看看如何在一個(gè)事務(wù)性拓?fù)渲袑?shí)現(xiàn) spout。

Spout

一個(gè)事務(wù)性拓?fù)涞?spout 與標(biāo)準(zhǔn) spout 完全不同。

1
public class TweetsTransactionalSpout extends BaseTransactionalSpout<TransactionMetadata>{  

正如你在這個(gè)類定義中看到的,TweetsTransactionalSpout 繼承了帶范型的BaseTransactionalSpout。指定的范型類型的對(duì)象是事務(wù)元數(shù)據(jù)集合。它將在后面的代碼中用于從數(shù)據(jù)源分發(fā)批次。

在這個(gè)例子中,TransactionMetadata 定義如下:

01
public class TransactionMetadata implements Serializable {
02
    private static final long serialVersionUID = 1L;
03
    long from;
04
    int quantity;
05

06
    public TransactionMetadata(long from, int quantity) {
07
        this.from = from;
08
        this.quantity = quantity;
09
    }
10
}  

該類的對(duì)象維護(hù)著兩個(gè)屬性 fromquantity,它們用來生成批次。

spout 的最后需要實(shí)現(xiàn)下面的三個(gè)方法:

01
@Override
02
public ITransactionalSpout.Coordinator<TransactionMetadata> getCoordinator(
03
       Map conf, TopologyContext context) {
04
    return new TweetsTransactionalSpoutCoordinator();
05
}
06

07
@Override
08
public backtype.storm.transactional.ITransactionalSpout.Emitter<TransactionMetadata> getEmitter(Map conf, TopologyContext contest) {
09
    return new TweetsTransactionalSpoutEmitter();
10
}
11

12
@Override
13
public void declareOutputFields(OuputFieldsDeclarer declarer) {
14
    declarer.declare(new Fields("txid", "tweet_id", "tweet"));
15
}  

getCoordinator 方法,告訴 Storm 用來協(xié)調(diào)生成批次的類。getEmitter,負(fù)責(zé)讀取批次并把它們分發(fā)到拓?fù)渲械臄?shù)據(jù)流組。最后,就像之前做過的,需要聲明要分發(fā)的域。

RQ 類

為了讓例子簡(jiǎn)單點(diǎn),我們決定用一個(gè)類封裝所有對(duì) Redis 的操作。

01
public class RQ {
02
    public static final String NEXT_READ = "NEXT_READ";
03
    public static final String NEXT_WRITE = "NEXT_WRITE";
04

05
    Jedis jedis;
06

07
    public RQ() {
08
        jedis = new Jedis("localhost");
09
    }
10

11
    public long getavailableToRead(long current) {
12
        return getNextWrite() - current;
13
    }
14

15
    public long getNextRead() {
16
        String sNextRead = jedis.get(NEXT_READ);
17
        if(sNextRead == null) {
18
            return 1;
19
        }
20
        return Long.valueOf(sNextRead);
21
    }
22

23
    public long getNextWrite() {
24
        return Long.valueOf(jedis.get(NEXT_WRITE));
25
    }
26

27
    public void close() {
28
        jedis.disconnect();
29
    }
30

31
    public void setNextRead(long nextRead) {
32
        jedis.set(NEXT_READ, ""+nextRead);
33
    }
34

35
    public List<String> getMessages(long from, int quantity) {
36
        String[] keys = new String[quantity];
37
        for (int i = 0; i < quantity; i++) {
38
            keys[i] = ""+(i+from);
39
        }
40
        return jedis.mget(keys);
41
    }
42
}  

仔細(xì)閱讀每個(gè)方法,確保自己理解了它們的用處。

協(xié)調(diào)者 Coordinator

下面是本例的協(xié)調(diào)者實(shí)現(xiàn)。

01
public static class TweetsTransactionalSpoutCoordinator implements ITransactionalSpout.Coordinator<TransactionMetadata> {
02
    TransactionMetadata lastTransactionMetadata;
03
    RQ rq = new RQ();
04
    long nextRead = 0;
05

06
    public TweetsTransactionalSpoutCoordinator() {
07
        nextRead = rq.getNextRead();
08
    }
09

10
    @Override
11
    public TransactionMetadata initializeTransaction(BigInteger txid, TransactionMetadata prevMetadata) {
12
        long quantity = rq.getAvailableToRead(nextRead);
13
        quantity = quantity > MAX_TRANSACTION_SIZE ? MAX_TRANSACTION_SIZE : quantity;
14
        TransactionMetadata ret = new TransactionMetadata(nextRead, (int)quantity);
15
        nextRead += quantity;
16
        return ret;
17
    }
18

19
    @Override
20
    public boolean isReady() {
21
        return rq.getAvailableToRead(nextRead) > 0;
22
    }
23

24
    @Override
25
    public void close() {
26
        rq.close();
27
    }
28
}  

值得一提的是,在整個(gè)拓?fù)渲兄粫?huì)有一個(gè)提交者實(shí)例。創(chuàng)建提交者實(shí)例時(shí),它會(huì)從 redis 讀取一個(gè)從1開始的序列號(hào),這個(gè)序列號(hào)標(biāo)識(shí)要讀取的 tweet 下一條。

第一個(gè)方法是 isReady。在 initializeTransaction 之前調(diào)用它確認(rèn)數(shù)據(jù)源已就緒并可讀取。此方法應(yīng)當(dāng)相應(yīng)的返回 truefalse。在此例中,讀取 tweets 數(shù)量并與已讀數(shù)量比較。它們之間的不同就在于可讀 tweets 數(shù)。如果它大于0,就意味著還有 tweets 未讀。

最后,執(zhí)行 initializeTransaction。正如你看到的,它接收 txidprevMetadata作為參數(shù)。第一個(gè)參數(shù)是 Storm 生成的事務(wù) ID,作為批次的惟一性標(biāo)識(shí)。prevMetadata 是協(xié)調(diào)器生成的前一個(gè)事務(wù)元數(shù)據(jù)對(duì)象。

在這個(gè)例子中,首先確認(rèn)有多少 tweets 可讀。只要確認(rèn)了這一點(diǎn),就創(chuàng)建一個(gè)TransactionMetadata 對(duì)象,標(biāo)識(shí)讀取的第一個(gè) tweet(譯者注:對(duì)象屬性 from ),以及讀取的 tweets 數(shù)量(譯者注:對(duì)象屬性 quantity )。

元數(shù)據(jù)對(duì)象一經(jīng)返回,Storm 把它跟 txid 一起保存在 zookeeper。這樣就確保了一旦發(fā)生故障,Storm 可以利用分發(fā)器(譯者注:Emitter,見下文)重新發(fā)送批次。

Emitter

創(chuàng)建事務(wù)性 spout 的最后一步是實(shí)現(xiàn)分發(fā)器(Emitter)。實(shí)現(xiàn)如下:

01
public static class TweetsTransactionalSpoutEmitter implements ITransactionalSpout.Emitter<TransactionMetadata> {
02

03
</pre>
04
<pre>    RQ rq = new RQ();</pre>
05
<pre>    public TweetsTransactionalSpoutEmitter() {}</pre>
06
<pre>    @Override
07
    public void emitBatch(TransactionAttempt tx, TransactionMetadata coordinatorMeta, BatchOutputCollector collector) {
08
        rq.setNextRead(coordinatorMeta.from+coordinatorMeta.quantity);
09
        List<String> messages = rq.getMessages(coordinatorMeta.from, <span style="font-family: Georgia, 'Times New Roman', 'Bitstream Charter', Times, serif; font-size: 13px; line-height: 19px;">coordinatorMeta.quantity);
10
</span>        long tweetId = coordinatorMeta.from;
11
        for (String message : messages) {
12
            collector.emit(new Values(tx, ""+tweetId, message));
13
            tweetId++;
14
        }
15
    }
16

17
    @Override
18
    public void cleanupBefore(BigInteger txid) {}
19

20
    @Override
21
    public void close() {
22
        rq.close();
23
    }</pre>
24
<pre>
25
}  

分發(fā)器從數(shù)據(jù)源讀取數(shù)據(jù)并從數(shù)據(jù)流組發(fā)送數(shù)據(jù)。分發(fā)器應(yīng)當(dāng)問題能夠?yàn)橄嗤氖聞?wù) id 和事務(wù)元數(shù)據(jù)發(fā)送相同的批次。這樣,如果在處理批次的過程中發(fā)生了故障,Storm 就能夠利用分發(fā)器重復(fù)相同的事務(wù) id 和事務(wù)元數(shù)據(jù),并確保批次已經(jīng)重復(fù)過了。Storm 會(huì)在TransactionAttempt 對(duì)象里為嘗試次數(shù)增加計(jì)數(shù)(譯者注:attempt id )。這樣就能知道批次已經(jīng)重復(fù)過了。

在這里 emitBatch 是個(gè)重要方法。在這個(gè)方法中,使用傳入的元數(shù)據(jù)對(duì)象從 redis 得到tweets,同時(shí)增加 redis 維持的已讀 tweets 數(shù)。當(dāng)然它還會(huì)把讀到的 tweets 分發(fā)到拓?fù)洹?/p>

Bolts

首先看一下這個(gè)拓?fù)渲械臉?biāo)準(zhǔn) bolt:

01
public class UserSplitterBolt implements IBasicBolt{
02
    private static final long serialVersionUID = 1L;
03

04
    @Override
05
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
06
        declarer.declareStream("users", new Fields("txid","tweet_id","user"));
07
    }
08

09
    @Override
10
    public Map<String, Object> getComponentConfiguration() {
11
        return null;
12
    }
13

14
    @Override
15
    public void prepare(Map stormConf, TopologyContext context) {}
16

17
    @Override
18
    public void execute(Tuple input, BasicOutputCollector collector) {
19
        String tweet = input.getStringByField("tweet");
20
        String tweetId = input.getStringByField("tweet_id");
21
        StringTokenizer strTok = new StringTokenizer(tweet, " ");
22
        HashSet<String> users = new HashSet<String>();
23

24
        while(strTok.hasMoreTokens()) {
25
            String user = strTok.nextToken();
26

27
            //確保這是個(gè)真實(shí)的用戶,并且在這個(gè)tweet中沒有重復(fù)
28
            if(user.startsWith("@") && !users.contains(user)) {
29
                collector.emit("users", new Values(tx, tweetId, user));
30
                users.add(user);
31
            }
32
        }
33
    }
34

35
    @Override
36
    public void cleanup(){}
37
}  

正如本章前面提到的,UserSplitterBolt 接收元組,解析 tweet 文本,分發(fā) @ 開頭的單詞————tweeter 用戶。HashtagSplitterBolt 的實(shí)現(xiàn)也非常相似。

01
public class HashtagSplitterBolt implements IBasicBolt{
02
    private static final long serialVersionUID = 1L;
03

04
    @Override
05
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
06
        declarer.declareStream("hashtags", new Fields("txid","tweet_id","hashtag"));
07
    }
08

09
    @Override
10
    public Map<String, Object> getComponentConfiguration() {
11
        return null;
12
    }
13

14
    @Override
15
    public void prepare(Map stormConf, TopologyContext context) {}
16

17
    @Oerride
18
    public void execute(Tuple input, BasicOutputCollector collector) {
19
        String tweet = input.getStringByField("tweet");
20
        String tweetId = input.getStringByField("tweet_id");
21
        StringTokenizer strTok = new StringTokenizer(tweet, " ");
22
        TransactionAttempt tx = (TransactionAttempt)input.getValueByField("txid");
23
        HashSet<String> words = new HashSet<String>();
24

25
        while(strTok.hasMoreTokens()) {
26
            String word = strTok.nextToken();
27

28
            if(word.startsWith("#") && !words.contains(word)){
29
                collector.emit("hashtags", new Values(tx, tweetId, word));
30
                words.add(word);
31
            }
32
        }
33
    }
34

35
    @Override
36
    public void cleanup(){}
37
}  

現(xiàn)在看看 UserHashTagJoinBolt 的實(shí)現(xiàn)。首先要注意的是它是一個(gè) BaseBatchBolt。這意味著,execute 方法會(huì)操作接收到的元組,但是不會(huì)分發(fā)新的元組。批次完成時(shí),Storm 會(huì)調(diào)用 finishBatch 方法。

01
public void execute(Tuple tuple) {
02
    String source = tuple.getSourceStreamId();
03
    String tweetId = tuple.getStringByField("tweet_id");
04

05
    if("hashtags".equals(source)) {
06
        String hashtag = tuple.getStringByField("hashtag");
07
        add(tweetHashtags, tweetId, hashtag);
08
    } else if("users".equals(source)) {
09
        String user = tuple.getStringByField("user");
10
        add(userTweets, user, tweetId);
11
    }
12
}  

既然要結(jié)合 tweet 中提到的用戶為出現(xiàn)的所有話題計(jì)數(shù),就需要加入前面的 bolts 創(chuàng)建的兩個(gè)數(shù)據(jù)流組。這件事要以批次為單位進(jìn)程,在批次處理完成時(shí),調(diào)用 finishBatch 方法。

01
@Override
02
public void finishBatch() {
03
    for(String user:userTweets.keySet()){
04
        Set<String> tweets = getUserTweets(user);
05
        HashMap<String, Integer> hashtagsCounter = new HashMap<String, Integer>();
06
        for(String tweet:tweets){
07
            Set<String> hashtags=getTweetHashtags(tweet);
08
            if(hashtags!=null){
09
                for(String hashtag:hashtags){
10
                    Integer count=hashtagsCounter.get(hashtag);
11
                    if(count==null){count=0;}
12
                    count++;
13
                    hashtagsCounter.put(hashtag,count);
14
                }
15
            }
16
        }
17
        for(String hashtag:hashtagsCounter.keySet()){
18
            int count=hashtagsCounter.get(hashtag);
19
            collector.emit(new Values(id,user,hashtag,count));
20
        }
21
    }
22
}  

這個(gè)方法計(jì)算每對(duì)用戶-話題出現(xiàn)的次數(shù),并為之生成和分發(fā)元組。

你可以在 GitHub 上找到并下載完整代碼。(譯者注:https://github.com/storm-book/examples-ch08-transactional-topologies 這個(gè)倉庫里沒有代碼,誰知道哪里有代碼麻煩說一聲。)

提交者 bolts

我們已經(jīng)學(xué)習(xí)了,批次通過協(xié)調(diào)器和分發(fā)器怎樣在拓?fù)渲袀鬟f。在拓?fù)渲?,這些批次中的元組以并行的,沒有特定次序的方式處理。

協(xié)調(diào)者 bolts 是一類特殊的批處理 bolts,它們實(shí)現(xiàn)了 IComh mitter 或者通過TransactionalTopologyBuilder 調(diào)用 setCommiterBolt 設(shè)置了提交者 bolt。它們與其它的批處理 bolts 最大的不同在于,提交者 bolts的finishBatch 方法在提交就緒時(shí)執(zhí)行。這一點(diǎn)發(fā)生在之前所有事務(wù)都已成功提交之后。另外,finishBatch 方法是順序執(zhí)行的。因此如果同時(shí)有事務(wù) ID1 和事務(wù) ID2 兩個(gè)事務(wù)同時(shí)執(zhí)行,只有在 ID1 沒有任何差錯(cuò)的執(zhí)行了 finishBatch 方法之后,ID2 才會(huì)執(zhí)行該方法。

下面是這個(gè)類的實(shí)現(xiàn)

01
public class RedisCommiterCommiterBolt extends BaseTransactionalBolt implements ICommitter {
02
    public static final String LAST_COMMITED_TRANSACTION_FIELD = "LAST_COMMIT";
03
    TransactionAttempt id;
04
    BatchOutputCollector collector;
05
    Jedis jedis;
06

07
    @Override
08
    public void prepare(Map conf, TopologyContext context,
09
                        BatchOutputCollector collector, TransactionAttempt id) {
10
        this.id = id;
11
        this.collector = collector;
12
        this.jedis = new Jedis("localhost");
13
    }
14

15
    HashMap<String, Long> hashtags = new HashMap<String,Long>();
16
    HashMap<String, Long> users = new HashMap<String, Long>();
17
    HashMap<String, Long> usersHashtags = new HashMap<String, Long>();
18

19
    private void count(HashMap<String, Long> map, String key, int count) {
20
        Long value = map.get(key);
21
        if(value == null){value = (long)0;}
22
        value += count;
23
        map.put(key,value);
24
    }
25

26
    @Override
27
    public void execute(Tuple tuple) {
28
        String origin = tuple. getSourceComponent();
29
        if("sers-splitter".equals(origin)) {
30
            String user = tuple.getStringByField("user");
31
            count(users, user, 1);
32
        } else if("hashtag-splitter".equals(origin)) {
33
            String hashtag = tuple.getStringByField("hashtag");
34
            count(hashtags, hashtag, 1);
35
        } else if("user-hashtag-merger".quals(origin)) {
36
            String hashtag = tuple.getStringByField("hashtag");
37
            String user = tuple.getStringByField("user");
38
            String key = user + ":" + hashtag;
39
            Integer count = tuple.getIntegerByField("count");
40
            count(usersHashtags, key, count);
41
        }
42
    }
43

44
    @Override
45
    public void finishBatch() {
46
        String lastCommitedTransaction = jedis.get(LAST_COMMITED_TRANSACTION_FIELD);
47
        String currentTransaction = ""+id.getTransactionId();
48

49
        if(currentTransaction.equals(lastCommitedTransaction)) {return;}
50

51
        Transaction multi = jedis.multi();
52

53
        multi.set(LAST_COMMITED_TRANSACTION_FIELD, currentTransaction);
54

55
        Set<String> keys = hashtags.keySet();
56
        for (String hashtag : keys) {
57
            Long count = hashtags.get(hashtag);
58
            multi.hincrBy("hashtags", hashtag, count);
59
        }
60

61
        keys = users.keySet();
62
        for (String user : keys) {
63
            Long count =users.get(user);
64
            multi.hincrBy("users",user,count);
65
        }
66

67
        keys = usersHashtags.keySet();
68
        for (String key : keys) {
69
            Long count = usersHashtags.get(key);
70
            multi.hincrBy("users_hashtags", key, count);
71
        }
72

73
        multi.exec();
74
    }
75

76
    @Override
77
    public void declareOutputFields(OutputFieldsDeclarer declarer) {}
78
}  

這個(gè)實(shí)現(xiàn)很簡(jiǎn)單,但是在 finishBatch 有一個(gè)細(xì)節(jié)。

1
...
2
multi.set(LAST_COMMITED_TRANSACTION_FIELD, currentTransaction);
3
...  

在這里向數(shù)據(jù)庫保存提交的最后一個(gè)事務(wù) ID。為什么要這樣做?記住,如果事務(wù)失敗了,Storm將會(huì)盡可能多的重復(fù)必要的次數(shù)。如果你不確定已經(jīng)處理了這個(gè)事務(wù),你就會(huì)多算,事務(wù)拓?fù)湟簿蜎]有用了。所以請(qǐng)記?。罕4孀詈筇峤坏氖聞?wù) ID,并在提交前檢查。

分區(qū)的事務(wù) Spouts

對(duì)一個(gè) spout 來說,從一個(gè)分區(qū)集合中讀取批次是很普通的。接著這個(gè)例子,你可能有很多redis 數(shù)據(jù)庫,而 tweets 可能會(huì)分別保存在這些 redis 數(shù)據(jù)庫里。通過實(shí)現(xiàn)IPartitionedTransactionalSpout,Storm 提供了一些工具用來管理每個(gè)分區(qū)的狀態(tài)并保證重播的能力。

下面我們修改 TweetsTransactionalSpout,使它可以處理數(shù)據(jù)分區(qū)。

首先,繼承 BasePartitionedTransactionalSpout,它實(shí)現(xiàn)了IPartitionedTransactionalSpout

1
public class TweetsPartitionedTransactionalSpout extends
2
       BasePartitionedTransactionalSpout<TransactionMetadata> {
3
...
4
}  

然后告訴 Storm 誰是你的協(xié)調(diào)器。

01
public static class TweetsPartitionedTransactionalCoordinator implements Coordinator {
02
    @Override
03
    public int numPartitions() {
04
        return 4;
05
    }
06

07
    @Override
08
    public boolean isReady() {
09
        return true;
10
    }
11

12
    @Override
13
    public void close() {}
14
}  

在這個(gè)例子里,協(xié)調(diào)器很簡(jiǎn)單。numPartitions 方法,告訴 Storm 一共有多少分區(qū)。而且你要注意,不要返回任何元數(shù)據(jù)。對(duì)于 IPartitionedTransactionalSpout,元數(shù)據(jù)由分發(fā)器直接管理。

下面是分發(fā)器的實(shí)現(xiàn):

01
public static class TweetsPartitionedTransactionalEmitter
02
       implements Emitter<TransactionMetadata> {
03
    PartitionedRQ rq = new ParttionedRQ();
04

05
    @Override
06
    public TransactionMetadata emitPartitionBatchNew(TransactionAttempt tx,
07
            BatchOutputCollector collector, int partition,
08
            TransactionMetadata lastPartitioonMeta) {
09
        long nextRead;
10

11
        if(lastPartitionMeta == null) {
12
            nextRead = rq.getNextRead(partition);
13
        }else{
14
            nextRead = lastPartitionMeta.from + lastPartitionMeta.quantity;
15
            rq.setNextRead(partition, nextRead); //移動(dòng)游標(biāo)
16
        }
17

18
        long quantity = rq.getAvailableToRead(partition, nextRead);
19
        quantity = quantity > MAX_TRANSACTION_SIZE ? MAX_TRANSACTION_SIZE : quantity;
20
        TransactionMetadata metadata = new TransactionMetadata(nextRead, (int)quantity);
21

22
        emitPartitionBatch(tx, collector, partition, metadata);
23
        return metadata;
24
    }
25

26
    @Override
27
    public void emitPartitionBatch(TransactionAttempt tx, BatchOutputCollector collector,
28
            int partition, TransactionMetadata partitionMeta) {
29
        if(partitionMeta.quantity <= 0){
30
            return;
31
        }
32

33
        List<String> messages = rq.getMessages(partition, partitionMeta.from,
34
               partitionMeta.quantity);
35

36
        long tweetId = partitionMeta.from;
37
        for (String msg : messages) {
38
            collector.emit(new Values(tx, ""+tweetId, msg));
39
            tweetId++;
40
        }
41
    }
42

43
    @Override
44
    public void close() {}
45
}  

這里有兩個(gè)重要的方法,emitPartitionBatchNew,和 emitPartitionBatch。對(duì)于 emitPartitionBatchNew,從 Storm 接收分區(qū)參數(shù),該參數(shù)決定應(yīng)該從哪個(gè)分區(qū)讀取批次。在這個(gè)方法中,決定獲取哪些 tweets,生成相應(yīng)的元數(shù)據(jù)對(duì)象,調(diào)用 emitPartitionBatch,返回元數(shù)據(jù)對(duì)象,并且元數(shù)據(jù)對(duì)象會(huì)在方法返回時(shí)立即保存到 zookeeper。

Storm 會(huì)為每一個(gè)分區(qū)發(fā)送相同的事務(wù) ID,表示一個(gè)事務(wù)貫穿了所有數(shù)據(jù)分區(qū)。通過emitPartitionBatch 讀取分區(qū)中的 tweets,并向拓?fù)浞职l(fā)批次。如果批次處理失敗了,Storm 將會(huì)調(diào)用 emitPartitionBatch 利用保存下來的元數(shù)據(jù)重復(fù)這個(gè)批次。

NOTE: 完整的源碼請(qǐng)見https://github.com/storm-book/examples-ch08-transactional-topologies(譯者注:原文如此,實(shí)際上這個(gè)倉庫里什么也沒有)

模糊的事務(wù)性拓?fù)?/h2>

到目前為止,你可能已經(jīng)學(xué)會(huì)了如何讓擁有相同事務(wù) ID 的批次在出錯(cuò)時(shí)重播。但是在有些場(chǎng)景下這樣做可能就不太合適了。然后會(huì)發(fā)生什么呢?

事實(shí)證明,你仍然可以實(shí)現(xiàn)在語義上精確的事務(wù),不過這需要更多的開發(fā)工作,你要記錄由 Storm 重復(fù)的事務(wù)之前的狀態(tài)。既然能在不同時(shí)刻為相同的事務(wù) ID 得到不同的元組,你就需要把事務(wù)重置到之前的狀態(tài),并從那里繼續(xù)。

比如說,如果你為收到的所有 tweets 計(jì)數(shù),你已數(shù)到5,而最后的事務(wù) ID 是321,這時(shí)你多數(shù)了8個(gè)。你要維護(hù)以下三個(gè)值 ——previousCount=5,currentCount=13,以及l(fā)astTransactionId=321。假設(shè)事物 ID321 又發(fā)分了一次,而你又得到了4個(gè)元組,而不是之前的8個(gè),提交器會(huì)探測(cè)到這是相同的事務(wù) ID,它將會(huì)把結(jié)果重置到 previousCount 的值5,并在此基礎(chǔ)上加4,然后更新 currentCount 為9。

另外,在之前的一個(gè)事務(wù)被取消時(shí),每個(gè)并行處理的事務(wù)都要被取消。這是為了確保你沒有丟失任何數(shù)據(jù)。

你的 spout 可以實(shí)現(xiàn) IOpaquePartitionedTransactionalSpout,而且正如你看到的,協(xié)調(diào)器和分發(fā)器也很簡(jiǎn)單。

01
public static class TweetsOpaquePartitionedTransactionalSpoutCoordinator implements IOpaquePartitionedTransactionalSpout.Coordinator {
02
    @Override
03
    public boolean isReady() {
04
        return true;
05
    }
06
}
07

08
public static class TweetsOpaquePartitionedTransactionalSpoutEmitter
09
       implements IOpaquePartitionedTransactionalSpout.Emitter<TransactionMetadata> {
10
    PartitionedRQ rq  = new PartitionedRQ();
11

12
    @Override
13
    public TransactionMetadata emitPartitionBatch(TransactionAttempt tx,
14
           BatchOutputCollector collector, int partion,
15
           TransactionMetadata lastPartitonMeta) {
16
        long nextRead;
17

18
        if(lastPartitionMeta == null) {
19
            nextRead = rq.getNextRead(partition);
20
        }else{
21
            nextRead = lastPartitionMeta.from + lastPartitionMeta.quantity;
22
            rq.setNextRead(partition, nextRead);//移動(dòng)游標(biāo)
23
        }
24

25
        long quantity = rq.getAvailabletoRead(partition, nextRead);
26
        quantity = quantity > MAX_TRANSACTION_SIZE ? MAX_TRANSACTION_SIZE : quantity;
27
        TransactionMetadata metadata = new TransactionMetadata(nextRead, (int)quantity);
28
        emitMessages(tx, collector, partition, metadata);
29
        return metadata;
30
    }
31

32
    private void emitMessage(TransactionAttempt tx, BatchOutputCollector collector,
33
                 int partition, TransactionMetadata partitionMeta) {
34
        if(partitionMeta.quantity <= 0){return;}
35

36
        List<String> messages = rq.getMessages(partition, partitionMeta.from, partitionMeta.quantity);
37
        long tweetId = partitionMeta.from;
38
        for(String msg : messages) {
39
            collector.emit(new Values(tx, ""+tweetId, msg));
40
            tweetId++;
41
        }
42
    }
43

44
    @Override
45
    public int numPartitions() {
46
        return 4;
47
    }
48

49
    @Override
50
    public void close() {}
51
}  

最有趣的方法是 emitPartitionBatch,它獲取之前提交的元數(shù)據(jù)。你要用它生成批次。這個(gè)批次不需要與之前的那個(gè)一致,你可能根本無法創(chuàng)建完全一樣的批次。剩余的工作由提交器 bolts借助之前的狀態(tài)完成。

以上內(nèi)容是否對(duì)您有幫助:
在線筆記
App下載
App下載

掃描二維碼

下載編程獅App

公眾號(hào)
微信公眾號(hào)

編程獅公眾號(hào)