App下載

在Kafka Connect:處理更新和刪除的策略分享!

認證小可愛 2021-09-09 17:21:10 瀏覽數(shù) (4164)
反饋

Kafka Connect 是一款出色的工具,可讓您輕松設置從一個數(shù)據(jù)源到目標數(shù)據(jù)庫的連續(xù)數(shù)據(jù)流。它的配置非常簡單,當您有遺留系統(tǒng)為您需要的業(yè)務數(shù)據(jù)提供服務時,出于某種原因或其他原因,它在不同的地方非常有用。我的典型用例是將數(shù)據(jù)從 Oracle 表移動到微服務使用的 MongoDB 集合。這允許更好的可擴展性,因為我們不必使用生產(chǎn)查詢大量訪問源表。

當您打開 Kafka Connect 手冊時,不容易解釋的一件事是如何處理修改已移動的現(xiàn)有數(shù)據(jù)的操作;或者換句話說,更新和刪除。我認為這是我們使用的典型 JDBC/MongoDB 連接器對的限制。有一段時間我探索了 Debezium 連接器,它承諾捕獲這些類型的事件并將它們復制到目標數(shù)據(jù)庫中。使用 OracleDB 的 POC 對我們來說并不成功。我們對這些數(shù)據(jù)庫的訪問有限,而且這些連接器所需的配置級別并不是一個簡單的解決方案。

當我們繼續(xù)使用連接器時,我們發(fā)現(xiàn)有一些方法可以處理這些場景。我將解釋兩種策略。第一個是最理想的,需要在我們的源數(shù)據(jù)庫中進行特定設計。如果該設計不存在且因任何原因無法更改,則第二個是替代解決方案。

基本示例

假設我們有一個處理促銷活動的舊系統(tǒng)。為了簡化我們的示例,假設我們有一個包含三列的基本表。我們需要不斷地將這些數(shù)據(jù)從 SQL 數(shù)據(jù)庫移動到基于文檔的數(shù)據(jù)庫,如 MongoDB。 

基本的三列表

基本概念

首先,我們需要對可以使用的兩種 Kafka 連接器進行快速描述:增量和批量。嚴格來說,JDBC連接器有四種模式:bulk、timestamp、incrementing、timestamp+incrementing。我將最后三個分組為增量,因為它們共享相同的基本概念。您只想移動從源中檢測到的新數(shù)據(jù)。

批量連接器始終移動整個數(shù)據(jù)集。但是,很大程度上取決于我們正在移動的數(shù)據(jù)的用例。理想情況下,增量連接器是最好的解決方案,因為在資源使用或數(shù)據(jù)準備方面更容易管理小塊新數(shù)據(jù)。這里的問題是:Kafka Connect 如何使用純 SQL 查詢,以及它如何知道何時在源中插入了新數(shù)據(jù)?

源連接器配置可以使用以下兩個屬性之一(或兩者):incrementing.column.name 和 timestamp.column.name。Incrementing 屬性使用增量列(如自動生成的 id)來檢測何時插入新行。Timestamp 屬性使用 DateTime 列來檢測新更改。Kafka Connect 持有一個偏移量,將其附加到用于從源獲取數(shù)據(jù)的 SQL 查詢中。

例如,如果我們的表名為“promotions”,我們將在源連接器的查詢屬性中使用,如下所示:

"query": "SELECT TITLE, DISCOUNT, PRODUCT_CATEGORY FROM PROMOTIONS",
"timestamp.column.name": "LAST_UPDATE_DATE"

Kafka 內部將查詢修改為如下所示:

SELECT  * FROM ( SELECT TITLE, DISCOUNT, PRODUCT_CATEGORY FROM PROMOTIONS) 
WHERE LAST_UPDATE_DATE > {OFFSET_DATE}

在接收器連接器端,即在目標數(shù)據(jù)庫中保存數(shù)據(jù)的連接器,我們需要設置一個策略來根據(jù) ID 進行正確的 upsert。您可以在您使用的接收器連接器的文檔中閱讀更多相關信息。對于 MongoDB 連接器,我使用的典型設置是:

"document.id.strategy": "com.mongodb.kafka.connect.sink.processor.id.strategy.ProvidedInValueStrategy",

這表明我們文檔的 _id 將來自源數(shù)據(jù)。在這種情況下,我們的源查詢應該包含一個 _id 列:

"query": "SELECT  PROMO_ID as \"_id\", TITLE, DISCOUNT, PRODUCT_CATEGORY FROM PROMOTIONS"

至此,我們有了檢測新插入的基本配置。每次添加帶有新時間戳的新促銷時,源連接器都會抓取它并將其移動到所需的目的地。但是有了這個完全相同的配置,我們就可以實現(xiàn)檢測更新和刪除的總目標。我們需要的是正確設計我們的數(shù)據(jù)源。

在每次更新時修改時間戳列

如果我們想確保我們的更新被處理并反映在目標數(shù)據(jù)庫中,我們需要確保在源表中進行的每個更新也更新時間戳列值。這可以通過寫入它的應用程序將當前時間戳作為更新操作的參數(shù)來完成,或者創(chuàng)建一個監(jiān)聽更新事件的觸發(fā)器。由于 sink 連接器根據(jù) id 處理 upsert,更新也會反映在目標文檔中。

軟刪除

為了能夠處理刪除,我們需要前面的步驟以及數(shù)據(jù)庫設計中被認為是好的做法:軟刪除。這種做法是在需要時不刪除(硬刪除)數(shù)據(jù)庫中的記錄,而只是用一個特殊的標志來標記它,表明該記錄不再有效/活動。這在可恢復性或審計方面有其自身的好處。這當然意味著我們的應用程序或存儲過程需要了解這種設計并在查詢數(shù)據(jù)時過濾掉不活動的記錄。

如果很難更新刪除記錄的應用程序來進行軟刪除(以防數(shù)據(jù)源的設計沒有考慮到這一點),我們還可以使用觸發(fā)器來捕獲硬刪除并改為進行軟刪除。 

為了我們的 Kafka Connect 目的,我們需要做的是在記錄被標記為非活動時更改我們的時間戳列值。在此示例中,我們將 HOT SUMMER 促銷設置為非活動,將 ACTIVE 列設置為 0。LAST_UPDATE_DATE 還修改為最近的日期,這將使源連接器獲取記錄。

其他樣品表

當數(shù)據(jù)被移動時,例如移動到 MongoDB,為了使用它,我們還需要根據(jù)這個 ACTIVE 字段進行過濾:

db.getCollection('promotions').find({active: 1})

版本化批量

如果我們必須處理不可更改的設計,則可以使用的最后一種方法選項不允許修改源模式以具有時間戳列或活動標志。這個選項有我所說的版本化批量。正如我之前所解釋的,每次調用時,批量連接器都會移動整個數(shù)據(jù)集。在大多數(shù)情況下,我遇到過增量更新總是更可取的做法,但在這種情況下,我們可以利用批量選項。
由于我們需要跟蹤新插入、更新或刪除的內容,因此我們可以每次移動數(shù)據(jù),添加一個額外的列來標識數(shù)據(jù)的快照。我們還可以使用查詢數(shù)據(jù)時的時間戳。由于時間戳是自然后代排序的值,如果我們想要最新的快照,我們可以很容易地通過最后一個或倒數(shù)第二個(我將解釋為什么這可能更好)一旦數(shù)據(jù)移動到目標位置的快照進行過濾。
Oracle 中的查詢如下所示:

"query": "SELECT  PROMO_ID as \"_id\", TITLE, DISCOUNT, PRODUCT_CATEGORY, 
TO_CHAR(SYSDATE, 'yyyymmddhh24miss') AS SNAPSHOT FROM PROMOTIONS"

這種方法需要一些配置,這些配置對于使用最終數(shù)據(jù)集時的正確性能至關重要。您可以想象,索引在這里很重要,更重要的是,在新的快照列中。另一個重要的考慮因素是消耗的空間。根據(jù)每個快照中的記錄數(shù)量,我們可能需要刪除舊版本。我們可以為此使用一些計劃任務,或者像使用 MongoDB 索引一樣配置 TTL。
在使用數(shù)據(jù)時,我們首先需要獲取最新的快照。我提到倒數(shù)第二個可能更好。原因是最新的可能是正在進行的。換句話說,當您執(zhí)行查詢以使用數(shù)據(jù)時,數(shù)據(jù)可能會移動。如果您對目標數(shù)據(jù)庫的查詢是任何類型的聚合,您可能會得到不完整的結果。因此,對于最新的快照,我們不確定它是否處于準備好使用的狀態(tài)。如果我們抓取倒數(shù)第二個,我們可以確定快照是完整的。
在下一個示例中,移動了數(shù)據(jù)的兩個版本。版本 2021073012000包含三個文檔。較新的版本2021080112000有兩個文檔,一個文檔有折扣的更新版本。如您所見,每個版本都是數(shù)據(jù)源的時間快照。
數(shù)據(jù)源時間快照

這種方法有點棘手,不應該是我們的第一選擇。


0 人點贊