开源推荐 MySQL Binlog 增量同步工具 go-mysql-transfer 实现详解

wj596 · 2020年09月05日 · 最后由 niuniu 回复于 2020年09月15日 · 821 次阅读
本帖已被设为精华帖!

一、 概述

工作需要研究了下阿里开源的 MySQL Binlog 增量订阅消费组件 canal,其功能强大、运行稳定,但是有些方面不是太符合需求,主要有如下三点:

1、需要自己编写客户端来消费 canal 解析到的数据

2、server-client 模式,需要同时部署 server 和 client 两个组件,我们的项目中有 6 个业务数据库要实时同步到 redis,意味着要多部署 12 个组件,硬件和运维成本都会增加。

3、从 server 端到 client 端需要经过一次网络传输和序列化反序列化操作,然后再同步到接收端,感觉没有直接怼到接收端更高效。

go-mysql-transfer 是使用 Go 语言实现的 MySQL 数据库实时增量同步工具, 参考 Canal 但是规避了上述三点。旨在实现一个高性能、低延迟、简洁易用的 Binlog 增量数据同步管道, 具有如下特点:

1、不依赖其它组件,一键部署

2、集成多种接收端,如:Redis、MongoDB、Elasticsearch、RocketMQ、Kafka、RabbitMQ,不需要再编写客户端,开箱即用

3、内置丰富的数据解析、消息生成规则;支持 Lua 脚本,以处理更复杂的数据逻辑

4、支持监控告警,集成 Prometheus 客户端

5、高可用集群部署

6、数据同步失败重试

7、全量数据初始化

二、 与同类工具比较

特色 Canal mysql_stream go-mysql-transfer
开发语言 Java Python Golang
HA 支持 支持 支持
接收端 编码定制 Kafka 等 Redis、MongoDB、Elasticsearch、
RabbitMQ、Kafka、RocketMQ
后续支持更多
数据初始化 不支持 支持 支持
数据格式 编码定制 json(固定) 规则(固定)
Lua 脚本 (定制)

三、 设计实现

1、实现原理

go-mysql-transfer 将自己伪装成 MySQL 的 Slave,向 Master 发送 dump 协议获取 binlog,解析 binlog 并生成消息,实时发送给接收端。

go-mysql-transfer原理

2、数据转换规则

将从 binlog 解析出来的数据,经过简单的处理转换发送到接收端。使用内置丰富数数据转换规则,可完成大部分同步工作。

例如将表 t_user 同步到 reids,配置如下规则:

rule:
  -
    schema: eseap #数据库名称
    table: t_user #表名称
    column_underscore_to_camel: true #列名称下划线转驼峰,默认为false
    datetime_formatter: yyyy-MM-dd HH:mm:ss #datetime、timestamp类型格式化,不填写默认yyyy-MM-dd HH:mm:ss
    value_encoder: json  #值编码类型,支持json、kv-commas、v-commas
    redis_structure: string # redis数据类型。 支持string、hash、list、set类型(与redis的数据类型一致)
    redis_key_prefix: USER_ #key前缀
    redis_key_column: USER_NAME #使用哪个列的值作为key,不填写默认使用主键

t_user 表,数据如下:

同步到 Redis 后,数据如下:

更多规则配置和同步案例 请见后续的"使用说明"章节。

3、数据转换脚本

Lua 是一种轻量小巧的脚本语言, 其设计目的是为了嵌入应用程序中,从而为应用程序提供灵活的扩展和定制功能。开发者只需要花费少量时间就能大致掌握 Lua 的语法,照虎画猫写出可用的脚本。

基于 Lua 的高扩展性,可以实现更为复杂的数据解析、消息生成逻辑,定制需要的数据格式。

使用方式:

rule:
  -
    schema: eseap
    table: t_user
    lua_file_path: lua/t_user_string.lua   #lua脚本文件

示例脚本:

local json = require("json")    -- 加载json模块
local ops = require("redisOps") -- 加载redis操作模块

local row = ops.rawRow()  --当前变动的一行数据,table类型,key为列名称
local action = ops.rawAction()  --当前数据库的操作事件,包括:insert、updare、delete

local id = row["ID"] --获取ID列的值
local userName = row["USER_NAME"] --获取USER_NAME列的值
local key = "user_"..id -- 定义key

if action == "delete" -- 删除事件
then
    ops.DEL(key)  -- 删除KEY
else 
    local password = row["PASSWORD"] --获取USER_NAME列的值
    local createTime = row["CREATE_TIME"] --获取CREATE_TIME列的值
    local result= {}  -- 定义结果
    result["id"] = id
    result["userName"] = userName
    result["password"] = password
    result["createTime"] = createTime
    result["source"] = "binlog" -- 数据来源
    local val = json.encode(result) -- 将result转为json
    ops.SET(key,val)  -- 对应Redis的SET命令,第一个参数为key(string类型),第二个参数为value
end 

t_user 表,数据如下:

同步到 Redis 后,数据如下:

更多 Lua 脚本使用说明 和同步案例 请见后续的"使用说明"章节。

4、监控告警

Prometheus 是流行开源监控报警系统和 TSDB,其指标采集组件被称作 exporter。go-mysql-transfer 本身就是一个 exporter。向 Prometheus 提供应用状态、接收端状态、insert 数量、update 数量、delete 数量、delay 延时等指标。

go-mysql-transfer 内置 Prometheus exporter 可以监控系统的运行状况,并进行健康告警。

相关配置:

enable_exporter: true #启用prometheus exporter,默认false
exporter_addr: 9595 #prometheus exporter端口,默认9595

直接访问 127.0.0.1:9595 可以看到导出的指标值,如何与 Prometheus 集成,请参见 Prometheus 相关教程。

指标说明:

transfer_leader_state:当前节点是否为 leader,0=否、1=是

transfer_destination_state:接收端状态, 0=掉线、1=正常

transfer_inserted_num:插入数据的数量

transfer_updated_num:修改数据的数量

transfer_deleted_num:删除数据的数量

transfer_delay:与 MySQL Master 的时延

5、高可用

可以选择依赖 zookeeper 或者 etcdr 构建高可用集群,一个集群中只存在一个 leader 节点,其余皆为 follower 节点。

只有 leader 节点响应 binglog 的 dump 事件,follower 节点为蛰伏状态,不发送 dump 命令,因此多个 follower 也不会加重 Master 的负担。

当 leader 节点出现故障,follower 节点迅速替补上去,实现秒级故障切换。

相关配置:

cluster: # 集群配置
  name: myTransfer #集群名称,具有相同name的节点放入同一个集群
  # ZooKeeper地址,多个用逗号分隔
  zk_addrs: 192.168.1.10:2181,192.168.1.11:2182,192.168.1.12:2183
  #zk_authentication: 123456 #digest类型的访问秘钥,如:user:password,默认为空
  #etcd_addrs: 192.168.1.10:2379 #etcd连接地址,多个用逗号分隔
  #etcd_user: test #etcd用户名
  #etcd_password: 123456 #etcd密码

6、失败重试

网络抖动、接收方故障都会导致数据同步失败,需要有重试机制,才能保证不漏掉数据,使得每一条数据都能送达。

通常有两种重试实现方式,一种方式是记录下故障时刻 binglog 的 position(位移),等故障恢复后,从 position 处重新 dump 数据,发送给接收端。

一种方式是将同步失败的数据在本地落盘,形成队列。当探测到接收端可用时,逐条预出列尝试发送,发送成功最终出列。确保不丢数据,队列先进先出的特性也可保证数据顺序性,正确性。

go-mysql-transfer 采用的是后者,目的是减少发送 dump 命令的次数,减轻 Master 的负担。因为 binglog 记录的整个 Master 数据库的日志,其增长速度很快。如果只需要拿几条数据,而 dump 很多数据,有点得不偿失。

7、全量数据初始化

如果数据库原本存在无法通过 binlog 进行增量同步的数据,可以使用命令行工具-stock 完成始化同步。

stock 基于 SELECT * FROM {table}的方式分批查询出数据,根据规则或者 Lua 脚本生成指定格式的消息,批量发送到接收端。

执行命令 go-mysql-transfer -stoc,在控制台可以直观的看到数据同步状态,如下:

四、安装

二进制安装包

直接下载编译好的安装包: 点击下载

源码编译

1、依赖 Golang 1.14 及以上版本

2、设置' GO111MODULE=on '

3、拉取源码 ‘ go get -d github.com/wj596/go-mysql-transfer’

3、进入目录,执行 ‘ go build ’ 编译

五、部署运行

开启 MySQL 的 binlog

#Linux在my.cnf文件
#Windows在my.ini文件
log-bin=mysql-bin # 开启 binlog
binlog-format=ROW # 选择 ROW 模式
server_id=1 # 配置 MySQL replaction 需要定义,不要和 go-mysql-transfer 的 slave_id 重复

命令行运行

1、修改 app.yml

2、Windows 直接运行 go-mysql-transfer.exe

3、Linux 执行 nohup go-mysql-transfer &

docker 运行

1、拉取源码 ‘ go get -d github.com/wj596/go-mysql-transfer’

2、修改配置文件 ‘ app.yml ’ 中相关配置

3、构建镜像 ‘ docker image build -t go-mysql-transfer -f Dockerfile . ’

4、运行 ‘ docker run -d --name go-mysql-transfer -p 9595:9595 go-mysql-transfer:latest ’

六、使用说明

1、同步到 Redis 操作说明

2、同步到 MongoDB 操作说明

3、同步到 Elasticsearch 操作说明

4、同步到 RocketMQ 操作说明

5、同步到 Kafka 操作说明

6、同步到 RabbitMQ 操作说明

七、开源

github:go-mysql-transfer

八、性能测试

1、测试环境

平台:虚拟机 CPU:E7-4890 4 核 8 线程 内存:8G 硬盘:机械硬盘 OS:Windows Sever 2012 R2 MySQL: 5.5 Rides: 4.0.2

2、测试数据 t_user 表,14 个字段,1 个字段包含中文,数据量 527206 条

3、测试配置

规则:

schema: eseap
table: t_user
order_by_column: id #排序字段,全量数据初始化时不能为空
#column_lower_case:false #列名称转为小写,默认为false
#column_upper_case:false#列名称转为大写,默认为false
column_underscore_to_camel: true #列名称下划线转驼峰,默认为false
# 包含的列,多值逗号分隔,如:id,name,age,area_id  为空时表示包含全部列
#include_column: ID,USER_NAME,PASSWORD
date_formatter: yyyy-MM-dd #date类型格式化, 不填写默认yyyy-MM-dd
datetime_formatter: yyyy-MM-dd HH:mm:ss #datetime、timestamp类型格式化,不填写默认yyyy-MM-dd HH:mm:ss
value_encoder: json  #值编码,支持json、kv-commas、v-commas
redis_structure: string # 数据类型。 支持string、hash、list、set类型(与redis的数据类型一直)
redis_key_prefix: USER_ #key的前缀
redis_key_column: ID #使用哪个列的值作为key,不填写默认使用主键

脚本:

local json = require("json")    -- 加载json模块
local ops = require("redisOps") -- 加载redis操作模块

local row = ops.rawRow()  --当前变动的一行数据,table类型,key为列名称
local action = ops.rawAction()  --当前数据库的操作事件,包括:insert、updare、delete

local id = row["ID"] --获取ID列的值
local userName = row["USER_NAME"] --获取USER_NAME列的值
local key = "user_"..id -- 定义key

if action == "delete" -- 删除事件
then
    ops.DEL(key)  -- 删除KEY
else 
    local password = row["PASSWORD"] --获取USER_NAME列的值
    local createTime = row["CREATE_TIME"] --获取CREATE_TIME列的值
    local result= {}  -- 定义结果
    result["id"] = id
    result["userName"] = userName
    result["password"] = password
    result["createTime"] = createTime
    result["source"] = "binlog" -- 数据来源
    local val = json.encode(result) -- 将result转为json
    ops.SET(key,val)  -- 对应Redis的SET命令,第一个参数为key(string类型),第二个参数为value
end

3、测试用例一

使用规则,将 52 万条数据全量初始化同步到 Redis,结果如下:

3 次运行的中间值为 4.6 秒

4、测试用例二

使用 Lua 脚本,将 52 万条数据全量初始化同步到 Redis,结果如下:

3 次运行的中间值为 9.5 秒

5、测试用例三

使用规则,将 binlog 中 52 万条增量数据同步到 Redis。结果如下:

每秒增量同步 (TPS) 32950 条

6、测试用例四

使用 Lua 脚本,将 binlog 中 52 万条增量数据同步到 Redis。结果如下:

每秒增量同步 (TPS) 15819 条

7、测试用例五

100 个线程不停向 MySQL 写数据,使用规则将数据实时增量同步到 Redis,TPS 保持在 4000 以上,资源占用情况如下:

100 个线程不停向 MySQL 写数据,使用 Lua 脚本将数据实时增量同步到 Redis,TPS 保持在 2000 以上,资源占用情况如下:

以上测试结果,会随着测试环境的不同而改变,仅作为参考。

更多原创文章干货分享,请关注公众号
  • 加微信实战群请加微信(注明:实战群):gocnio
kevin 将本帖设为了精华贴 09月06日 21:57

一起聊下 go 语言相关大数据?我的邮箱 askuygo@foxmail.com

需要 登录 后方可回复, 如果你还没有账号请点击这里 注册