你將在本章了解到 spout 作為拓撲入口和它的容錯機制相關的最常見的設計策略。
在設計拓撲結構時,始終在頭腦中記著的一件重要事情就是消息的可靠性。當有無法處理的消息時,你就要決定該怎么辦,以及作為一個整體的拓撲結構該做些什么。舉個例子,在處理銀行存款時,不要丟失任何事務報文就是很重要的事情。但是如果你要統(tǒng)計分析數(shù)以百萬的 tweeter 消息,即使有一條丟失了,仍然可以認為你的結果是準確的。
對于 Storm 來說,根據(jù)每個拓撲的需要擔保消息的可靠性是開發(fā)者的責任。這就涉及到消息可靠性和資源消耗之間的權衡。高可靠性的拓撲必須管理丟失的消息,必然消耗更多資源;可靠性較低的拓撲可能會丟失一些消息,占用的資源也相應更少。不論選擇什么樣的可靠性策略,Storm 都提供了不同的工具來實現(xiàn)它。
要在 spout 中管理可靠性,你可以在分發(fā)時包含一個元組的消息 ID(collector.emit(new Values(…),tupleId))。在一個元組被正確的處理時調(diào)用 ack** 方法,而在失敗時調(diào)用 fail** 方法。當一個元組被所有的靶 bolt 和錨 bolt 處理過,即可判定元組處理成功(你將在第5章學到更多錨 bolt 知識)。
發(fā)生下列情況之一時為元組處理失?。?/p>
讓我們來看一個例子。想象你正在處理銀行事務,需求如下:
創(chuàng)建一個 spout 和一個 bolt,spout 隨機發(fā)送100個事務 ID,有80%的元組不會被 bolt 收到(你可以在例子 ch04-spout 查看完整代碼)。實現(xiàn) spout 時利用 Map 分發(fā)事務消息元組,這樣就比較容易實現(xiàn)重發(fā)消息。
public void nextTuple() {
if(!toSend.isEmpty()){
for(Map.Entry<Integer, String> transactionEntry : toSend.entrySet()){
Integer transactionId = transactionEntry.getKey();
String transactionMessage = transactionEntry.getValue();
collector.emit(new Values(transactionMessage),transactionId);
}
toSend.clear();
}
}
如果有未發(fā)送的消息,得到每條事務消息和它的關聯(lián) ID,把它們作為一個元組發(fā)送出去,最后清空消息隊列。值得一提的是,調(diào)用 map 的 clear 是安全的,因為 nextTuple 失敗時,只有 ack 方法會修改 map,而它們都運行在一個線程內(nèi)。
維護兩個 map 用來跟蹤待發(fā)送的事務消息和每個事務的失敗次數(shù)。ack 方法只是簡單的把事務從每個列表中刪除。
public void ack(Object msgId) {
messages.remove(msgId);
failCounterMessages.remove(msgId);
}
fail 方法決定應該重新發(fā)送一條消息,還是已經(jīng)失敗太多次而放棄它。
NOTE:如果你使用全部數(shù)據(jù)流組,而拓撲里的所有 bolt 都失敗了,spout 的 fail 方法才會被調(diào)用。
public void fail(Object msgId) {
Integer transactionId = (Integer) msgId;
//檢查事務失敗次數(shù)
Integer failures = transactionFailureCount.get(transactionId) + 1;
if(failes >= MAX_FAILS){
//失敗數(shù)太高了,終止拓撲
throw new RuntimeException("錯誤, transaction id 【"+
transactionId+"】 已失敗太多次了 【"+failures+"】");
}
//失敗次數(shù)沒有達到最大數(shù),保存這個數(shù)字并重發(fā)此消息
transactionFailureCount.put(transactionId, failures);
toSend.put(transactionId, messages.get(transactionId));
LOG.info("重發(fā)消息【"+msgId+"】");
}
首先,檢查事務失敗次數(shù)。如果一個事務失敗次數(shù)太多,通過拋出 RuntimeException 終止發(fā)送此條消息的工人。否則,保存失敗次數(shù),并把消息放入待發(fā)送隊列(toSend),它就會再次調(diào)用 nextTuple 時得以重新發(fā)送。
NOTE:Storm 節(jié)點不維護狀態(tài),因此如果你在內(nèi)存保存信息(就像本例做的那樣),而節(jié)點又不幸掛了,你就會丟失所有緩存的消息。Storm 是一個快速失敗的系統(tǒng)。拓撲會在拋出異常時掛掉,然后再由 Storm 重啟,恢復到拋出異常前的狀態(tài)。
接下來你會了解到一些設計 spout 的技巧,幫助你從多數(shù)據(jù)源獲取數(shù)據(jù)。
在一個直接連接的架構中,spout 直接與一個消息分發(fā)器連接。
圖 直接連接的 spout
這個架構很容易實現(xiàn),尤其是在消息分發(fā)器是已知設備或已知設備組時。已知設備滿足:拓撲從啟動時就已知道該設備,并貫穿拓撲的整個生命周期保持不變。未知設備就是在拓撲運行期添加進來的。已知設備組就是從拓撲啟動時組內(nèi)所有設備都是已知的。
下面舉個例子說明這一點。創(chuàng)建一個 spout 使用 Twitter 流 API 讀取 twitter 數(shù)據(jù)流。spout 把 API 當作消息分發(fā)器直接連接。從數(shù)據(jù)流中得到符合 track 參數(shù)的公共 tweets(參考 twitter 開發(fā)頁面)。完整的例子可以在鏈接 https://github.com/storm-book/examples-ch04-spouts/找到。
spout 從配置對象得到連接參數(shù)(track,user,password),并連接到 API(在這個例子中使用 Apache 的 DefaultHttpClient)。它一次讀一行數(shù)據(jù),并把數(shù)據(jù)從 JSON 轉化成 Java 對象,然后發(fā)布它。
public void nextTuple() {
//創(chuàng)建http客戶端
client = new DefaultHttpClient();
client.setCredentialsProvider(credentialProvider);
HttpGet get = new HttpGet(STREAMING_API_URL+track);
HttpResponse response;
try {
//執(zhí)行http訪問
response = client.execute(get);
StatusLine status = response.getStatusLine();
if(status.getStatusCode() == 200){
InputStream inputStream = response.getEntity().getContent();
BufferedReader reader = new BufferedReader(new InputStreamReader(inputStream));
String in;
//逐行讀取數(shù)據(jù)
while((in = reader.readLine())!=null){
try{
//轉化并發(fā)布消息
Object json = jsonParser.parse(in);
collector.emit(new Values(track,json));
}catch (ParseException e) {
LOG.error("Error parsing message from twitter",e);
}
}
}
} catch (IOException e) {
LOG.error("Error in communication with twitter api ["+get.getURI().toString()+"],
sleeping 10s");
try {
Thread.sleep(10000);
} catch (InterruptedException e1) {}
}
}
NOTE:在這里你鎖定了 nextTuple 方法,所以你永遠也不會執(zhí)行 ack** 和 fail** 方法。在真實的應用中,我們推薦你在一個單獨的線程中執(zhí)行鎖定,并維持一個內(nèi)部隊列用來交換數(shù)據(jù)(你會在下一個例子中學到如何實現(xiàn)這一點:消息隊列)。
棒極了!現(xiàn)在你用一個 spout 讀取 Twitter 數(shù)據(jù)。一個明智的做法是,采用拓撲并行化,多個 spout 從同一個流讀取數(shù)據(jù)的不同部分。那么如果你有多個流要讀取,你該怎么做呢?Storm 的第二個有趣的特性(譯者注:第一個有趣的特性已經(jīng)出現(xiàn)過,這句話原文都是一樣的,不過按照中文的行文習慣還是不重復使用措詞了)是,你可以在任意組件內(nèi)(spouts/bolts)訪問TopologyContext。利用這一特性,你能夠把流劃分到多個 spouts 讀取。
public void open(Map conf, TopologyContext context,
SpoutOutputCollector collector) {
//從context對象獲取spout大小
int spoutsSize =
context.getComponentTasks(context.getThisComponentId()).size();
//從這個spout得到任務id
int myIdx = context.getThisTaskIndex();
String[] tracks = ((String) conf.get("track")).split(",");
StringBuffer tracksBuffer = new StringBuffer();
for(int i=0; i< tracks.length;i++){
//Check if this spout must read the track word
if( i % spoutsSize == myIdx){
tracksBuffer.append(",");
tracksBuffer.append(tracks[i]);
}
}
if(tracksBuffer.length() == 0) {
throw new RuntimeException("沒有為spout得到track配置" +
" [spouts大小:"+spoutsSize+", tracks:"+tracks.length+"] tracks的數(shù)量必須高于spout的數(shù)量");
this.track =tracksBuffer.substring(1).toString();
}
...
}
利用這一技巧,你可以把 collector 對象均勻的分配給多個數(shù)據(jù)源,當然也可以應用到其它的情形。比如說,從web服務器收集日志文件
圖 直連 hash
通過上一個例子,你學會了從一個 spout 連接到已知設備。你也可以使用相同的方法連接未知設備,不過這時你需要借助于一個協(xié)同系統(tǒng)維護的設備列表。協(xié)同系統(tǒng)負責探察列表的變化,并根據(jù)變化創(chuàng)建或銷毀連接。比如,從 web 服務器收集日志文件時,web 服務器列表可能隨著時間變化。當添加一臺 web 服務器時,協(xié)同系統(tǒng)探查到變化并為它創(chuàng)建一個新的 spout。
圖 直連協(xié)同
第二種方法是,通過一個隊列系統(tǒng)接收來自消息分發(fā)器的消息,并把消息轉發(fā)給 spout。更進一步的做法是,把隊列系統(tǒng)作為 spout 和數(shù)據(jù)源之間的中間件,在許多情況下,你可以利用多隊列系統(tǒng)的重播能力增強隊列可靠性。這意味著你不需要知道有關消息分發(fā)器的任何事情,而且添加或移除分發(fā)器的操作比直接連接簡單的多。這個架構的問題在于隊列是一個故障點,另外你還要為處理流程引入新的環(huán)節(jié)。
下圖展示了這一架構模型
圖 使用隊列系統(tǒng)
NOTE:你可以通過輪詢隊列或哈希隊列(把隊列消息通過哈希發(fā)送給 spouts 或創(chuàng)建多個隊列使隊列 spouts 一一對應)在多個 spouts 之間實現(xiàn)并行性。
接下來我們利用 Redishttp://redis.io/ 和它的 java 庫 Jedis 創(chuàng)建一個隊列系統(tǒng)。在這個例子中,我們創(chuàng)建一個日志處理器從一個未知的來源收集日志,利用 lpush 命令把消息插入隊列,利用 blpop 命令等待消息。如果你有很多處理過程,blpop 命令采用了輪詢方式獲取消息。
我們在 spout 的 open** 方法創(chuàng)建一個線程,用來獲取消息(使用線程是為了避免鎖定nextTuple** 在主循環(huán)的調(diào)用):
new Thread(new Runnable() {
@Override
public void run() {
try{
Jedis client= new Jedis(redisHost, redisPort);
List res = client.blpop(Integer.MAX_VALUE, queues);
messages.offer(res.get(1));
}catch(Exception e){
LOG.error("從redis讀取隊列出錯",e);
try {
Thread.sleep(100);
}catch(InterruptedException e1){}
}
}
}).start();
這個線程的惟一目的就是,創(chuàng)建 redis 連接,然后執(zhí)行 blpop 命令。每當收到了一個消息,它就被添加到一個內(nèi)部消息隊列,然后會被 nextTuple**** 消費。對于 spout 來說數(shù)據(jù)源就是 redis 隊列,它不知道消息分發(fā)者在哪里也不知道消息的數(shù)量。
NOTE:我們不推薦你在 spout 創(chuàng)建太多線程,因為每個 spout 都運行在不同的線程。一個更好的替代方案是增加拓撲并行性,也就是通過 Storm 集群在分布式環(huán)境創(chuàng)建更多線程。
在 nextTuple 方法中,要做的惟一的事情就是從內(nèi)部消息隊列獲取消息并再次分發(fā)它們。
public void nextTuple(){
while(!messages.isEmpty()){
collector.emit(new Values(messages.poll()));
}
}
NOTE:你還可以借助 redis 在 spout 實現(xiàn)消息重發(fā),從而實現(xiàn)可靠的拓撲。(譯者注:這里是相對于開頭的可靠的消息VS不可靠的消息講的)
DRPCSpout從DRPC 服務器接收一個函數(shù)調(diào)用,并執(zhí)行它(見第三章的例子)。對于最常見的情況,使用 backtype.storm.drpc.DRPCSpout 就足夠了,不過仍然有可能利用 Storm 包內(nèi)的DRPC類創(chuàng)建自己的實現(xiàn)。
現(xiàn)在你已經(jīng)學習了常見的spout實現(xiàn)模式,它們的優(yōu)勢,以及如何確保消息可靠性。不存在適用于所有拓撲的架構模式。如果你知道數(shù)據(jù)源,并且能夠控制它們,你就可以使用直接連接;然而如果你需要添加未知數(shù)據(jù)源或從多種數(shù)據(jù)源接收數(shù)據(jù),就最好使用消息隊列。如果你要執(zhí)行在線過程,你可以使用 DRPCSpout 或類似的實現(xiàn)。
你已經(jīng)學習了三種常見連接方式,不過依賴于你的需求仍然有無限的可能。
更多建議: