作者:kyledong
來源:微信公眾號:騰訊云大數(shù)據(jù)
出處:http://mp.weixin.qq.com/s?__biz=MzUzNTc0NTcyMw==&mid=2247485657&idx=1&sn=f6fa360a7c35cb6f03e78222ca8691cb
CDC 變更數(shù)據(jù)捕獲技術(shù) 可以將 源數(shù)據(jù)庫的增量變動記錄,同步到一個或多個數(shù)據(jù)目的。本文基于 騰訊云 O ceanus 提供的 Flink CDC 引擎, 著重介紹 Flink 在變更數(shù)據(jù)捕獲技術(shù)中的應(yīng)用。
一、CDC 是什么?
CDC 是變更數(shù)據(jù)捕獲(Change Data Capture)技術(shù)的縮寫,它可以將源數(shù)據(jù)庫(Source)的增量變動記錄,同步到一個或多個數(shù)據(jù)目的(Sink)。在同步過程中,還可以對數(shù)據(jù)進(jìn)行一定的處理,例如分組(GROUP BY)、多表的關(guān)聯(lián)(JOIN)等。
例如對于電商平臺,用戶的訂單會實(shí)時寫入到某個源數(shù)據(jù)庫;A 部門需要將每分鐘的實(shí)時數(shù)據(jù)簡單聚合處理后保存到 Redis 中以供查詢,B 部門需要將當(dāng)天的數(shù)據(jù)暫存到 Elasticsearch 一份來做報(bào)表展示,C 部門也需要一份數(shù)據(jù)到 ClickHouse 做實(shí)時數(shù)倉。隨著時間的推移,后續(xù) D 部門、E 部門也會有數(shù)據(jù)分析的需求,這種場景下,傳統(tǒng)的拷貝分發(fā)多個副本方法很不靈活,而 CDC 可以實(shí)現(xiàn)一份變動記錄,實(shí)時處理并投遞到多個目的地。
下圖是一個示例,通過騰訊云 Oceanus 提供的 Flink CDC 引擎,可以將某個 MySQL 的數(shù)據(jù)庫表的變動記錄,實(shí)時同步到下游的 Redis、Elasticsearch、ClickHouse 等多個接收端。這樣大家可以各自分析自己的數(shù)據(jù)集,互不影響,同時又和上游數(shù)據(jù)保持實(shí)時的同步。

二、CDC 的實(shí)現(xiàn)原理
通常來講,CDC 分為 主動查詢 和 事件接收 兩種技術(shù)實(shí)現(xiàn)模式。
對于主動查詢而言,用戶通常會在數(shù)據(jù)源表的某個字段中,保存上次更新的時間戳或版本號等信息,然后下游通過不斷的查詢和與上次的記錄做對比,來確定數(shù)據(jù)是否有變動,是否需要同步。這種方式優(yōu)點(diǎn)是不涉及數(shù)據(jù)庫底層特性,實(shí)現(xiàn)比較通用;缺點(diǎn)是要對業(yè)務(wù)表做改造,且實(shí)時性不高,不能確保跟蹤到所有的變更記錄,且持續(xù)的頻繁查詢對數(shù)據(jù)庫的壓力較大。
事件接收模式可以通過觸發(fā)器(Trigger)或者日志(例如 Transaction log、Binary log、Write-ahead log 等)來實(shí)現(xiàn)。當(dāng)數(shù)據(jù)源表發(fā)生變動時,會通過附加在表上的觸發(fā)器或者 binlog 等途徑,將操作記錄下來。下游可以通過數(shù)據(jù)庫底層的協(xié)議,訂閱并消費(fèi)這些事件,然后對數(shù)據(jù)庫變動記錄做重放,從而實(shí)現(xiàn)同步。這種方式的優(yōu)點(diǎn)是實(shí)時性高,可以精確捕捉上游的各種變動;缺點(diǎn)是部署數(shù)據(jù)庫的事件接收和解析器(例如 Debezium、Canal 等),有一定的學(xué)習(xí)和運(yùn)維成本,對一些冷門的數(shù)據(jù)庫支持不夠。
綜合來看,事件接收模式整體在實(shí)時性、吞吐量方面占優(yōu),如果數(shù)據(jù)源是 MySQL、PostgreSQL、MongoDB 等常見的數(shù)據(jù)庫實(shí)現(xiàn),建議使用 Debezium ( http://debezium.io/documentation/reference/1.4/connectors/index.html ) 來實(shí)現(xiàn)變更數(shù)據(jù)的捕獲(下圖來自 Debezium 官方文檔 [http://debezium.io/documentation/reference/architecture.html] )。如果使用的只有 MySQL,則還可以用 Canal ( http://github.com/alibaba/canal) 。

三、為什么選 Flink?
從上圖可以看到,Debezium 官方架構(gòu)圖中,是通過 Kafka Streams 直接實(shí)現(xiàn)的 CDC 功能。而我們這里更建議使用 Flink CDC 模塊,因?yàn)?Flink 相對 Kafka Streams 而言,有如下優(yōu)勢:
- Flink 的算子和 SQL 模塊更為成熟和易用
- Flink 作業(yè)可以通過調(diào)整算子并行度的方式,輕松擴(kuò)展處理能力
- Flink 支持高級的狀態(tài)后端(State Backends),允許存取海量的狀態(tài)數(shù)據(jù)
- Flink 提供更多的 Source 和 Sink 等生態(tài)支持
- Flink 有更大的用戶基數(shù)和活躍的支持社群,問題更容易解決
- Flink 的開源協(xié)議允許云廠商進(jìn)行全托管的深度定制,而 Kafka Streams 只能自行部署和運(yùn)維
而且 Flink Table / SQL 模塊將數(shù)據(jù)庫表和變動記錄流(例如 CDC 的數(shù)據(jù)流)看做是 同一事物的兩面 ( http://ci.apache.org/projects/flink/flink-docs-release-1.12/zh/dev/table/streaming/dynamic_tables.html) ,因此內(nèi)部提供的 Upsert 消息結(jié)構(gòu)( +I 表示新增、 -U 表示記錄更新前的值、 +U 表示記錄更新后的值, -D 表示刪除)可以與 Debezium 等生成的變動記錄一一對應(yīng)。
四、Flink CDC 的使用方法
目前 Flink CDC 支持兩種數(shù)據(jù)源輸入方式。
(一)輸入 Debezium 等數(shù)據(jù)流進(jìn)行同步
例如 MySQL -> Debezium -> Kafka -> Flink -> PostgreSQL。適用于已經(jīng)部署好了 Debezium,希望暫存一部分?jǐn)?shù)據(jù)到 Kafka 中以供多次消費(fèi),只需要 Flink 解析并分發(fā)到下游的場景。

在該場景下,由于 CDC 變更記錄會暫存到 Kafka 一段時間,因此可以在這期間任意啟動/重啟 Flink 作業(yè)進(jìn)行消費(fèi);也可以部署多個 Flink 作業(yè)對這些數(shù)據(jù)同時處理并寫到不同的數(shù)據(jù)目的(Sink)庫表中,實(shí)現(xiàn)了 Source 變動與 Sink 的解耦。
用法示例
例如我們有個 MySQL 數(shù)據(jù)庫,需要實(shí)時將內(nèi)容同步到 PostgreSQL 中。假設(shè)已經(jīng)安裝部署好 Debezium 并開始消費(fèi) PostgreSQL 的變更日志,這些日志在持續(xù)寫入名為 YourDebeziumTopic 的 Kafka 主題中。
我們可以新建一個 Flink SQL 作業(yè),然后輸入如下 SQL 代碼(連接參數(shù)都是虛擬的,僅供參考):
CREATE TABLE `Data_Input` ( id BIGINT, actor VARCHAR, alias VARCHAR, PRIMARY KEY (`id`) NOT ENFORCED) WITH ( &39;connector&39; = &39;kafka&39;, -- 可選 &39;kafka&39;,&39;kafka-0.11&39;. 注意選擇對應(yīng)的內(nèi)置 Connector &39;topic&39; = &39;YourDebeziumTopic&39;, -- 替換為您要消費(fèi)的 Topic &39;scan.startup.mode&39; = &39;earliest-offset&39; -- 可以是 latest-offset / earliest-offset / specific-offsets / group-offsets 的任何一種 &39;properties.bootstrap.servers&39; = &39;10.0.1.2:9092&39;, -- 替換為您的 Kafka 連接地址 &39;properties.group.id&39; = &39;YourGroup&39;, -- 必選參數(shù), 一定要指定 Group ID -- 定義數(shù)據(jù)格式 (Debezium JSON 格式) &39;format&39; = &39;debezium-json&39;, &39;debezium-json.schema-include&39; = &39;false&39;,); CREATE TABLE `Data_Output` ( id BIGINT, actor VARCHAR, alias VARCHAR, PRIMARY KEY (`id`) NOT ENFORCED) WITH ( &39;connector&39; = &39;jdbc&39;, &39;url&39; = &39;jdbc:postgresql://postgresql.example:50060/myDatabase?currentSchema=mySchema&reWriteBatchedInserts=true&39;, -- 請?zhí)鎿Q為您的實(shí)際 PostgreSQL 連接參數(shù) &39;table-name&39; = &39;MyTable&39;, -- 需要寫入的數(shù)據(jù)表 &39;username&39; = &39;user&39;, -- 數(shù)據(jù)庫訪問的用戶名(需要提供 INSERT 權(quán)限) &39;password&39; = &39;helloworld&39; -- 數(shù)據(jù)庫訪問的密碼); INSERT INTO `Data_Output` SELECt * FROM `Data_Input`;
如果在流計(jì)算 Oceanus 界面上,可以勾選 kafka 和 jdbc 兩個內(nèi)置的 Connector:

隨后直接開始運(yùn)行作業(yè),F(xiàn)link 就會源源不斷的消費(fèi) YourDebeziumTopic 這個 Kafka 主題中 Debezium 寫入的記錄,然后輸出到下游的 MySQL 數(shù)據(jù)庫中,實(shí)現(xiàn)了數(shù)據(jù)同步。
(二)直接對接上游數(shù)據(jù)庫進(jìn)行同步
我們還可以跳過 Debezium 和 Kafka 的中轉(zhuǎn),使用 Flink CDC Connectors ( http://github.com/ververica/flink-cdc-connectors ) 對上游數(shù)據(jù)源的變動進(jìn)行直接的訂閱處理。從內(nèi)部實(shí)現(xiàn)上講,F(xiàn)link CDC Connectors 內(nèi)置了一套 Debezium 和 Kafka 組件,但這個細(xì)節(jié)對用戶屏蔽,因此用戶看到的數(shù)據(jù)鏈路如下圖所示:

用法示例
同樣的,這次我們有個 MySQL 數(shù)據(jù)庫,需要實(shí)時將內(nèi)容同步到 PostgreSQL 中。但我們沒有也不想安裝 Debezium 等額外組件,那我們可以新建一個 Flink SQL 作業(yè),然后輸入如下 SQL 代碼(連接參數(shù)都是虛擬的,僅供參考):
CREATE TABLE `Data_Input` ( id BIGINT, actor VARCHAR, alias VARCHAR, PRIMARY KEY (`id`) NOT ENFORCED) WITH ( &39;connector&39; = &39;mysql-cdc&39;, -- 可選 &39;mysql-cdc&39; 和 &39;postgres-cdc&39; &39;hostname&39; = &39;192.168.10.22&39;, -- 數(shù)據(jù)庫的 IP &39;port&39; = &39;3306&39;, -- 數(shù)據(jù)庫的訪問端口 &39;username&39; = &39;debezium&39;, -- 數(shù)據(jù)庫訪問的用戶名(需要提供 SHOW DATAbaseS, REPLICATION SLAVE, REPLICATION CLIENT, SELECt, RELOAD 權(quán)限) &39;password&39; = &39;hello@world!&39;, -- 數(shù)據(jù)庫訪問的密碼 &39;database-name&39; = &39;YourData&39;, -- 需要同步的數(shù)據(jù)庫 &39;table-name&39; = &39;YourTable&39; -- 需要同步的數(shù)據(jù)表名);CREATE TABLE `Data_Output` ( id BIGINT, actor VARCHAR, alias VARCHAR, PRIMARY KEY (`id`) NOT ENFORCED) WITH ( &39;connector&39; = &39;jdbc&39;, &39;url&39; = &39;jdbc:postgresql://postgresql.example:50060/myDatabase?currentSchema=mySchema&reWriteBatchedInserts=true&39;, -- 請?zhí)鎿Q為您的實(shí)際 PostgreSQL 連接參數(shù) &39;table-name&39; = &39;MyTable&39;, -- 需要寫入的數(shù)據(jù)表 &39;username&39; = &39;user&39;, -- 數(shù)據(jù)庫訪問的用戶名(需要提供 INSERT 權(quán)限) &39;password&39; = &39;helloworld&39; -- 數(shù)據(jù)庫訪問的密碼);INSERT INTO `Data_Output` SELECT * FROM `Data_Input`;
如果在流計(jì)算頁面,可以選擇內(nèi)置的 mysql-cdc 和 jdbc Connector:

注意
需要使用 Flink CDC Connectors ( http://github.com/ververica/flink-cdc-connectors) 附加組件。騰訊云 Oceanus 已經(jīng)自帶了 MySQL-CDC Connector,如果自行部署的話,需要下載 jar 包并將其放入 Flink 的 lib 目錄下。
訪問數(shù)據(jù)庫時,請確保連接的用戶足夠權(quán)限(PostgreSQL 用戶 看這里 [ http://debezium.io/documentation/reference/connectors/postgresql.htmlpostgresql-permissions] ,MySQL 用戶 看這里 [http://debezium.io/documentation/reference/connectors/mysql.htmlsetting-up-mysql] )。
五、Flink CDC 模塊的實(shí)現(xiàn)
(一)Debezium JSON 格式解析類探秘
flink-json 模塊中的 org.apache.flink.formats.json.debezium.DebeziumJsonFormatFactory 是負(fù)責(zé)構(gòu)造解析 Debezium JSON 格式的工廠類;同樣地, org.apache.flink.formats.json.canal.CanalJsonFormatFactory 負(fù)責(zé) Canal JSON 格式。這些類已經(jīng)內(nèi)置在 Flink 1.11 的發(fā)行版中,直接可以使用,無需附加任何程序包。
對于 Debezium JSON 格式而言,F(xiàn)link 將具體的解析邏輯放在了 org.apache.flink.formats.json.debezium.DebeziumJsonDeserializationSchemaDebeziumJsonDeserializationSchema 類中。

上圖表示 Debezium JSON 的一條更新(Update)消息,它表示上游已將 id=123 的數(shù)據(jù)更新,且字段內(nèi)包含了更新前的舊值,以及更新后的新值。
那么,F(xiàn)link 是如何解析并生成對應(yīng)的 Flink 消息呢?我們看下這個類的 deserialize 方法:
GenericRowData before = (GenericRowData) payload.getField(0); // 更新前的數(shù)據(jù)GenericRowData after = (GenericRowData) payload.getField(1); // 更新后的數(shù)據(jù)String op = payload.getField(2).toString(); // 獲取 &34;op&34; 字段的類型if (OP_CREATE.equals(op) || OP_READ.equals(op)) { // 如果是創(chuàng)建 (c) 或快照讀取 (r) 消息 after.setRowKind(RowKind.INSERT); // 設(shè)置消息類型為新建 (+I) out.collect(after); // 發(fā)送給下游} else if (OP_UPDATE.equals(op)) { // 如果是更新 (u) 消息 before.setRowKind(RowKind.UPDATE_BEFORE); // 把更新前的數(shù)據(jù)類型設(shè)置為撤回 (-U) after.setRowKind(RowKind.UPDATE_AFTER); // 把更新后的數(shù)據(jù)類型設(shè)置為更新 (+U) out.collect(before); // 發(fā)送兩條數(shù)據(jù)給下游 out.collect(after);} else if (OP_DELETE.equals(op)) { // 如果是刪除 (d) 消息 before.setRowKind(RowKind.DELETE); // 將消息類型設(shè)置為刪除 (-D) out.collect(before); // 發(fā)送給下游} else { ... // 異常處理邏輯}
從上述邏輯可以看出,對于每一種 Debezium 的操作碼( op 字段的類型),都可以用 Flink 的 RowKind 類型來表示。對于插入 +I 和刪除 D ,都只需要一條消息即可;而對于更新,則涉及刪除舊數(shù)據(jù)和寫入新數(shù)據(jù),因此需要 -U 和 +U 兩條消息來對應(yīng)。
特別地,在 MySQL、PostgreSQL 等支持 Upsert(原子操作的 Update or Insert)語義的數(shù)據(jù)庫中,通常前一個 -U 消息可以省略,只把后一個 +U 消息用作實(shí)際的更新操作即可,這個優(yōu)化在 Flink 中也有實(shí)現(xiàn)。
因此可以看到,Debezium 到 Flink 消息的轉(zhuǎn)換邏輯是非常簡單和自然的,這也多虧了 Flink 先進(jìn)的設(shè)計(jì)理念,很早就提出并實(shí)現(xiàn)了 Upsert 數(shù)據(jù)流和動態(tài)數(shù)據(jù)表之間的映射關(guān)系。
1.Flink CDC Connectors 的實(shí)現(xiàn)
(1)flink-connector-debezium 模塊
我們在使用 Flink CDC Connectors 時,也會好奇它究竟是如何做到的不需要安裝和部署外部服務(wù)就可以實(shí)現(xiàn) CDC 的。當(dāng)我們閱讀 flink-connector-mysql-cdc 的源碼時,可以看到它內(nèi)部依賴了 flink-connector-debezium 模塊,而這個模塊將 Debezium Embedded ( http://github.com/debezium/debezium/tree/master/debezium-embedded ) 嵌入到了 Connector 中。
flink-connector-debezium 的數(shù)據(jù)源實(shí)現(xiàn)類為 com.alibaba.ververica.cdc.debezium.DebeziumSourceFunction ,它集成了 Flink 中的 RichSourceFunction 并實(shí)現(xiàn)了 CheckpointedFunction 以支持快照保存狀態(tài)。
通常而言,對于 SourceFunction,我們可以從它的 run 方法入手分析。它的核心代碼如下:
this.engine = DebeziumEngine.create(Connect.class) .using(properties) // 初始化 Debezium 所需的參數(shù) .notifying(debeziumConsumer) // 收到批量的變更消息, 則 Debezium 會回調(diào) DebeziumChangeConsumer 來反序列化并向下游輸出數(shù)據(jù) .using(OffsetCommitPolicy.always()) .using( (success, message, error) -> { if (!success && error != null) { this.reportError(error); } }) .build();... executor.execute(engine); // 向 Executor 提交 Debezium 線程以啟動運(yùn)行
可以看到,這個 SourceFunction 使用一些預(yù)先定義的參數(shù),初始化了一個嵌入式的 DebeziumEngine(Java 的 Runnable ),然后提交給線程池(executor)去執(zhí)行。這個 Debezium 線程會批量接收 binlog 信息并回調(diào)傳入的 debeziumConsumer 以反序列化消息并交給 Flink 來處理。本類的其他方法主要負(fù)責(zé)初始化狀態(tài)和保存快照,這里略過。
這里我們再來看一下 DebeziumChangeConsumer 的實(shí)現(xiàn),它的最核心的方法是 handleBatch 。當(dāng) Debezium 收到一批新的事件時,會調(diào)用這個方法來通知我們的 Connector 進(jìn)行處理。這里有個 for 循環(huán)輪詢的邏輯:
for (ChangeEvent event : changeEvents) { // 輪詢各個事件 SourceRecord record = event.value(); if (isHeartbeatEvent(record)) { // 如果時心跳包 // 只更新當(dāng)前 offset 信息, 然后繼續(xù)(不進(jìn)行實(shí)際處理) synchronized (checkpointLock) { debeziumOffset.setSourcePartition(record.sourcePartition()); debeziumOffset.setSourceOffset(record.sourceOffset()); } continue; } deserialization.deserialize(record, debeziumCollector); // 反序列化這條消息 if (isInDbSnapshotPhase) { // 如果處于數(shù)據(jù)庫快照期, 需要阻止 Flink 檢查點(diǎn)(Checkpoint)生成 if (!lockHold) { MemoryUtils.UNSAFE.monitorEnter(checkpointLock); lockHold = true; ... } if (!isSnapshotRecord(record)) { // 如果已經(jīng)不在數(shù)據(jù)庫快照期了, 就釋放鎖, 允許 Flink 正常生成檢查點(diǎn)(Checkpoint) MemoryUtils.UNSAFE.monitorExit(checkpointLock); isInDbSnapshotPhase = false; ... } } // 更新當(dāng)前 offset 信息, 并向下游 Flink 算子發(fā)送數(shù)據(jù) emitRecordsUnderCheckpointLock( debeziumCollector.records, record.sourcePartition(), record.sourceOffset());}
可以看到邏輯比較簡單,只需要關(guān)注 checkpointLock 這個對象:只有持有這個對象的鎖時,才允許 Flink 進(jìn)行檢查點(diǎn)的生成。
當(dāng)作業(yè)處于數(shù)據(jù)庫快照期(即作業(yè)剛啟動時,需全量同步源數(shù)據(jù)庫的一份完整快照,此時收到的數(shù)據(jù)類型是 Debezium 的 SnapshotRecord ),則不允許 Flink 進(jìn)行 Checkpoint 即檢查點(diǎn)的生成,以避免作業(yè)崩潰恢復(fù)后狀態(tài)不一致;同樣地,如果正在向下游算子發(fā)送數(shù)據(jù)并更新 offset 信息時,也不允許快照的進(jìn)行。這些操作都是為了保證 Exacly-Once(精確一致)語義。
這里也解釋了在作業(yè)剛啟動時,如果數(shù)據(jù)庫較大(同步時間較久),F(xiàn)link 剛開始的 Checkpoint 永遠(yuǎn)失?。ǔ瑫r)的原因:只有當(dāng) Flink 完整同步了全量數(shù)據(jù)后,才可以進(jìn)行增量數(shù)據(jù)的處理,以及 Checkpoint 的生成。
(2)flink-connector-mysql-cdc 模塊
而對于 flink-connector-mysql-cdc 模塊而言,它主要涉及到 MySQLTableSource 的聲明和實(shí)現(xiàn)。
我們知道,F(xiàn)link 是通過 Java 的 SPI(Service Provider Interface)機(jī)制動態(tài)加載 Connector 的,因此我們首先看這個模塊的 src/main/resources/meta-INF/services/org.apache.flink.table.factories.Factory 文件,里面內(nèi)容指向 com.alibaba.ververica.cdc.connectors.mysql.table.MySQLTableSourceFactory 。
打開這個工廠類,我們可以看到它定義了該 Connector 所需的參數(shù),例如 MySQL 數(shù)據(jù)庫的用戶名、密碼、表名等信息,并負(fù)責(zé) MySQLTableSource 實(shí)例的具體創(chuàng)建,而 MySQLTableSource 類對這些參數(shù)做轉(zhuǎn)換,最終會生成一個上文提到的 DebeziumSourceFunction 對象。
因此我們可以發(fā)現(xiàn),這個模塊作用是一個 MySQL 參數(shù)的封裝和轉(zhuǎn)換層,最終的邏輯實(shí)現(xiàn)仍然是由 flink-connector-debezium 完成的。
六、MySQL CDC 常見問題&優(yōu)化
由于 Flink 的 CDC 功能還比較新(1.11 版本剛開始支持,1.12 版本逐步完善),因而在應(yīng)用過程中,很可能會遇到有各種問題。鑒于大多數(shù)客戶的數(shù)據(jù)源都是 MySQL,我們這里整理了客戶常見的一些問題和優(yōu)化方案,希望能夠幫助到大家。
Debezium 報(bào)錯:binlog probably contains events generated with statement or mixed based replication format
當(dāng)前的 Binlog 格式被設(shè)置為了 STATEMENT 或者 MIXED , 這兩種都不被 Debezium 支持。為了使用 Flink CDC 功能,需要把 MySQL 的 binlog-format 設(shè)置為 ROW :
SET GLOBAL binlog_format = &39;ROW&39;;SET GLOBAL binlog_row_image = &39;FULL&39;;
如果您使用的是騰訊云的 TencentDB for MySQL,請確認(rèn)下面設(shè)置:

Debezium 報(bào)錯:User does not have the &39;LOCK TABLES&39; privilege required to obtain a consistent snapshot 或 Access denied; you need (at least one of) the SUPER, REPLICATION CLIENT privilege(s)
請對作業(yè)中指定的 MySQL 用戶賦予如下權(quán)限: SELECT, RELOAD, SHOW DATAbaseS, REPLICATION SLAVE, REPLICATION CLIENT ,例如:
GRANT SELECT , RELOAD, SHOW DATAbaseS , REPLICATION SLAVE , REPLICATION CLIENT ON *.* TO &39;用戶名&39; IDENTIFIED BY &39;密碼&39; ;
FLUSH PRIVILEGES ;
如果您使用的數(shù)據(jù)庫不允許或者不希望使用 RELOAD 進(jìn)行全局鎖,則還需要授予 LOCK TABLES 權(quán)限以令 Debezium 嘗試進(jìn)行表級鎖。 注意,表級鎖會導(dǎo)致更長的數(shù)據(jù)庫鎖定時間!
如果希望徹底跳過鎖(對數(shù)據(jù)的一致性要求不高,但要求數(shù)據(jù)庫不能被鎖),則可以在 WITH 參數(shù)中設(shè)置 &39;debezium.snapshot.locking.mode&39; = &39;none&39; 參數(shù)來跳過鎖操作。但請注意,同步過程中千萬不要隨意變更庫表的結(jié)構(gòu)。
作業(yè)剛啟動期間,F(xiàn)link Checkpoint 一直失敗/重啟
前文講過,F(xiàn)link CDC Connector 在初始的全量快照同步階段,會屏蔽掉快照的執(zhí)行,因此如果 Flink Checkpoint 需要執(zhí)行的話,就會因?yàn)橐恢睙o法獲得 checkpointLock 對象的鎖而超時。
可以設(shè)置 Flink 的 execution.checkpointing.tolerable-failed-checkpoint 參數(shù)以容忍更多的 Checkpoint 失敗事件,同時可以調(diào)大 Checkpoint 周期,避免作業(yè)因 Checkpoint 失敗而一直重啟。
JDBC Sink 批量寫入時,數(shù)據(jù)會缺失幾條
如果發(fā)現(xiàn)數(shù)據(jù)庫中的某些數(shù)據(jù)在 CDC 同步后有缺失,請確認(rèn)是否仍在使用 Flink 舊版 1.10 的 Flink SQL WITH 語法(例如 WITH 參數(shù)中的 connector.type 是 舊語法 [ http://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/connect.htmljdbc-connector] , connector 是 新語法 [ http://ci.apache.org/projects/flink/flink-docs-stable/dev/table/connectors/jdbc.htmlhow-to-create-a-jdbc-table] )。
舊版語法的 Connector 在 JDBC 批量寫入 Upsert 數(shù)據(jù)(例如數(shù)據(jù)庫的更新記錄)時,并未考慮到 Upsert 與 Delete 消息之間的順序關(guān)系,因此會出現(xiàn)錯亂的問題,請盡快遷移到新版的 Flink SQL 語法。
異常數(shù)據(jù)造成作業(yè)持續(xù)重啟
默認(rèn)情況下,如果遇到異常的數(shù)據(jù)(例如消費(fèi)的 Kafka topic 在無意間混入了其他數(shù)據(jù)),F(xiàn)link 會立刻崩潰重啟,然后從上個快照點(diǎn)(Checkpoint)重新消費(fèi)。由于某條異常數(shù)據(jù)的存在,作業(yè)會永遠(yuǎn)因?yàn)楫惓6貑???梢栽?WITH 參數(shù)中加入 &39;debezium-json.ignore-parse-errors&39; = &39;true&39; 來應(yīng)對這個問題。
上游 Debezium 崩潰導(dǎo)致寫入重復(fù)數(shù)據(jù),結(jié)果不準(zhǔn)
Debezium 服務(wù)端發(fā)生異常并恢復(fù)后,由于可能沒有及時記錄崩潰前的現(xiàn)場,可能會退化為 At least once 模式,即同樣的數(shù)據(jù)可能被發(fā)送多次,造成下游結(jié)果不準(zhǔn)確。
為了應(yīng)對這個問題,新版的 Flink 1.12 增加了一個 table.exec.source.cdc-events-duplicate 配置項(xiàng)(可以編輯 flink-conf.yaml 文件來配置),建議將其設(shè)置為 true 以對這些重復(fù)數(shù)據(jù)進(jìn)行去重。
但是需要注意,該選項(xiàng)需要數(shù)據(jù)源表 定義了主鍵 ,否則也無法進(jìn)行去重操作。
七、未來展望
在 Flink 1.11 版本中,CDC 功能首次被集成到內(nèi)核中。由于 Flink 1.11.0 版本有個 嚴(yán)重 Bug ( http://issues.apache.org/jira/browse/Flink-18461 ) 造成 Upsert 數(shù)據(jù)無法寫入下游,我們建議使用 1.11.1 及以上版本。
在 1.12 版本上,F(xiàn)link 還在配置項(xiàng)中增加了前文提到的 table.exec.source.cdc-events-duplicate 等選項(xiàng)以更好地支持 CDC 去重;還支持 Avro 格式的 Debezium 數(shù)據(jù)流,而不僅僅限于 JSON 了。另外,這個版本增加了對 Maxwell ( http://maxwells-daemon.io/) 格式的 CDC 數(shù)據(jù)流支持,
為了更好地完善 CDC 功能模塊,F(xiàn)link 社區(qū)創(chuàng)建了 [Flink-18822] 以追蹤關(guān)于該模塊的進(jìn)展??梢詮闹锌吹?,F(xiàn)link 1.13 主要著力于支持更多的類型( Flink-18758 [ http://issues.apache.org/jira/browse/Flink-18758 ] ),以及允許從 Debezium Avro、Canal 等數(shù)據(jù)流中讀取一些元數(shù)據(jù)信息等。
而在更遠(yuǎn)的規(guī)劃中,F(xiàn)link 還可能支持基于 CDC 的內(nèi)存數(shù)據(jù)庫緩存,這樣我們可以在內(nèi)存中動態(tài)地 JOIN 一個數(shù)據(jù)庫的副本,而不必每次都查詢源庫,這將極大地提升作業(yè)的處理能力,并降低數(shù)據(jù)庫的查詢壓力。
作者:kyledong
來源:微信公眾號:騰訊云大數(shù)據(jù)
出處:http://mp.weixin.qq.com/s?__biz=MzUzNTc0NTcyMw==&mid=2247485657&idx=1&sn=f6fa360a7c35cb6f03e78222ca8691cb