FlinkCDC-MySQL数据库实时同步——by郑国际
一、目的
在Linux环境下使用Flink CDC(Change Data Capture)将一个MySQL数据库实时同步到另一个MySQL数据库,涉及以下几个关键步骤:
准备运行环境,即安装Flink和MySQL数据库,同时修改配置信息使其可以从外网访问运行Flink和MySQL。此外,MySQL数据库还需要打开binlog日志访问权限,然后在Flink中添加FlinkCDC对MySQL数据库连接的相关依赖,以及连接MySQL数据库的JDBC依赖。
创建同步表,即在MySQL源数据库(Source)和目标数据库(Sink)中创建对应的数据表进行数据同步,本次同步形式为多表对单表。
编写并提交Flink作业,即在保存Flink作业文件后通过web页面或终端界面提交,根据提交形式的不同把作业分为jar、sql等文件类型,提交后Flink将启用对应的运行环境执行作业。本次提交的作业类型为sql形式,在Linux终端提交。
以上步骤完成后,测试向源数据库更新数据表信息并检查同步情况,Flink会根据作业中设置的checkpointing时间间隔定时执行sql语句,根据MySQL数据库的binlog日志变更记录实现数据库同步。
二、环境准备
1、Flink配置
官网下载并安装Apache Flink。本次Flink版本为1.18。
解压Flink到指定目录,并配置FLINK_HOME。
配置Flink的web页面访问权限,使外网可以直接访问。在flink-conf.yaml中对应添加以下两段配置:
vim ./flink-1.18.0/conf/flink-conf.yaml
rest.address: 0.0.0.0
rest.bind-address: 0.0.0.0
在Flink目录下的lib文件夹中添加适配Flink当前版本的jdbc依赖和对应的mysql-cdc依赖,如下图所示。
2、MySQL配置
安装MySQL5.7以上版本的数据库,在配置端口和root密码后能够正常登录。以下分别是Linux和Windows系统的MySQL数据库。


3、开启binlog日志
(1)Linux系统下打开MySQL配置文件:
vim /etc/my.cnf
追加以下代码:
log-bin=mysql-bin #binlog
binlog_format=ROW #选择row
server_id=1 #mysql实例id
重启mysql服务(二选一):
sudo service mysqld restart
或sudo service mysql restart
(2)Windows系统下打开MySQL配置文件:
在[mysqld]内添加以下代码:
[mysqld]
log-bin=mysql-bin
server-id=1
binlog_format=ROW
关闭MySQL服务后在计算机管理中重启服务:

重启服务后登录查看binlog是否开启,若开启则Value值为ON:
三、创建同步表
1、创建源表(多表)
CREATE TABLE products (
id int(10) unsigned NOT NULL AUTO_INCREMENT,
name varchar(20) DEFAULT '',
description varchar(50) DEFAULT '',
create_time datetime DEFAULT NULL,
PRIMARY KEY (id)
) ENGINE=InnoDB AUTO_INCREMENT=7 DEFAULT CHARSET=utf8mb4
CREATE TABLE storage_info (
id bigint(20) unsigned NOT NULL AUTO_INCREMENT,
product_id bigint(20) unsigned NOT NULL DEFAULT '0',
num int(20) unsigned NOT NULL DEFAULT '0',
PRIMARY KEY (id)
) ENGINE=InnoDB AUTO_INCREMENT=6 DEFAULT CHARSET=utf8mb4
2、创建目标表(单表)
CREATE TABLE product_storage (
product_id bigint(20) unsigned NOT NULL,
product_name varchar(20) NOT NULL DEFAULT '',
remain_count int(10) unsigned NOT NULL DEFAULT '0',
create_time datetime NOT NULL,
UNIQUE KEY uniq_product_id (product_id),
) ENGINE=InnoDB AUTO_INCREMENT=6 DEFAULT CHARSET=utf8mb4
四、配置Flink作业
1、编写Flink作业
新建test.sql,添加如下语句:
SET execution.checkpointing.interval = 10s;
drop table if exists products;
CREATE TABLE products (
id INT NOT NULL,
name STRING,
description STRING,
create_time TIMESTAMP,
PRIMARY KEY (id) NOT ENFORCED
) WITH (
'connector' = 'mysql-cdc',
'hostname' = '192.168.2.182',
'port' = '3306',
'username' = 'root',
'password' = 'root',
'database-name' = 'Flinktest',
'server-time-zone' = 'Asia/Shanghai',
'table-name' = 'products'
);
drop table if exists storage_info;
CREATE TABLE storage_info (
id INT NOT NULL,
product_id INT,
num INT,
PRIMARY KEY (id) NOT ENFORCED
) WITH (
'connector' = 'mysql-cdc',
'hostname' = '192.168.2.182',
'port' = '3306',
'username' = 'root',
'password' = 'root',
'database-name' = 'Flinktest',
'server-time-zone' = 'Asia/Shanghai',
'table-name' = 'storage_info'
);
drop table if exists product_storage;
CREATE TABLE product_storage (
product_id INT,
product_name STRING,
remain_count INT,
create_time TIMESTAMP,
PRIMARY KEY (product_id) NOT ENFORCED
) WITH (
'connector' = 'jdbc',
'url' = 'jdbc:mysql://192.168.1.102:3306/flinktest?useSSL=false&allowPublicKeyRetrieval=true&serverTimezone=UTC',
'username' = 'root',
'password' = 'root',
'table-name' = 'product_storage',
'driver' = 'com.mysql.cj.jdbc.Driver',
'scan.fetch-size' = '200'
);
INSERT INTO product_storage
SELECT
a.id AS product_id,
a.name AS product_name,
b.num AS remain_count,
a.create_time
FROM products as a
join storage_info as b on a.id=b.product_id;
2、提交Flink作业
Flink启动和停止:
./flink-1.18.0/bin/start-cluster.sh ###Flink启动
./flink-1.18.0/bin/stop-cluster.sh ###Flink停止
启动后可通过web页面查看Flink当前状态(默认端口8081):

在Linux中提交Flink作业(sql):

执行成功后可通过web页面查看Flink运行情况:

五、测试结果
在两张源表中插入数据:
insert into products (id,name,description,create_time)
values (1,'Product001','This is the first product.','2024-09-26 15:39:01'),
(2,'Product002','This is the second product.','2024-09-26 15:39:02'),
(3,'Product003','This is the third product.','2024-09-26 15:39:03');
insert into storage_info (id,product_id,num)
values (1,1,20),(2,2,30),(3,3,40);
插入数据后查看源表:

等待10s后查看目标表:
至此MySQL数据库同步成功。