[TOC]
Extract, Transform, Load.
ETL is the process of extracting data from business systems, cleansing and transforming it, and loading it into a data warehouse. Its purpose is to consolidate an enterprise's scattered, messy, and inconsistently standardized data into one place, as a basis for analysis and decision-making.
Locate the MySQL server configuration file:
which mysqld
/usr/bin/mysqld --verbose --help | grep -A 1 'Default options'
Check where the binlog is configured:
cat /etc/my.cnf
# log_bin = bin.log
List the binlog files:
mysql> show binary logs;
[mysqld]
log-bin=mysql-bin # enable binlog
binlog-format=ROW # use ROW mode
server_id=1 # required for MySQL replication; must not clash with canal's slaveId
Restart MySQL:
/etc/init.d/mysqld restart
CREATE USER canal IDENTIFIED BY 'canal';
GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%';
-- GRANT ALL PRIVILEGES ON *.* TO 'canal'@'%' ;
FLUSH PRIVILEGES;
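To confirm the new account works before wiring up canal, here is a minimal JDBC sketch (assumes mysql-connector-java on the classpath; SHOW MASTER STATUS exercises the REPLICATION CLIENT privilege granted above):

```java
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.Statement;

public class CanalAccountCheck {
    public static void main(String[] args) throws Exception {
        // Connect as the canal user created above and read the current
        // binlog position, which requires REPLICATION CLIENT.
        try (Connection conn = DriverManager.getConnection(
                "jdbc:mysql://127.0.0.1:3306/?useSSL=false", "canal", "canal");
             Statement stmt = conn.createStatement();
             ResultSet rs = stmt.executeQuery("SHOW MASTER STATUS")) {
            if (rs.next()) {
                System.out.println("binlog file: " + rs.getString("File")
                        + ", position: " + rs.getLong("Position"));
            }
        }
    }
}
```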
# adapter ClientAdapter
# admin management UI
# deployer canal-server
wget https://github.com/alibaba/canal/releases/download/canal-1.1.4/canal.adapter-1.1.4.tar.gz
wget https://github.com/alibaba/canal/releases/download/canal-1.1.4/canal.admin-1.1.4.tar.gz
wget https://github.com/alibaba/canal/releases/download/canal-1.1.4/canal.deployer-1.1.4.tar.gz
wget https://github.com/alibaba/canal/releases/download/canal-1.1.4/canal.example-1.1.4.tar.gz
mkdir ~/canal-server
tar zxvf canal.deployer-1.1.4.tar.gz -C ~/canal-server
cd ~/canal-server
Edit the configuration:
vi conf/example/instance.properties
## mysql serverId
canal.instance.mysql.slaveId = 1234
# position info; change to match your own database
canal.instance.master.address = 127.0.0.1:3306
canal.instance.master.journal.name =
canal.instance.master.position =
canal.instance.master.timestamp =
#canal.instance.standby.address =
#canal.instance.standby.journal.name =
#canal.instance.standby.position =
#canal.instance.standby.timestamp =
# username/password; change to your own database credentials
canal.instance.dbUsername = canal
canal.instance.dbPassword = canal
canal.instance.defaultDatabaseName =
canal.instance.connectionCharset = UTF-8
#table regex
canal.instance.filter.regex = .*\\..*
canal.instance.connectionCharset is the database's character encoding given as the corresponding Java charset name, e.g. UTF-8, GBK, or ISO-8859-1.
If the machine has only one CPU, set canal.instance.parser.parallel to false.
canal.instance.filter.regex selects the tables whose changes are parsed, expressed as Perl regular expressions.
Separate multiple patterns with commas (,); escaping requires a double backslash (\\).
Common examples:
1. All tables: .* or .*\\..*
2. All tables in the canal schema: canal\\..*
3. Tables in canal whose names start with canal: canal\\.canal.*
4. A single table in the canal schema: canal.test1
5. Several rules combined: canal\\..*,mysql.test1,mysql.test2 (comma-separated)
A quick way to sanity-check a filter is sketched below.
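One can approximate canal's matching, where each comma-separated pattern is applied to a schema.table string, with java.util.regex. This is a rough sketch only (canal's real matcher may differ in edge cases), and note the doubled backslash in instance.properties collapses to a single one when the file is read:

```java
import java.util.Arrays;
import java.util.regex.Pattern;

public class FilterCheck {
    public static void main(String[] args) {
        // "canal\\..*" in instance.properties arrives at runtime as canal\..*,
        // which this Java string literal also denotes.
        String filter = "canal\\..*,mysql.test1,mysql.test2";
        String[] patterns = filter.split(",");
        for (String table : new String[]{"canal.orders", "mysql.test1", "other.t"}) {
            boolean watched = Arrays.stream(patterns)
                    .anyMatch(p -> Pattern.matches(p, table));
            System.out.println(table + " -> " + watched); // true, true, false
        }
    }
}
```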
Note in particular:
Check whether your CanalConnector calls subscribe(filter). If it does, the filter must be identical to canal.instance.filter.regex in instance.properties, because the filter passed to subscribe overrides the instance configuration; if the subscribe filter is .*\\..*, you are effectively consuming every change event.
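For reference, a minimal sketch of a TCP-mode client, assuming the com.alibaba.otter:canal.client dependency; the address, instance name, and empty canal-server credentials match the single-node setup above:

```java
import java.net.InetSocketAddress;

import com.alibaba.otter.canal.client.CanalConnector;
import com.alibaba.otter.canal.client.CanalConnectors;
import com.alibaba.otter.canal.protocol.Message;

public class SimpleCanalClient {
    public static void main(String[] args) {
        CanalConnector connector = CanalConnectors.newSingleConnector(
                new InetSocketAddress("127.0.0.1", 11111), "example", "", "");
        try {
            connector.connect();
            // Keep this identical to canal.instance.filter.regex, since it
            // overrides the instance configuration.
            connector.subscribe(".*\\..*");
            connector.rollback(); // replay anything fetched but never acked
            while (true) {
                Message message = connector.getWithoutAck(100); // up to 100 entries
                long batchId = message.getId();
                if (batchId == -1 || message.getEntries().isEmpty()) {
                    Thread.sleep(1000L);
                    continue;
                }
                message.getEntries().forEach(entry ->
                        System.out.println(entry.getHeader().getSchemaName()
                                + "." + entry.getHeader().getTableName()));
                connector.ack(batchId); // confirm the batch was processed
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            connector.disconnect();
        }
    }
}
```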
Start it up:
sh bin/startup.sh
sh bin/startup.sh local # start with the local configuration
# view the server log
cat logs/canal/canal.log
# view the instance log
cat logs/example/example.log
# shut down
sh bin/stop.sh
https://github.com/alibaba/canal/wiki/Canal-Kafka-RocketMQ-QuickStart
By default, the canal server can deliver the binlog data it receives straight to an MQ (install ZooKeeper first); the MQ systems supported out of the box are Kafka and RocketMQ.
The binlog itself is ordered, which matters for how messages are partitioned. A consuming-side sketch follows.
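With flatMessage: true, each Kafka record value is a JSON string describing a batch of row changes, and by default canal writes a topic's data to a single partition so binlog order is preserved. A minimal consuming sketch with the plain kafka-clients API; the broker address and group id are placeholders, and the topic is assumed to be the instance name example:

```java
import java.time.Duration;
import java.util.Collections;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class FlatMessageConsumer {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "127.0.0.1:9092"); // placeholder broker
        props.put("group.id", "canal-demo");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("auto.offset.reset", "earliest");
        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(Collections.singletonList("example"));
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
                for (ConsumerRecord<String, String> record : records) {
                    // Each value is JSON with fields such as database, table,
                    // type (INSERT/UPDATE/DELETE), ts and data.
                    System.out.println(record.value());
                }
            }
        }
    }
}
```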
Canal exposes Prometheus-format metrics at http://localhost:11112/metrics
Metric | Description | Unit | Precision
---|---|---|---
canal_instance_transactions | count of transactions received by the instance | - | -
canal_instance | basic instance information | - | -
canal_instance_subscriptions | number of subscriptions on the instance | - | -
canal_instance_publish_blocking_time | time the dump thread spends blocked while submitting to the async parsing queue (parallel parsing mode only) | ms | ns
canal_instance_received_binlog_bytes | binlog bytes received by the instance | byte | -
canal_instance_parser_mode | instance parsing mode (whether parallel parsing is enabled) | - | -
canal_instance_client_packets | count of client requests to the instance | - | -
canal_instance_client_bytes | bytes of packets sent to instance clients | byte | -
canal_instance_client_empty_batches | count of empty results returned by the get interface to instance clients | - | -
canal_instance_client_request_error | count of failed client requests | - | -
canal_instance_client_request_latency | response-time profile of client requests | - | -
canal_instance_sink_blocking_time | time the sink thread spends blocked putting data into the store | ms | ns
canal_instance_store_produce_seq | sequence number of events received by the instance store | - | -
canal_instance_store_consume_seq | sequence number of events successfully consumed from the instance store | - | -
canal_instance_store | basic instance store information | - | -
canal_instance_store_produce_mem | total memory used by all events received by the instance store | byte | -
canal_instance_store_consume_mem | total memory used by all events successfully consumed from the instance store | byte | -
canal_instance_put_rows | table rows completed by store put operations | - | -
canal_instance_get_rows | table rows returned by client get requests | - | -
canal_instance_ack_rows | table rows released by client ack operations | - | -
canal_instance_traffic_delay | delay between the server and the MySQL master | ms | ms
canal_instance_put_delay | delay of events in store put operations | ms | ms
canal_instance_get_delay | delay of events returned by client get requests | ms | ms
canal_instance_ack_delay | delay of events released by client ack operations | ms | ms
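To spot-check the exporter from code, here is a small sketch using the JDK 11 HttpClient against the endpoint above:

```java
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;

public class MetricsProbe {
    public static void main(String[] args) throws Exception {
        HttpClient client = HttpClient.newHttpClient();
        HttpRequest request = HttpRequest.newBuilder(
                URI.create("http://localhost:11112/metrics")).build();
        HttpResponse<String> response =
                client.send(request, HttpResponse.BodyHandlers.ofString());
        // Keep only the per-instance series, dropping #HELP/#TYPE comments.
        response.body().lines()
                .filter(line -> line.startsWith("canal_instance"))
                .forEach(System.out::println);
    }
}
```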
Metric | Summary | Multi-series
---|---|---
Basic | Basic information about the canal instance. | yes
Network bandwidth | Network bandwidth, covering inbound (bandwidth the canal server uses to read the binlog) and outbound (bandwidth of data returned to canal clients). | yes
Delay | Delay between the canal server and the master; delay of the store's put, get, and ack operations. | yes
Blocking | Fraction of time the sink thread is blocked; fraction of time the dump thread is blocked (parallel mode only). | yes
TPS(transaction) | TPS at which the canal instance processes the binlog, counted in MySQL transactions. | no
TPS(tableRows) | TPS of changed table rows for the store's put, get, and ack operations respectively. | yes
Client requests | Number of client requests to the server, broken down by request type (e.g. get/ack/sub/rollback). | no
Response time | Response-time statistics for client requests to the server. | no
Empty packets | Number of empty results returned by the server to client requests. | yes
Store remain events | Number of events accumulated in the instance ring buffer. | no
Store remain mem | Memory consumed by events accumulated in the instance ring buffer. | no
Client QPS | QPS of client requests, broken down by GET and CLIENTACK. | yes
The Java client includes collectors for garbage collection, memory pools, JMX, classloading, and thread counts. These can be added individually or just use the DefaultExports to conveniently register them.
DefaultExports.initialize();
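For example, a minimal sketch with the Prometheus Java client (simpleclient_hotspot plus simpleclient_httpserver; the port is arbitrary):

```java
import io.prometheus.client.exporter.HTTPServer;
import io.prometheus.client.hotspot.DefaultExports;

public class JvmMetricsExporter {
    public static void main(String[] args) throws Exception {
        // Register GC, memory-pool, JMX, classloading and thread collectors.
        DefaultExports.initialize();
        // Serve them for Prometheus to scrape, e.g. at http://localhost:9091/metrics
        HTTPServer server = new HTTPServer(9091);
    }
}
```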
Since version 1.1.1, canal has shipped a client adapter for landing data downstream, with its own launcher. It currently provides a client launcher, a REST interface for managing synchronization, and adapters for logger output, relational databases (rdb), HBase, and Elasticsearch.
# adapter ClientAdapter
wget https://github.com/alibaba/canal/releases/download/canal-1.1.5/canal.adapter-1.1.5.tar.gz
mkdir ~/canal-consumer
tar zxvf canal.adapter-1.1.5.tar.gz -C ~/canal-consumer
cd ~/canal-consumer
Adapter definition and configuration:
canal.conf:
  canalServerHost: 127.0.0.1:11111 # canal server ip:port in single-node mode
  zookeeperHosts: slave1:2181 # zk address in cluster mode; if canalServerHost is set, it takes precedence
  mqServers: slave1:6667 #or rocketmq # kafka or rocketMQ address; cannot coexist with canalServerHost
  flatMessage: true # flat-message switch: deliver data as JSON strings; only effective in kafka/rocketMQ mode
  batchSize: 50 # batch size per fetch, in KB
  syncBatchSize: 1000 # number of records per sync batch
  retries: 0 # retry count; -1 retries forever
  timeout: # sync timeout, in milliseconds
  mode: tcp # kafka rocketMQ # canal client mode: tcp, kafka, or rocketMQ
  srcDataSources: # source databases
    defaultDS: # custom name
      url: jdbc:mysql://127.0.0.1:3306/mytest?useUnicode=true # jdbc url
      username: root # jdbc username
      password: 121212 # jdbc password
  canalAdapters: # adapter list
  - instance: example # canal instance name or MQ topic name
    groups: # group list
    - groupId: g1 # group id; used in MQ mode
      outerAdapters: # adapters within this group
      - name: logger # logger adapter (prints records to the log)
  ......
Notes:
You can delete or empty the local application.yml and the other sub-configuration files; on startup the project loads its configuration from the remote source automatically.
Changes made to the configuration stored in MySQL are refreshed to the local node automatically, and the affected instances or applications are reloaded dynamically.
Prerequisite:
Canal Admin has been started, with a manager datasource such as:
canal:
  manager:
    jdbc:
      url: jdbc:mysql://127.0.0.1:3306/canal_manager?useUnicode=true&characterEncoding=UTF-8
      username: root
      password: 121212
-- ----------------------------
-- Table structure for label
-- ----------------------------
DROP TABLE IF EXISTS `label`;
CREATE TABLE `label` (
`id` bigint(20) NOT NULL AUTO_INCREMENT,
`user_id` bigint(20) NOT NULL,
`label` varchar(30) NOT NULL,
PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=3 DEFAULT CHARSET=utf8mb4;
-- ----------------------------
-- Table structure for role
-- ----------------------------
DROP TABLE IF EXISTS `role`;
CREATE TABLE `role` (
`id` bigint(20) NOT NULL AUTO_INCREMENT,
`role_name` varchar(30) NOT NULL,
PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=3 DEFAULT CHARSET=utf8mb4;
-- ----------------------------
-- Table structure for user
-- ----------------------------
DROP TABLE IF EXISTS `user`;
CREATE TABLE `user` (
`id` bigint(20) NOT NULL AUTO_INCREMENT,
`name` varchar(30) NOT NULL,
`c_time` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP,
`role_id` bigint(20) DEFAULT NULL,
PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=3 DEFAULT CHARSET=utf8mb4;
insert into user (id,name,role_id) values (1,'Eric',1);
insert into role (id,role_name) values (1,'admin');
insert into role (id,role_name) values (2,'operator');
insert into label (id,user_id,label) values (1,1,'a');
insert into label (id,user_id,label) values (2,1,'b');
commit;
https://github.com/alibaba/canal/wiki/ClientAdapter#31-%E5%90%AF%E5%8A%A8canal-adapter%E7%A4%BA%E4%BE%8B
Syncing to es7 (V1.1.5)
Configuration:
server:
  port: 8081
spring:
  jackson:
    date-format: yyyy-MM-dd HH:mm:ss
    time-zone: GMT+8
    default-property-inclusion: non_null

canal.conf:
  mode: tcp # kafka rocketMQ
  canalServerHost: 127.0.0.1:11111
#  zookeeperHosts: slave1:2181
#  mqServers: 127.0.0.1:9092 #or rocketmq
#  flatMessage: true
  batchSize: 500
  syncBatchSize: 1000
  retries: 0
  timeout:
  accessKey:
  secretKey:
  srcDataSources:
    defaultDS:
      url: jdbc:mysql://127.0.0.1:3306/mytest?useUnicode=true
      username: admin
      password: lcb@2019.com
  canalAdapters:
  - instance: example # canal instance name or mq topic name
    groups:
    - groupId: g1
      outerAdapters:
      - name: logger
#      - name: rdb
#        key: mysql1
#        properties:
#          jdbc.driverClassName: com.mysql.jdbc.Driver
#          jdbc.url: jdbc:mysql://127.0.0.1:3306/mytest2?useUnicode=true
#          jdbc.username: root
#          jdbc.password: 121212
#      - name: rdb
#        key: oracle1
#        properties:
#          jdbc.driverClassName: oracle.jdbc.OracleDriver
#          jdbc.url: jdbc:oracle:thin:@localhost:49161:XE
#          jdbc.username: mytest
#          jdbc.password: m121212
#      - name: rdb
#        key: postgres1
#        properties:
#          jdbc.driverClassName: org.postgresql.Driver
#          jdbc.url: jdbc:postgresql://localhost:5432/postgres
#          jdbc.username: postgres
#          jdbc.password: 121212
#          threads: 1
#          commitSize: 3000
#      - name: hbase
#        properties:
#          hbase.zookeeper.quorum: 127.0.0.1
#          hbase.zookeeper.property.clientPort: 2181
#          zookeeper.znode.parent: /hbase
      - name: es7
        hosts: 192.168.110.174:9200 # 127.0.0.1:9200 for rest mode
        properties:
          mode: rest # or transport
#          security.auth: test:123456 # only used for rest mode
          cluster.name: es-cluster
Create a new conf/es7/mytest_user.yml file:
dataSourceKey: defaultDS # key of the source datasource, i.e. an entry under srcDataSources above
outerAdapterKey: exampleKey # must match the key of the es adapter configured in application.yml
destination: example # canal instance name, or MQ topic name
groupId: # groupId in MQ mode; only data for that groupId is synced
esMapping:
  _index: mytest_user # es index name
  _type: _doc # es type name; not needed on es7
  _id: _id # the es _id; if omitted, the pk option below must be configured and es assigns the _id itself
#  pk: id # if no _id is wanted, designate one column as the primary key instead
  # sql mapping
  sql: "select a.id as _id, a.name as _name, a.role_id as _role_id, b.role_name as _role_name,
        a.c_time as _c_time, c.labels as _labels from user a
        left join role b on b.id=a.role_id
        left join (select user_id, group_concat(label order by id desc separator ';') as labels from label
        group by user_id) c on c.user_id=a.id"
#  objFields:
#    _name: array:, # array or object field; array:, means the column holds a ','-separated string to split into an array
#    _role_name: object # _role_name holds a JSON string
  etlCondition: "where a.c_time>='{0}'" # parameterized condition for ETL
  commitBatch: 3000 # commit batch size
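For the sample data inserted earlier, the labels subquery produces 'b;a' for user 1 (its two labels sorted by id desc and joined with ';'), so that string is what the mapping writes into the document's _labels field.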
The SQL actually used here is the simpler join without the labels column:
"select a.id as _id, a.name as _name, a.role_id as _role_id, b.role_name as _role_name,
a.c_time as _c_time from user a
left join role b on b.id=a.role_id"
Start the adapter:
cd ~/canal-consumer
sh bin/startup.sh
# admin management UI
wget https://github.com/alibaba/canal/releases/download/canal-1.1.4/canal.admin-1.1.4.tar.gz
mkdir ~/canal-admin
tar zxvf canal.admin-1.1.4.tar.gz -C ~/canal-admin
cd ~/canal-admin
vi conf/application.yml
server:
  port: 8089
spring:
  jackson:
    date-format: yyyy-MM-dd HH:mm:ss
    time-zone: GMT+8

spring.datasource:
  address: 127.0.0.1:3306
  database: canal_manager
  username: canal
  password: canal
  driver-class-name: com.mysql.jdbc.Driver
  url: jdbc:mysql://${spring.datasource.address}/${spring.datasource.database}?useUnicode=true&characterEncoding=UTF-8&useSSL=false
  hikari:
    maximum-pool-size: 30
    minimum-idle: 1

canal:
  adminUser: admin
  adminPasswd: admin
mysql
# import the initialization SQL
mysql> source conf/canal_manager.sql
sh bin/startup.sh
vi logs/admin.log
Open http://127.0.0.1:8089/ in a browser; the default login is admin/123456.
sh bin/stop.sh
# manually trigger a full ETL for this mapping
curl 127.0.0.1:8081/etl/es7/mytest_user.yml -X POST
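If etlCondition is set, the ClientAdapter wiki indicates the {0} placeholder is supplied through a params form field on this request, e.g. -d "params=2019-01-01 00:00:00".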