一種實時數據庫
數據同步方案及實現
神州信息
董志
1.
概述
變化數據捕獲簡稱CDC(Change Data Capture),可以識別提取從上次提取之后發生變化的數據,在廣義的概念上,只要能捕獲數據變更的技術,我們都可以稱為CDC。通常我們說的 CDC技術主要面向數據庫的變更,是一種用于捕獲數據庫中數據變更的技術。CDC的兩種模式:
(1)同步:同步CDC主要是采用觸發器記錄新增數據,基本能夠做到實時增量提取。
(2)異步:異步CDC通過分析已經提交的日志記錄來得到增量數據信息,有一定的延時,是本文采用的模式。
1.1. 應用場景
(1)數據同步,用于備份、容災。
(2)數據分發,一個數據源分發給多個下游。
(3)數據采集,面向數據倉庫/數據湖的ETL數據集成。
1.2. 主流的實現機制
(1)基于查詢的CDC
a)離線調度查詢作業,批處理。
b)無法保障數據一致性。
c)不保障實時性。
(2)基于日志的CDC
a)實時消費日志,流處理。
b)保障數據一致性。
c)提供實時數據。
2.
方案對比
主流開源CDC方案對比如下圖所示,主要通過監控各數據庫的事務日志達到監控數據變化的目的,根據對比采用Flink CDC 方案。
圖:多種CDC技術對比
(1) DataX 不支持增量同步,Canal 不支持全量同步。雖然兩者都是非常流行的數據同步工具,但在場景支持上仍不完善。
(2) 在全量+增量一體化同步方面,只有Flink CDC、Debezium、Oracle Goldengate 支持較好。
(3) 在架構方面,Apache Flink 是一個非常優秀的分布式流處理框架,因此Flink CDC 作為Apache Flink 的一個組件具有非常靈活的水平擴展能力。而DataX 和Canal 是個單機架構,在大數據場景下容易面臨性能瓶頸的問題。
(4) 在數據加工的能力上,CDC 工具是否能夠方便地對數據做一些清洗、過濾、聚合,甚至關聯操作?Flink CDC 依托強大的Flink SQL 流式計算能力,可以非常方便地對數據進行加工。而 Debezium 等則需要通過復雜的 Java 代碼才能完成,使用門檻比較高。
(5) 另外,在生態方面,這里指的是上下游存儲的支持。Flink CDC 上下游非常豐富,支持對接MySQL、PostgreSQL 等數據源,還支持寫入到TiDB、HBase、Kafka、Hudi 等各種存儲系統中,也支持靈活的自定義connector。
因此,不論從性能還是適用范圍上,Flink CDC 都可以作為最佳選擇。Flink CDC Connectors是Apache Flink的一組source連接器,使用變更數據捕獲 (CDC) 從不同的數據庫中獲取變更,Flink CDC連接器集成了Debezium作為引擎來捕獲數據變化,所以它可以充分發揮Debezium的能力。目前連接器支持的數據庫有:MySQL(5.6+)、PostgreSQL(9.6+)、MongoDB(3.6+)、Oracle(11+)、TiDB(5.1.x+)、SQL Server(2012+)和Oceanbase(3.1.x+)。
3.
數據庫事務日志
目前支持的關系型數據庫包括:MySQL、Oracle、PostgreSQL、SQL Server,主要采用基于WAL日志方式進行數據變化監聽。下面介紹各關系型數據庫的日志類型:
1. MySQL
(1)Error log錯誤日志記錄了MySQL Server運行過程中所有較為嚴重的警告和錯誤信息,以及MySQL Server每次啟動和關閉的詳細信息。
(2)Binary log二進制日志,記錄著數據庫發生的各種事務信息。
(3)Update log更新日志是MySQL在較老版本上使用的,其功能跟Bin log類似,只不過不是以二進制格式記錄,而是以簡單文本格式記錄內容。
(4)Query log查詢日志記錄MySQL中所有的query。
(5)Slow query log慢查詢日志記錄的就是執行時間較長的query。
(6)InnoDB redo log,InnoDB是一個事務安全的存儲引擎,其事務安全性主要就是通過在線redo日志和記錄在表空間中的undo信息來保證的。
2. Oracle
(1)系統報警日志alert.log。
(2)跟蹤日志(用戶和進程) trace.log。
(3)重做日志。
a. 在線重做日志:又稱聯機重做日志,指Oracle以SQL腳本的形式實時記錄數據庫的數據更新,換句話說,實時保存已執行的SQL腳本到在線日志文件中(按特定的格式)。
b. 歸檔重做日志:指當條件滿足時,Oracle將在線重做日志以文件形式保存到硬盤(持久化)。
3. PostgreSQL
(1)pg_log文件夾中的日志一般用來記錄服務器與DB的狀態,如各種Error信息,定位慢查詢SQL,數據庫的啟動關閉信息,發生checkpoint過于頻繁等的告警信息等。
(2)pg_xlog文件夾中的日志是記錄的PostgreSQL的WAL信息,也就是一些事務日志信息(transaction log),記錄著數據庫發生的各種事務信息。
(3)pg_clog文件夾存儲的也是事務日志文件,但與pg_xlog不同的是它記錄的是事務的元數據(metadata),這個日志告訴我們哪些事務完成了,哪些沒有完成。
4. SQL Server
(1)交易日志(Transaction logs),是針對數據庫改變所做的記錄,它可以記錄針對數據庫的任何操作,并將記錄結果保存在獨立的文件中。對于任何每一個交易過程,交易日志都有非常全面的記錄,根據這些記錄可以將數據文件恢復成交易前的狀態。
4.
功能實現
1. 整體架構
整體架構如下圖所示,首先各源端數據庫需要開啟相應的事務日志,Flink CDC 任務會監聽各數據庫的事務變化日志,然后對日志數據進行處理,最后將數據進行傳輸:
(1)通過訂閱發布方式將消息發送到Redis 的Channel 中,通知消費者數據庫中的數據發生了變化。
(2)以流的方式存儲到Kafka 的 Topic中,供下游程序進行消費。
(3)抽取到其他關系型數據庫中,實現 ETL 功能。
圖:整體架構圖
2. 數據格式
由于 Flink CDC 內部集成了 Debezium 組件,通過 Debezium 進行數據采集,所以數據格式同 Debezium,監聽到的數據格式如下圖所示,after 代表變化后的數據;source 代表源端的數據庫相關信息,包括 Debezium 版本號、連接器類型、數據庫名、表名等;op 代表操作的類型,此處為讀操作。
圖:數據格式
3. 事務日志開啟
(1) MySQL 開啟Bin Log 日志
在 my.cnf 里面加上如下配置,重啟服務。
查看是否開啟Bin Log日志 show variables like 'log_%';
(2) Oracle 開啟歸檔日志
啟用歸檔日志:
檢查歸檔日志是否啟用:
啟動補充日志記錄:
4. 具體代碼實現
DataStream方式監聽 MySQL 數據庫實現:
DataStream方式監聽 Oracle 數據庫實現:
Flink SQL方式監聽 MySQL 數據庫實現:
自定義反序列化器:
自定義Redis Sink: