前言
官方网址https://github.com/alibaba/DataX
DataX 是一个异构数据源离线同步工具,致力于实现包括关系型数据库(MySQL、Oracle等)、HDFS、Hive、ODPS、HBase、FTP等各种异构数据源之间稳定高效的数据同步功能。
为了解决异构数据源同步问题,DataX将复杂的网状的同步链路变成了星型数据链路,DataX作为中间传输载体负责连接各种数据源。当需要接入一个新的数据源的时候,只需要将此数据源对接到DataX,便能跟已有的数据源做到无缝数据同步。
设计理念
DataX本身作为离线数据同步框架,采用Framework + plugin架构构建。将数据源读取和写入抽象成为Reader/Writer插件,纳入到整个同步框架中。
- Reader:Reader为数据采集模块,负责采集数据源的数据,将数据发送给Framework。
- Writer: Writer为数据写入模块,负责不断向Framework取数据,并将数据写入到目的端。
- Framework:Framework用于连接reader和writer,作为两者的数据传输通道,并处理缓冲,流控,并发,数据转换等核心技术问题。
DataX3.0核心架构
DataX 3.0 开源版本支持单机多线程模式完成同步作业运行,本小节按一个DataX作业生命周期的时序图,从整体架构设计非常简要说明DataX各个模块相互关系。
举例来说,用户提交了一个DataX作业,并且配置了20个并发,目的是将一个100张分表的mysql数据同步到odps里面。
DataX的调度决策思路是:
- DataXJob根据分库分表切分成了100个Task。
- 根据20个并发,DataX计算共需要分配4个TaskGroup。
- 4个TaskGroup平分切分好的100个Task,每一个TaskGroup负责以5个并发共计运行25个Task。
工具部署
环境
- Linux
- JDK(1.8以上,推荐1.8)
- Python(2或3都可以)
- DataX 编译后版本
直接下载编译后的DataX工具包:DataX下载地址
下载后解压至本地某个目录,进入bin目录,即可运行同步作业:
解压到目录
/data/tools/bigdata/datax3
Linux
创建配置文件
1 | vi /etc/profile.d/datax.sh |
内容设置为
1 | # DataX |
配置生效
1 | source /etc/profile |
查看是否生效
1 | echo $DATAX_HOME |
运行方式
1 | cd $DATAX_HOME/bin/ |
自检脚本:
1 | python $DATAX_HOME/bin/datax.py $DATAX_HOME/job/job.json |
Windows
添加环境变量DATAX_HOME
D:\Tools\BigData\datax
测试环境环境是否生效(注意使用CMD,不要使用PowerShell)
1 | echo %DATAX_HOME% |
自检脚本:
1 | chcp 65001 |
默认会报错
要保证core.transport.channel.speed.byte
和job.setting.speed.byte
同时进行设置或全都不设置
1 | { |
如果日志乱码
数字65001代表utf-8格式的编码
数字936代表gbk格式的编码
1 | chcp 65001 |
定时任务
crontab 命令
-l
在标准输出上显示当前的crontab。-r
删除当前的crontab文件。-e
使用VISUAL或者EDITOR环境变量所指的编辑器编辑当前的crontab文件。当结束编辑离开时,编辑后的文件将自动安装。
格式如下:
1 | minute hour day-of-month month-of-year day-of-week commands |
合法值
1 | 00-59 00-23 01-31 01-12 0-6 (0 is sunday) |
除了数字还有几个个特殊的符号就是*
、/
和-
、,
,*
代表所有的取值范围内的数字
/
代表每的意思,/5
表示每5个单位-
代表从某个数字到某个数字,
分开几个离散的数字
查看定时任务
1 | crontab -l |
添加定时任务
1 | crontab -e |
添加如下
1 | 5 1 * * * python $DATAX_HOME/bin/datax.py $DATAX_HOME/config/oracle2oracle.json >>$DATAX_HOME/log/datax_log.`date +\%Y\%m\%d\%H\%M\%S` 2>&1 |
或者自己添加配置文件
1 | crontab /data/cron/mysqlRollBack.cron |
配置
配置项
- job 下面有两个配置项,content 和 setting,其中 content 用来描述该任务的源和目的端的信息,setting 用来描述任务本身的信息;
- content 又分为两部分,reader 和 writer,分别用来描述源端和目的端的信息;
- setting 中的 speed 项表示同时起几个并发执行该任务;
job.setting.speed
(流量控制)
Job 支持用户对速度的自定义控制,channel 的值可以控制同步时的并发数,byte 的值可以控制同步时的速度。job.setting.errorLimit
(脏数据控制)
Job 支持用户对于脏数据的自定义监控和告警,包括对脏数据最大记录数阈值(record 值)或者脏数据占比阈值(percentage 值),当 Job 传输过程出现的脏数据大于用户指定的数量/百分比,DataX Job 报错退出。
示例
1 | { |
测试
Stream ==> Stream
使用 streamreader + streamwriter(这种情况常用于测试)
配置文件:stream2stream.json
1 | { |
输入执行命令:
1 | python $DATAX_HOME/bin/datax.py ../job/stream2stream.json |
支持的数据类型
类型 | 数据源 | Reader(读) | Writer(写) | 文档 |
---|---|---|---|---|
RDBMS 关系型数据库 | MySQL | √ | √ | 读 、写 |
Oracle | √ | √ | 读 、写 | |
OceanBase | √ | √ | 读 、写 | |
SQLServer | √ | √ | 读 、写 | |
PostgreSQL | √ | √ | 读 、写 | |
DRDS | √ | √ | 读 、写 | |
通用RDBMS(支持所有关系型数据库) | √ | √ | 读 、写 | |
阿里云数仓数据存储 | ODPS | √ | √ | 读 、写 |
ADS | √ | 写 | ||
OSS | √ | √ | 读 、写 | |
OCS | √ | 写 | ||
NoSQL数据存储 | OTS | √ | √ | 读 、写 |
Hbase0.94 | √ | √ | 读 、写 | |
Hbase1.1 | √ | √ | 读 、写 | |
Phoenix4.x | √ | √ | 读 、写 | |
Phoenix5.x | √ | √ | 读 、写 | |
MongoDB | √ | √ | 读 、写 | |
Hive | √ | √ | 读 、写 | |
Cassandra | √ | √ | 读 、写 | |
无结构化数据存储 | TxtFile | √ | √ | 读 、写 |
FTP | √ | √ | 读 、写 | |
HDFS | √ | √ | 读 、写 | |
Elasticsearch | √ | 写 | ||
时间序列数据库 | OpenTSDB | √ | 读 | |
TSDB | √ | √ | 读 、写 | |
TDengine | √ | √ | 读 、写 |
读
流
1 | "reader": { |
MySQL
1 | "reader": { |
自定SQL
1 | "reader": { |
写
文本
1 | "writer": { |
MySQL
1 | "writer": { |
参数
- preSql
- 描述:写入数据到目的表前,会先执行这里的标准语句。如果 Sql 中有你需要操作到的表名称,请使用
@table
表示,这样在实际执行 Sql 语句时,会对变量按照实际表名称进行替换。比如你的任务是要写入到目的端的100个同构分表(表名称为:datax_00,datax01, … datax_98,datax_99),并且你希望导入数据前,先对表中数据进行删除操作,那么你可以这样配置:"preSql":["delete from 表名"]
,效果是:在执行到每个表写入数据前,会先执行对应的 delete from 对应表名称 - 必选:否
- 默认值:无
- 描述:写入数据到目的表前,会先执行这里的标准语句。如果 Sql 中有你需要操作到的表名称,请使用
- postSql
- 描述:写入数据到目的表后,会执行这里的标准语句。(原理同 preSql )
- 必选:否
- 默认值:无
项目中集成
项目引用
maven中添加
1 | mvn install:install-file -Dfile="D:\Tools\BigData\datax\lib\datax-core-0.0.1-SNAPSHOT.jar" -DgroupId="com.datax" -DartifactId="datax-core" -Dversion="0.0.1" -Dpackaging=jar |
项目中引用
1 | <dependency> |
其它依赖
1 | <dependency> |
添加datax任务配置
内容如下
1 | { |
测试
1 | import com.alibaba.datax.core.Engine; |
获取结果
默认我们没法获取任务结束的结果,我们自定义类来获取
ZDataxResult(数据保存类)
1 | package datax; |
ZAbstractContainer
替换了默认的抽象方法
1 | package datax; |
ZJobContainer
保存结果数据
1 | package datax; |
ZEngine
1 | package datax; |
调用
1 | import datax.ZDataxResult; |