Big Data: Using DataX, an Offline Data Source Synchronization Tool

Preface

Official repository: https://github.com/alibaba/DataX

DataX is an offline synchronization tool for heterogeneous data sources. It provides stable, efficient data synchronization between a wide range of sources, including relational databases (MySQL, Oracle, etc.), HDFS, Hive, ODPS, HBase, and FTP.


To solve the problem of synchronizing heterogeneous data sources, DataX turns the complex mesh of point-to-point sync links into a star-shaped topology, with DataX acting as the central transport hub that connects every data source. To onboard a new data source, you only need to connect it to DataX, and it can then synchronize seamlessly with every source already supported.

Design Philosophy


DataX itself is an offline data synchronization framework built on a Framework + plugin architecture. Reading from and writing to data sources are abstracted into Reader and Writer plugins that plug into the synchronization framework.

  • Reader: the data collection module. It reads data from the source and sends it to the Framework.
  • Writer: the data writing module. It continuously pulls data from the Framework and writes it to the destination.
  • Framework: connects Reader and Writer, acts as the data transport channel between them, and handles the core concerns of buffering, flow control, concurrency, and data conversion (see the simplified sketch below).
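To make the Framework + plugin split concrete, here is a minimal, hypothetical sketch of the idea: the names SimpleReader, SimpleWriter, and MiniFramework are invented for illustration, and a BlockingQueue stands in for the Framework's buffered channel. DataX's real SPI (com.alibaba.datax.common.spi.Reader/Writer) is considerably richer.

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical, simplified model of the Reader -> Framework -> Writer pipeline.
interface SimpleReader {
    // Reads from the source and hands each record to the framework's channel.
    void startRead(BlockingQueue<String> channel) throws InterruptedException;
}

interface SimpleWriter {
    // Pulls records from the framework's channel and writes them to the sink.
    void startWrite(BlockingQueue<String> channel) throws InterruptedException;
}

public class MiniFramework {
    public static void main(String[] args) throws InterruptedException {
        // The bounded queue plays the Framework's role: buffering and back-pressure.
        BlockingQueue<String> channel = new LinkedBlockingQueue<>(1024);

        SimpleReader reader = ch -> {
            for (int i = 0; i < 3; i++) {
                ch.put("record-" + i);
            }
            ch.put("EOF"); // sentinel marking the end of the stream
        };
        SimpleWriter writer = ch -> {
            String record;
            while (!"EOF".equals(record = ch.take())) {
                System.out.println("wrote " + record);
            }
        };

        Thread readThread = new Thread(() -> {
            try { reader.startRead(channel); } catch (InterruptedException ignored) { }
        });
        Thread writeThread = new Thread(() -> {
            try { writer.startWrite(channel); } catch (InterruptedException ignored) { }
        });
        readThread.start();
        writeThread.start();
        readThread.join();
        writeThread.join();
    }
}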

DataX 3.0 Core Architecture

The open-source DataX 3.0 release runs synchronization jobs in a single-process, multi-threaded mode. This subsection follows the sequence of a DataX job's life cycle and gives a very brief overview of how the modules relate to each other in the overall architecture.


For example, suppose a user submits a DataX job configured with 20 concurrent channels, whose goal is to synchronize a MySQL database sharded into 100 tables into ODPS.

DataX's scheduling logic then works as follows (the sketch after this list shows the arithmetic):

  1. The DataX Job splits into 100 Tasks, one per shard table.
  2. Given the 20 configured channels, DataX computes that 4 TaskGroups are needed.
  3. The 4 TaskGroups split the 100 Tasks evenly: each TaskGroup runs 25 Tasks with a concurrency of 5 channels.
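The numbers fall out of a simple ceiling division: DataX packs channels into TaskGroups of 5 by default (the core.container.taskGroup.channel default, visible in the JobContainer code reproduced later in this post), so 20 channels need 4 groups, and 100 tasks over 4 groups is 25 tasks per group. A minimal sketch of that arithmetic:

public class TaskGroupMath {
    public static void main(String[] args) {
        int tasks = 100;               // tasks after splitting, one per shard table
        int channels = 20;             // configured concurrency (job.setting.speed.channel)
        int channelsPerTaskGroup = 5;  // core.container.taskGroup.channel default

        // Ceiling division: TaskGroups needed to host all channels.
        int taskGroups = (channels + channelsPerTaskGroup - 1) / channelsPerTaskGroup;
        int tasksPerGroup = tasks / taskGroups;

        System.out.printf("%d TaskGroups, each running %d tasks with %d channels%n",
                taskGroups, tasksPerGroup, channelsPerTaskGroup);
        // => 4 TaskGroups, each running 25 tasks with 5 channels
    }
}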

Tool Deployment

Environment

Download the pre-built DataX tool package directly: DataX download link.

After downloading, extract it to a local directory; from its bin directory you can then run synchronization jobs.

Here it is extracted to:

/data/tools/bigdata/datax3

Linux

Create a profile script:

vi /etc/profile.d/datax.sh

Set its contents to:

# DataX
export DATAX_HOME=/data/tools/bigdata/datax3
export PATH=$PATH:$DATAX_HOME/bin

Apply it:

source /etc/profile

Check that it took effect:

echo $DATAX_HOME

Running jobs

cd $DATAX_HOME/bin/
python datax.py YOUR_JOB.json

Self-check job:

python $DATAX_HOME/bin/datax.py $DATAX_HOME/job/job.json

Windows

Add a DATAX_HOME environment variable pointing to:

D:\Tools\BigData\datax

Check that the variable is visible (use CMD, not PowerShell):

echo %DATAX_HOME%

Self-check job:

chcp 65001
python %DATAX_HOME%/bin/datax.py %DATAX_HOME%/job/job.json

By default the self-check job fails.

Make sure that core.transport.channel.speed.byte and job.setting.speed.byte are either both set or both left unset, for example:

{
  "core": {
    "transport": {
      "channel": {
        "speed": {
          "byte": 10485760
        }
      }
    }
  },
  "job": {
    "setting": {
      "speed": {
        "byte": 10485760
      },
      "errorLimit": {
        "record": 0,
        "percentage": 0.02
      }
    },
    "content": [
      {
        "reader": {
          "name": "streamreader",
          "parameter": {
            "column": [
              { "value": "DataX", "type": "string" },
              { "value": 19890604, "type": "long" },
              { "value": "1989-06-04 00:00:00", "type": "date" },
              { "value": true, "type": "bool" },
              { "value": "test", "type": "bytes" }
            ],
            "sliceRecordCount": 100000
          }
        },
        "writer": {
          "name": "streamwriter",
          "parameter": {
            "print": true,
            "encoding": "UTF-8"
          }
        }
      }
    ]
  }
}

If the log output is garbled

Code page 65001 is the UTF-8 encoding; code page 936 is GBK.

chcp 65001	
chcp 936

Scheduled Jobs

The crontab command

  • -l: print the current crontab on standard output.
  • -r: delete the current crontab file.
  • -e: edit the current crontab with the editor named by the VISUAL or EDITOR environment variable; the edited file is installed automatically when you exit.

The entry format is:

minute hour day-of-month month-of-year day-of-week commands

Valid values:

00-59 00-23 01-31 01-12 0-6 (0 is Sunday)

Besides numbers, a few special symbols are allowed: *, /, - and ,. The * matches every value in the field's range.

  • / means "every": */5 means every 5 units.
  • - gives a range from one number to another.
  • , separates several discrete values.

List the current jobs:

crontab -l

Edit the crontab to add a job:

crontab -e

Add a line such as:

5 1 * * * python $DATAX_HOME/bin/datax.py $DATAX_HOME/config/oracle2oracle.json  >>$DATAX_HOME/log/datax_log.`date +\%Y\%m\%d\%H\%M\%S`  2>&1

Or install the jobs from a file of your own:

crontab /data/cron/mysqlRollBack.cron

Configuration

Configuration items

  • job has two child items, content and setting: content describes the task's source and destination, while setting describes the task itself;
  • content is further split into reader and writer, describing the source side and the destination side respectively;
  • the speed item under setting controls how many concurrent channels execute the task;
  • job.setting.speed (flow control)
    A Job lets the user control speed: the channel value sets the sync concurrency, and the byte value caps the sync throughput. For example, with job.setting.speed.byte = 10485760 and core.transport.channel.speed.byte = 1048576, DataX derives 10485760 / 1048576 = 10 channels (see the sketch after this list).
  • job.setting.errorLimit (dirty-data control)
    A Job lets the user monitor and alert on dirty data through a maximum dirty-record count threshold (the record value) and a dirty-data ratio threshold (the percentage value). If the job produces more dirty data than the configured count/percentage, the DataX Job fails and exits.
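How the channel count is derived from the byte limits can be seen in adjustChannelNumber in the JobContainer code reproduced later in this post; a minimal sketch of the same derivation, using the values from the example above:

public class ChannelMath {
    public static void main(String[] args) {
        long jobByteLimit = 10485760L;     // job.setting.speed.byte (10 MiB/s for the whole job)
        long channelByteLimit = 1048576L;  // core.transport.channel.speed.byte (1 MiB/s per channel)

        // Mirrors adjustChannelNumber: the total limit divided by the
        // per-channel limit, floored at one channel.
        long needChannelNumber = Math.max(1L, jobByteLimit / channelByteLimit);

        System.out.println("needChannelNumber = " + needChannelNumber); // => 10
    }
}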

Example

{
  "job": {
    "setting": {
      "speed": {
        "channel": 3
      },
      "errorLimit": {
        "record": 0,
        "percentage": 0.02
      }
    },
    "content": [
      {
        "reader": {
          "name": "mysqlreader",
          "parameter": {
            "username": "root",
            "password": "root",
            "column": ["id", "name"],
            "splitPk": "db_id",
            "connection": [
              {
                "table": ["table"],
                "jdbcUrl": ["jdbc:mysql://127.0.0.1:3306/database"]
              }
            ]
          }
        },
        "writer": {
          "name": "streamwriter",
          "parameter": {
            "print": true
          }
        }
      }
    ]
  }
}

Testing

Stream ==> Stream

Use streamreader + streamwriter (this combination is typically used for testing).
Configuration file: stream2stream.json

{
  "core": {
    "transport": {
      "channel": {
        "speed": {
          "byte": 10485760
        }
      }
    }
  },
  "job": {
    "setting": {
      "speed": {
        "byte": 10485760
      },
      "errorLimit": {
        "record": 0,
        "percentage": 0.02
      }
    },
    "content": [
      {
        "reader": {
          "name": "streamreader",
          "parameter": {
            "column": [
              { "value": "DataX", "type": "string" },
              { "value": 19890604, "type": "long" },
              { "value": "1989-06-04 00:00:00", "type": "date" },
              { "value": true, "type": "bool" },
              { "value": "test", "type": "bytes" }
            ],
            "sliceRecordCount": 10
          }
        },
        "writer": {
          "name": "streamwriter",
          "parameter": {
            "print": true,
            "encoding": "UTF-8"
          }
        }
      }
    ]
  }
}

Run the job:

python $DATAX_HOME/bin/datax.py ../job/stream2stream.json

Supported Data Sources

Each source below ships a Reader and/or Writer plugin; see the per-plugin documentation in the official repository:

  • RDBMS (relational databases): MySQL, Oracle, OceanBase, SQLServer, PostgreSQL, DRDS, generic RDBMS (supports any relational database)
  • Alibaba Cloud data warehouse storage: ODPS, ADS, OSS, OCS
  • NoSQL data stores: OTS, HBase 0.94, HBase 1.1, Phoenix 4.x, Phoenix 5.x, MongoDB, Hive, Cassandra
  • Unstructured data storage: TxtFile, FTP, HDFS, Elasticsearch
  • Time-series databases: OpenTSDB, TSDB, TDengine

Reader configuration examples for a few representative sources follow.

Stream
"reader": {
"name": "streamreader",
"parameter": {
"sliceRecordCount": 10,
"column": [{
"type": "String",
"value": "hello DataX"
}, {
"type": "string",
"value": "DataX Stream To Stream"
}, {
"type": "string",
"value": "数据迁移工具"
}]
}
},

MySQL

"reader": {
"name": "mysqlreader",
"parameter": {
"username": "root",
"password": "root",
"column": [
"id",
"name"
],
"splitPk": "db_id",
"connection": [
{
"table": [
"table"
],
"jdbcUrl": [
"jdbc:mysql://127.0.0.1:3306/database"
]
}
]
}
},

Custom SQL

"reader": {
"name": "mysqlreader",
"parameter": {
"username": "root",
"password": "root",
"connection": [
{
"querySql": [
"select db_id,on_line_flag from db_info where db_id < 10;"
],
"jdbcUrl": [
"jdbc:mysql://bad_ip:3306/database",
"jdbc:mysql://127.0.0.1:bad_port/database",
"jdbc:mysql://127.0.0.1:3306/database"
]
}
]
}
},

Text file

"writer": {
"name": "txtfilewriter",
"parameter": {
"path": "D:\\Tools\\BigData\\test",
"fileName": "test",
"writeMode": "truncate",
"dateFormat": "yyyy-MM-dd"
}
}

MySQL

"writer": {
"name": "mysqlwriter",
"parameter": {
"writeMode": "insert",
"username": "root",
"password": "123456",
"column": [
"id",
"name"
],
"preSql": [
"delete from @table"
],
"connection": [
{
"jdbcUrl": "jdbc:mysql://127.0.0.1:3306/test?useUnicode=true&characterEncoding=utf8",
"table": [
"tb01"
]
}
]
}
}

Parameters

  • preSql
    • Description: standard SQL statements executed before data is written to the destination table. If a statement needs to reference the target table, write @table; when the SQL is actually executed, the placeholder is replaced with the real table name. For example, if the job writes into 100 destination shard tables with identical schemas (named datax_00, datax_01, ..., datax_98, datax_99) and you want existing rows deleted before the import, configure "preSql": ["delete from @table"]; before writing to each table, DataX then runs the delete against that table's actual name (a sketch of the substitution follows this list).
    • Required: no
    • Default: none
  • postSql
    • Description: standard SQL statements executed after data has been written to the destination table (same mechanics as preSql).
    • Required: no
    • Default: none
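To make the placeholder mechanics concrete, here is a minimal, hypothetical sketch of the @table substitution described above (an illustration of the behavior, not DataX's actual implementation):

import java.util.Arrays;
import java.util.List;

public class PreSqlDemo {
    public static void main(String[] args) {
        // Destination shard tables the writer will fill.
        List<String> tables = Arrays.asList("datax_00", "datax_01", "datax_99");
        String preSql = "delete from @table";

        // Before writing to each table, @table is replaced with the
        // real table name and the resulting statement is executed.
        for (String table : tables) {
            String sql = preSql.replace("@table", table);
            System.out.println(sql); // e.g. "delete from datax_00"
        }
    }
}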

Embedding DataX in a Project

Project references


Install the DataX jars into the local Maven repository:

mvn install:install-file -Dfile="D:\Tools\BigData\datax\lib\datax-core-0.0.1-SNAPSHOT.jar" -DgroupId="com.datax" -DartifactId="datax-core" -Dversion="0.0.1" -Dpackaging=jar

mvn install:install-file -Dfile="D:\Tools\BigData\datax\lib\datax-common-0.0.1-SNAPSHOT.jar" -DgroupId="com.datax" -DartifactId="datax-common" -Dversion="0.0.1" -Dpackaging=jar

Reference them in the project:

<dependency>
  <groupId>com.datax</groupId>
  <artifactId>datax-core</artifactId>
  <version>0.0.1</version>
</dependency>
<dependency>
  <groupId>com.datax</groupId>
  <artifactId>datax-common</artifactId>
  <version>0.0.1</version>
</dependency>

Other dependencies

<dependency>
  <groupId>commons-cli</groupId>
  <artifactId>commons-cli</artifactId>
  <version>1.4</version>
</dependency>
<dependency>
  <groupId>org.apache.httpcomponents</groupId>
  <artifactId>httpclient</artifactId>
  <version>4.5.13</version>
</dependency>
<dependency>
  <groupId>org.apache.commons</groupId>
  <artifactId>commons-io</artifactId>
  <version>1.3.2</version>
</dependency>
<dependency>
  <groupId>org.apache.commons</groupId>
  <artifactId>commons-lang3</artifactId>
  <version>3.12.0</version>
</dependency>
<dependency>
  <groupId>commons-lang</groupId>
  <artifactId>commons-lang</artifactId>
  <version>2.6</version>
</dependency>
<dependency>
  <groupId>com.alibaba</groupId>
  <artifactId>fastjson</artifactId>
  <version>1.2.60</version>
</dependency>

Add a DataX job configuration


With the following content:

{
  "core": {
    "transport": {
      "channel": {
        "speed": {
          "byte": 10485760
        }
      }
    }
  },
  "job": {
    "setting": {
      "speed": {
        "byte": 10485760
      },
      "errorLimit": {
        "record": 0,
        "percentage": 0.02
      }
    },
    "content": [
      {
        "reader": {
          "name": "streamreader",
          "parameter": {
            "column": [
              { "value": "DataX", "type": "string" },
              { "value": 19890604, "type": "long" },
              { "value": "1989-06-04 00:00:00", "type": "date" },
              { "value": true, "type": "bool" },
              { "value": "test", "type": "bytes" }
            ],
            "sliceRecordCount": 10
          }
        },
        "writer": {
          "name": "streamwriter",
          "parameter": {
            "print": true,
            "encoding": "UTF-8"
          }
        }
      }
    ]
  }
}

Test run

import com.alibaba.datax.core.Engine;

public class TestMain {
    public static String getCurrentClasspath() {
        ClassLoader classLoader = Thread.currentThread().getContextClassLoader();
        String currentClasspath = classLoader.getResource("").getPath();
        // Detect the current operating system
        String osName = System.getProperty("os.name");
        if (osName.startsWith("Win")) {
            // On Windows, strip the leading "/" (and the trailing "/") from the resource path
            currentClasspath = currentClasspath.substring(1, currentClasspath.length() - 1);
        }
        return currentClasspath;
    }

    public static void main(String[] args) {
        System.setProperty("datax.home", "D:\\Tools\\BigData\\datax");
        String[] dataxArgs = {"-job", getCurrentClasspath() + "/datax/test.json", "-mode", "standalone", "-jobid", "-1"};
        try {
            Engine.entry(dataxArgs);
        } catch (Throwable e) {
            e.printStackTrace();
        }
    }
}

Getting the Job Result

By default there is no way to get the result of a finished job back programmatically, so we define our own classes to capture it.

ZDataxResult (holds the result data)

package datax;

public class ZDataxResult {
    // Job start time (epoch ms)
    private long startTimeStamp;
    // Job end time (epoch ms)
    private long endTimeStamp;
    // Total job duration (s)
    private long totalCosts;
    // Average job throughput (B/s)
    private long byteSpeedPerSecond;
    // Record write speed (records/s)
    private long recordSpeedPerSecond;
    // Total records read
    private long totalReadRecords;
    // Total read/write failures
    private long totalErrorRecords;
    // Bytes read successfully
    private long readSucceedBytes;

    // Transfer start time (epoch ms)
    private long startTransferTimeStamp;
    // Transfer end time (epoch ms)
    private long endTransferTimeStamp;
    // Total transfer duration (s)
    private long transferCosts;

    // Total records transformed successfully
    private long totalTransformerSuccessRecords;
    // Total records that failed transformation
    private long totalTransformerFailedRecords;
    // Total records filtered out by transformation
    private long totalTransformerFilterRecords;

    public long getStartTimeStamp() {
        return startTimeStamp;
    }

    public void setStartTimeStamp(long startTimeStamp) {
        this.startTimeStamp = startTimeStamp;
    }

    public long getEndTimeStamp() {
        return endTimeStamp;
    }

    public void setEndTimeStamp(long endTimeStamp) {
        this.endTimeStamp = endTimeStamp;
    }

    public long getTotalCosts() {
        return totalCosts;
    }

    public void setTotalCosts(long totalCosts) {
        this.totalCosts = totalCosts;
    }

    public long getByteSpeedPerSecond() {
        return byteSpeedPerSecond;
    }

    public void setByteSpeedPerSecond(long byteSpeedPerSecond) {
        this.byteSpeedPerSecond = byteSpeedPerSecond;
    }

    public long getRecordSpeedPerSecond() {
        return recordSpeedPerSecond;
    }

    public void setRecordSpeedPerSecond(long recordSpeedPerSecond) {
        this.recordSpeedPerSecond = recordSpeedPerSecond;
    }

    public long getTotalReadRecords() {
        return totalReadRecords;
    }

    public void setTotalReadRecords(long totalReadRecords) {
        this.totalReadRecords = totalReadRecords;
    }

    public long getTotalErrorRecords() {
        return totalErrorRecords;
    }

    public void setTotalErrorRecords(long totalErrorRecords) {
        this.totalErrorRecords = totalErrorRecords;
    }

    public long getReadSucceedBytes() {
        return readSucceedBytes;
    }

    public void setReadSucceedBytes(long readSucceedBytes) {
        this.readSucceedBytes = readSucceedBytes;
    }

    public long getEndTransferTimeStamp() {
        return endTransferTimeStamp;
    }

    public void setEndTransferTimeStamp(long endTransferTimeStamp) {
        this.endTransferTimeStamp = endTransferTimeStamp;
    }

    public long getStartTransferTimeStamp() {
        return startTransferTimeStamp;
    }

    public void setStartTransferTimeStamp(long startTransferTimeStamp) {
        this.startTransferTimeStamp = startTransferTimeStamp;
    }

    public long getTransferCosts() {
        return transferCosts;
    }

    public void setTransferCosts(long transferCosts) {
        this.transferCosts = transferCosts;
    }

    public long getTotalTransformerSuccessRecords() {
        return totalTransformerSuccessRecords;
    }

    public void setTotalTransformerSuccessRecords(long totalTransformerSuccessRecords) {
        this.totalTransformerSuccessRecords = totalTransformerSuccessRecords;
    }

    public long getTotalTransformerFailedRecords() {
        return totalTransformerFailedRecords;
    }

    public void setTotalTransformerFailedRecords(long totalTransformerFailedRecords) {
        this.totalTransformerFailedRecords = totalTransformerFailedRecords;
    }

    public long getTotalTransformerFilterRecords() {
        return totalTransformerFilterRecords;
    }

    public void setTotalTransformerFilterRecords(long totalTransformerFilterRecords) {
        this.totalTransformerFilterRecords = totalTransformerFilterRecords;
    }

    @Override
    public String toString() {
        return "ZDataxResult{" +
                "startTimeStamp(ms)=" + startTimeStamp +
                ", endTimeStamp(ms)=" + endTimeStamp +
                ", totalCosts(s)=" + totalCosts +
                ", readSucceedBytes=" + readSucceedBytes +
                ", byteSpeedPerSecond(B/s)=" + byteSpeedPerSecond +
                ", totalReadRecords=" + totalReadRecords +
                ", recordSpeedPerSecond(rec/s)=" + recordSpeedPerSecond +
                ", totalErrorRecords=" + totalErrorRecords +
                ", startTransferTimeStamp(ms)=" + startTransferTimeStamp +
                ", endTransferTimeStamp(ms)=" + endTransferTimeStamp +
                ", transferCosts(s)=" + transferCosts +
                ", totalTransformerSuccessRecords=" + totalTransformerSuccessRecords +
                ", totalTransformerFailedRecords=" + totalTransformerFailedRecords +
                ", totalTransformerFilterRecords=" + totalTransformerFilterRecords +
                '}';
    }
}

ZAbstractContainer

It replaces the default abstract start() method with startWithResult(), which returns a ZDataxResult.

package datax;

import com.alibaba.datax.common.util.Configuration;
import com.alibaba.datax.core.statistics.container.communicator.AbstractContainerCommunicator;
import org.apache.commons.lang.Validate;

public abstract class ZAbstractContainer {
    protected Configuration configuration;
    protected AbstractContainerCommunicator containerCommunicator;

    public ZAbstractContainer(Configuration configuration) {
        Validate.notNull(configuration, "Configuration can not be null.");
        this.configuration = configuration;
    }

    public Configuration getConfiguration() {
        return this.configuration;
    }

    public AbstractContainerCommunicator getContainerCommunicator() {
        return this.containerCommunicator;
    }

    public void setContainerCommunicator(AbstractContainerCommunicator containerCommunicator) {
        this.containerCommunicator = containerCommunicator;
    }

    public abstract ZDataxResult startWithResult();
}

ZJobContainer

It gathers the run statistics into a ZDataxResult.

package datax;

import com.alibaba.datax.common.constant.PluginType;
import com.alibaba.datax.common.exception.DataXException;
import com.alibaba.datax.common.plugin.AbstractJobPlugin;
import com.alibaba.datax.common.plugin.JobPluginCollector;
import com.alibaba.datax.common.spi.Reader;
import com.alibaba.datax.common.statistics.PerfTrace;
import com.alibaba.datax.common.statistics.VMInfo;
import com.alibaba.datax.common.util.Configuration;
import com.alibaba.datax.common.util.StrUtil;
import com.alibaba.datax.core.AbstractContainer;
import com.alibaba.datax.core.Engine;
import com.alibaba.datax.core.container.util.HookInvoker;
import com.alibaba.datax.core.container.util.JobAssignUtil;
import com.alibaba.datax.core.job.JobContainer;
import com.alibaba.datax.core.job.scheduler.AbstractScheduler;
import com.alibaba.datax.core.job.scheduler.processinner.StandAloneScheduler;
import com.alibaba.datax.core.statistics.communication.Communication;
import com.alibaba.datax.core.statistics.communication.CommunicationTool;
import com.alibaba.datax.core.statistics.container.communicator.AbstractContainerCommunicator;
import com.alibaba.datax.core.statistics.container.communicator.job.StandAloneJobContainerCommunicator;
import com.alibaba.datax.core.statistics.plugin.DefaultJobPluginCollector;
import com.alibaba.datax.core.util.ErrorRecordChecker;
import com.alibaba.datax.core.util.FrameworkErrorCode;
import com.alibaba.datax.core.util.container.ClassLoaderSwapper;
import com.alibaba.datax.core.util.container.CoreConstant;
import com.alibaba.datax.core.util.container.LoadUtil;
import com.alibaba.datax.dataxservice.face.domain.enums.ExecuteMode;
import com.alibaba.fastjson.JSON;
import org.apache.commons.lang.StringUtils;
import org.apache.commons.lang.Validate;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

public class ZJobContainer extends ZAbstractContainer {
    private static final Logger LOG = LoggerFactory.getLogger(JobContainer.class);
    private static final SimpleDateFormat dateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
    private ClassLoaderSwapper classLoaderSwapper = ClassLoaderSwapper.newCurrentThreadClassLoaderSwapper();
    private long jobId;
    private String readerPluginName;
    private String writerPluginName;
    private Reader.Job jobReader;
    private com.alibaba.datax.common.spi.Writer.Job jobWriter;
    private Configuration userConf;
    private long startTimeStamp;
    private long endTimeStamp;
    private long startTransferTimeStamp;
    private long endTransferTimeStamp;
    private int needChannelNumber;
    private int totalStage = 1;
    private ErrorRecordChecker errorLimit;

    public ZJobContainer(Configuration configuration) {
        super(configuration);
        this.errorLimit = new ErrorRecordChecker(configuration);
    }

    private void preCheck() {
        this.preCheckInit();
        this.adjustChannelNumber();
        if (this.needChannelNumber <= 0) {
            this.needChannelNumber = 1;
        }

        this.preCheckReader();
        this.preCheckWriter();
        LOG.info("PreCheck passed");
    }

    private void preCheckInit() {
        this.jobId = this.configuration.getLong("core.container.job.id", -1L);
        if (this.jobId < 0L) {
            LOG.info("Set jobId = 0");
            this.jobId = 0L;
            this.configuration.set("core.container.job.id", this.jobId);
        }

        Thread.currentThread().setName("job-" + this.jobId);
        JobPluginCollector jobPluginCollector = new DefaultJobPluginCollector(this.getContainerCommunicator());
        this.jobReader = this.preCheckReaderInit(jobPluginCollector);
        this.jobWriter = this.preCheckWriterInit(jobPluginCollector);
    }

    private Reader.Job preCheckReaderInit(JobPluginCollector jobPluginCollector) {
        this.readerPluginName = this.configuration.getString("job.content[0].reader.name");
        this.classLoaderSwapper.setCurrentThreadClassLoader(LoadUtil.getJarLoader(PluginType.READER, this.readerPluginName));
        Reader.Job jobReader = (Reader.Job) LoadUtil.loadJobPlugin(PluginType.READER, this.readerPluginName);
        this.configuration.set("job.content[0].reader.parameter.dryRun", true);
        jobReader.setPluginJobConf(this.configuration.getConfiguration("job.content[0].reader.parameter"));
        jobReader.setPeerPluginJobConf(this.configuration.getConfiguration("job.content[0].reader.parameter"));
        jobReader.setJobPluginCollector(jobPluginCollector);
        this.classLoaderSwapper.restoreCurrentThreadClassLoader();
        return jobReader;
    }

    private com.alibaba.datax.common.spi.Writer.Job preCheckWriterInit(JobPluginCollector jobPluginCollector) {
        this.writerPluginName = this.configuration.getString("job.content[0].writer.name");
        this.classLoaderSwapper.setCurrentThreadClassLoader(LoadUtil.getJarLoader(PluginType.WRITER, this.writerPluginName));
        com.alibaba.datax.common.spi.Writer.Job jobWriter = (com.alibaba.datax.common.spi.Writer.Job) LoadUtil.loadJobPlugin(PluginType.WRITER, this.writerPluginName);
        this.configuration.set("job.content[0].writer.parameter.dryRun", true);
        jobWriter.setPluginJobConf(this.configuration.getConfiguration("job.content[0].writer.parameter"));
        jobWriter.setPeerPluginJobConf(this.configuration.getConfiguration("job.content[0].reader.parameter"));
        jobWriter.setPeerPluginName(this.readerPluginName);
        jobWriter.setJobPluginCollector(jobPluginCollector);
        this.classLoaderSwapper.restoreCurrentThreadClassLoader();
        return jobWriter;
    }

    private void preCheckReader() {
        this.classLoaderSwapper.setCurrentThreadClassLoader(LoadUtil.getJarLoader(PluginType.READER, this.readerPluginName));
        LOG.info(String.format("DataX Reader.Job [%s] do preCheck work .", this.readerPluginName));
        this.jobReader.preCheck();
        this.classLoaderSwapper.restoreCurrentThreadClassLoader();
    }

    private void preCheckWriter() {
        this.classLoaderSwapper.setCurrentThreadClassLoader(LoadUtil.getJarLoader(PluginType.WRITER, this.writerPluginName));
        LOG.info(String.format("DataX Writer.Job [%s] do preCheck work .", this.writerPluginName));
        this.jobWriter.preCheck();
        this.classLoaderSwapper.restoreCurrentThreadClassLoader();
    }

    private void init() {
        this.jobId = this.configuration.getLong("core.container.job.id", -1L);
        if (this.jobId < 0L) {
            LOG.info("Set jobId = 0");
            this.jobId = 0L;
            this.configuration.set("core.container.job.id", this.jobId);
        }

        Thread.currentThread().setName("job-" + this.jobId);
        JobPluginCollector jobPluginCollector = new DefaultJobPluginCollector(this.getContainerCommunicator());
        this.jobReader = this.initJobReader(jobPluginCollector);
        this.jobWriter = this.initJobWriter(jobPluginCollector);
    }

    private void prepare() {
        this.prepareJobReader();
        this.prepareJobWriter();
    }

    private void preHandle() {
        String handlerPluginTypeStr = this.configuration.getString("job.preHandler.pluginType");
        if (StringUtils.isNotEmpty(handlerPluginTypeStr)) {
            PluginType handlerPluginType;
            try {
                handlerPluginType = PluginType.valueOf(handlerPluginTypeStr.toUpperCase());
            } catch (IllegalArgumentException var6) {
                throw DataXException.asDataXException(FrameworkErrorCode.CONFIG_ERROR, String.format("Job preHandler's pluginType(%s) set error, reason(%s)", handlerPluginTypeStr.toUpperCase(), var6.getMessage()));
            }

            String handlerPluginName = this.configuration.getString("job.preHandler.pluginName");
            this.classLoaderSwapper.setCurrentThreadClassLoader(LoadUtil.getJarLoader(handlerPluginType, handlerPluginName));
            AbstractJobPlugin handler = LoadUtil.loadJobPlugin(handlerPluginType, handlerPluginName);
            JobPluginCollector jobPluginCollector = new DefaultJobPluginCollector(this.getContainerCommunicator());
            handler.setJobPluginCollector(jobPluginCollector);
            handler.preHandler(this.configuration);
            this.classLoaderSwapper.restoreCurrentThreadClassLoader();
            LOG.info("After PreHandler: \n" + Engine.filterJobConfiguration(this.configuration) + "\n");
        }
    }

    private void postHandle() {
        String handlerPluginTypeStr = this.configuration.getString("job.postHandler.pluginType");
        if (StringUtils.isNotEmpty(handlerPluginTypeStr)) {
            PluginType handlerPluginType;
            try {
                handlerPluginType = PluginType.valueOf(handlerPluginTypeStr.toUpperCase());
            } catch (IllegalArgumentException var6) {
                throw DataXException.asDataXException(FrameworkErrorCode.CONFIG_ERROR, String.format("Job postHandler's pluginType(%s) set error, reason(%s)", handlerPluginTypeStr.toUpperCase(), var6.getMessage()));
            }

            String handlerPluginName = this.configuration.getString("job.postHandler.pluginName");
            this.classLoaderSwapper.setCurrentThreadClassLoader(LoadUtil.getJarLoader(handlerPluginType, handlerPluginName));
            AbstractJobPlugin handler = LoadUtil.loadJobPlugin(handlerPluginType, handlerPluginName);
            JobPluginCollector jobPluginCollector = new DefaultJobPluginCollector(this.getContainerCommunicator());
            handler.setJobPluginCollector(jobPluginCollector);
            handler.postHandler(this.configuration);
            this.classLoaderSwapper.restoreCurrentThreadClassLoader();
        }
    }

    private int split() {
        this.adjustChannelNumber();
        if (this.needChannelNumber <= 0) {
            this.needChannelNumber = 1;
        }

        List<Configuration> readerTaskConfigs = this.doReaderSplit(this.needChannelNumber);
        int taskNumber = readerTaskConfigs.size();
        List<Configuration> writerTaskConfigs = this.doWriterSplit(taskNumber);
        List<Configuration> transformerList = this.configuration.getListConfiguration("job.content[0].transformer");
        LOG.debug("transformer configuration: " + JSON.toJSONString(transformerList));
        List<Configuration> contentConfig = this.mergeReaderAndWriterTaskConfigs(readerTaskConfigs, writerTaskConfigs, transformerList);
        LOG.debug("contentConfig configuration: " + JSON.toJSONString(contentConfig));
        this.configuration.set("job.content", contentConfig);
        return contentConfig.size();
    }

    private void adjustChannelNumber() {
        int needChannelNumberByByte = 2147483647;
        int needChannelNumberByRecord = 2147483647;
        boolean isByteLimit = this.configuration.getInt("job.setting.speed.byte", 0) > 0;
        if (isByteLimit) {
            long globalLimitedByteSpeed = (long) this.configuration.getInt("job.setting.speed.byte", 10485760);
            Long channelLimitedByteSpeed = this.configuration.getLong("core.transport.channel.speed.byte");
            if (channelLimitedByteSpeed == null || channelLimitedByteSpeed <= 0L) {
                throw DataXException.asDataXException(FrameworkErrorCode.CONFIG_ERROR, "When a global bps limit is set, the per-channel bps value must be present and positive");
            }

            needChannelNumberByByte = (int) (globalLimitedByteSpeed / channelLimitedByteSpeed);
            needChannelNumberByByte = needChannelNumberByByte > 0 ? needChannelNumberByByte : 1;
            LOG.info("Job set Max-Byte-Speed to " + globalLimitedByteSpeed + " bytes.");
        }

        boolean isRecordLimit = this.configuration.getInt("job.setting.speed.record", 0) > 0;
        if (isRecordLimit) {
            long globalLimitedRecordSpeed = (long) this.configuration.getInt("job.setting.speed.record", 100000);
            Long channelLimitedRecordSpeed = this.configuration.getLong("core.transport.channel.speed.record");
            if (channelLimitedRecordSpeed == null || channelLimitedRecordSpeed <= 0L) {
                throw DataXException.asDataXException(FrameworkErrorCode.CONFIG_ERROR, "When a global tps limit is set, the per-channel tps value must be present and positive");
            }

            needChannelNumberByRecord = (int) (globalLimitedRecordSpeed / channelLimitedRecordSpeed);
            needChannelNumberByRecord = needChannelNumberByRecord > 0 ? needChannelNumberByRecord : 1;
            LOG.info("Job set Max-Record-Speed to " + globalLimitedRecordSpeed + " records.");
        }

        this.needChannelNumber = needChannelNumberByByte < needChannelNumberByRecord ? needChannelNumberByByte : needChannelNumberByRecord;
        if (this.needChannelNumber >= 2147483647) {
            boolean isChannelLimit = this.configuration.getInt("job.setting.speed.channel", 0) > 0;
            if (isChannelLimit) {
                this.needChannelNumber = this.configuration.getInt("job.setting.speed.channel");
                LOG.info("Job set Channel-Number to " + this.needChannelNumber + " channels.");
            } else {
                throw DataXException.asDataXException(FrameworkErrorCode.CONFIG_ERROR, "A job speed limit must be configured");
            }
        }
    }

    private void schedule() {
        int channelsPerTaskGroup = this.configuration.getInt("core.container.taskGroup.channel", 5);
        int taskNumber = this.configuration.getList("job.content").size();
        this.needChannelNumber = Math.min(this.needChannelNumber, taskNumber);
        PerfTrace.getInstance().setChannelNumber(this.needChannelNumber);
        List<Configuration> taskGroupConfigs = JobAssignUtil.assignFairly(this.configuration, this.needChannelNumber, channelsPerTaskGroup);
        LOG.info("Scheduler starts [{}] taskGroups.", taskGroupConfigs.size());
        ExecuteMode executeMode = null;

        try {
            executeMode = ExecuteMode.STANDALONE;
            AbstractScheduler scheduler = this.initStandaloneScheduler(this.configuration);
            Iterator var6 = taskGroupConfigs.iterator();

            while (true) {
                if (!var6.hasNext()) {
                    if ((executeMode == ExecuteMode.LOCAL || executeMode == ExecuteMode.DISTRIBUTE) && this.jobId <= 0L) {
                        throw DataXException.asDataXException(FrameworkErrorCode.RUNTIME_ERROR, "A jobId must be set in [ local | distribute ] mode, and its value must be > 0.");
                    }

                    LOG.info("Running by {} Mode.", executeMode);
                    this.startTransferTimeStamp = System.currentTimeMillis();
                    scheduler.schedule(taskGroupConfigs);
                    this.endTransferTimeStamp = System.currentTimeMillis();
                    break;
                }

                Configuration taskGroupConfig = (Configuration) var6.next();
                taskGroupConfig.set("core.container.job.mode", executeMode.getValue());
            }
        } catch (Exception var8) {
            LOG.error("Error running scheduler in mode [{}].", executeMode);
            this.endTransferTimeStamp = System.currentTimeMillis();
            throw DataXException.asDataXException(FrameworkErrorCode.RUNTIME_ERROR, var8);
        }

        this.checkLimit();
    }

    private AbstractScheduler initStandaloneScheduler(Configuration configuration) {
        AbstractContainerCommunicator containerCommunicator = new StandAloneJobContainerCommunicator(configuration);
        super.setContainerCommunicator(containerCommunicator);
        return new StandAloneScheduler(containerCommunicator);
    }

    private void post() {
        this.postJobWriter();
        this.postJobReader();
    }

    private void destroy() {
        if (this.jobWriter != null) {
            this.jobWriter.destroy();
            this.jobWriter = null;
        }

        if (this.jobReader != null) {
            this.jobReader.destroy();
            this.jobReader = null;
        }

    }

    private ZDataxResult logStatistics() {
        ZDataxResult result = new ZDataxResult();
        long totalCosts = (this.endTimeStamp - this.startTimeStamp) / 1000L;
        long transferCosts = (this.endTransferTimeStamp - this.startTransferTimeStamp) / 1000L;
        if (0L == transferCosts) {
            transferCosts = 1L;
        }

        if (super.getContainerCommunicator() != null) {
            Communication communication = super.getContainerCommunicator().collect();
            communication.setTimestamp(this.endTimeStamp);
            Communication tempComm = new Communication();
            tempComm.setTimestamp(this.startTransferTimeStamp);
            Communication reportCommunication = CommunicationTool.getReportCommunication(communication, tempComm, this.totalStage);
            long byteSpeedPerSecond = communication.getLongCounter("readSucceedBytes") / transferCosts;
            long recordSpeedPerSecond = communication.getLongCounter("readSucceedRecords") / transferCosts;
            reportCommunication.setLongCounter("byteSpeed", byteSpeedPerSecond);
            reportCommunication.setLongCounter("recordSpeed", recordSpeedPerSecond);
            super.getContainerCommunicator().report(reportCommunication);
            LOG.info(String.format("\n%-26s: %-18s\n%-26s: %-18s\n%-26s: %19s\n%-26s: %19s\n%-26s: %19s\n%-26s: %19s\n%-26s: %19s\n",
                    "Job start time", dateFormat.format(this.startTimeStamp),
                    "Job end time", dateFormat.format(this.endTimeStamp),
                    "Total elapsed time", totalCosts + "s",
                    "Average throughput", StrUtil.stringify(byteSpeedPerSecond) + "/s",
                    "Record write speed", recordSpeedPerSecond + "rec/s",
                    "Total records read", String.valueOf(CommunicationTool.getTotalReadRecords(communication)),
                    "Total read/write failures", String.valueOf(CommunicationTool.getTotalErrorRecords(communication))));
            result.setStartTimeStamp(this.startTimeStamp);
            result.setEndTimeStamp(this.endTimeStamp);
            result.setTotalCosts(totalCosts);
            result.setByteSpeedPerSecond(byteSpeedPerSecond);
            result.setRecordSpeedPerSecond(recordSpeedPerSecond);
            result.setTotalReadRecords(CommunicationTool.getTotalReadRecords(communication));
            result.setTotalErrorRecords(CommunicationTool.getTotalErrorRecords(communication));
            result.setReadSucceedBytes(communication.getLongCounter(CommunicationTool.READ_SUCCEED_BYTES));
            result.setStartTransferTimeStamp(this.startTransferTimeStamp);
            result.setEndTransferTimeStamp(this.endTransferTimeStamp);
            result.setTransferCosts(transferCosts);
            long totalTransformerSuccessRecords = communication.getLongCounter("totalTransformerSuccessRecords");
            long totalTransformerFailedRecords = communication.getLongCounter("totalTransformerFailedRecords");
            long totalTransformerFilterRecords = communication.getLongCounter("totalTransformerFilterRecords");
            result.setTotalTransformerSuccessRecords(totalTransformerSuccessRecords);
            result.setTotalTransformerFailedRecords(totalTransformerFailedRecords);
            result.setTotalTransformerFilterRecords(totalTransformerFilterRecords);
            if (totalTransformerSuccessRecords > 0L || totalTransformerFailedRecords > 0L || totalTransformerFilterRecords > 0L) {
                LOG.info(String.format("\n%-26s: %19s\n%-26s: %19s\n%-26s: %19s\n",
                        "Transformer success records", totalTransformerSuccessRecords,
                        "Transformer failed records", totalTransformerFailedRecords,
                        "Transformer filtered records", totalTransformerFilterRecords)
                );
            }
        }
        return result;
    }

    private Reader.Job initJobReader(JobPluginCollector jobPluginCollector) {
        this.readerPluginName = this.configuration.getString("job.content[0].reader.name");
        this.classLoaderSwapper.setCurrentThreadClassLoader(LoadUtil.getJarLoader(PluginType.READER, this.readerPluginName));
        Reader.Job jobReader = (Reader.Job) LoadUtil.loadJobPlugin(PluginType.READER, this.readerPluginName);
        jobReader.setPluginJobConf(this.configuration.getConfiguration("job.content[0].reader.parameter"));
        jobReader.setPeerPluginJobConf(this.configuration.getConfiguration("job.content[0].writer.parameter"));
        jobReader.setJobPluginCollector(jobPluginCollector);
        jobReader.init();
        this.classLoaderSwapper.restoreCurrentThreadClassLoader();
        return jobReader;
    }

    private com.alibaba.datax.common.spi.Writer.Job initJobWriter(JobPluginCollector jobPluginCollector) {
        this.writerPluginName = this.configuration.getString("job.content[0].writer.name");
        this.classLoaderSwapper.setCurrentThreadClassLoader(LoadUtil.getJarLoader(PluginType.WRITER, this.writerPluginName));
        com.alibaba.datax.common.spi.Writer.Job jobWriter = (com.alibaba.datax.common.spi.Writer.Job) LoadUtil.loadJobPlugin(PluginType.WRITER, this.writerPluginName);
        jobWriter.setPluginJobConf(this.configuration.getConfiguration("job.content[0].writer.parameter"));
        jobWriter.setPeerPluginJobConf(this.configuration.getConfiguration("job.content[0].reader.parameter"));
        jobWriter.setPeerPluginName(this.readerPluginName);
        jobWriter.setJobPluginCollector(jobPluginCollector);
        jobWriter.init();
        this.classLoaderSwapper.restoreCurrentThreadClassLoader();
        return jobWriter;
    }

    private void prepareJobReader() {
        this.classLoaderSwapper.setCurrentThreadClassLoader(LoadUtil.getJarLoader(PluginType.READER, this.readerPluginName));
        LOG.info(String.format("DataX Reader.Job [%s] do prepare work .", this.readerPluginName));
        this.jobReader.prepare();
        this.classLoaderSwapper.restoreCurrentThreadClassLoader();
    }

    private void prepareJobWriter() {
        this.classLoaderSwapper.setCurrentThreadClassLoader(LoadUtil.getJarLoader(PluginType.WRITER, this.writerPluginName));
        LOG.info(String.format("DataX Writer.Job [%s] do prepare work .", this.writerPluginName));
        this.jobWriter.prepare();
        this.classLoaderSwapper.restoreCurrentThreadClassLoader();
    }

    private List<Configuration> doReaderSplit(int adviceNumber) {
        this.classLoaderSwapper.setCurrentThreadClassLoader(LoadUtil.getJarLoader(PluginType.READER, this.readerPluginName));
        List<Configuration> readerSlicesConfigs = this.jobReader.split(adviceNumber);
        if (readerSlicesConfigs != null && readerSlicesConfigs.size() > 0) {
            LOG.info("DataX Reader.Job [{}] splits to [{}] tasks.", this.readerPluginName, readerSlicesConfigs.size());
            this.classLoaderSwapper.restoreCurrentThreadClassLoader();
            return readerSlicesConfigs;
        } else {
            throw DataXException.asDataXException(FrameworkErrorCode.PLUGIN_SPLIT_ERROR, "The reader split must produce more than 0 tasks");
        }
    }

    private List<Configuration> doWriterSplit(int readerTaskNumber) {
        this.classLoaderSwapper.setCurrentThreadClassLoader(LoadUtil.getJarLoader(PluginType.WRITER, this.writerPluginName));
        List<Configuration> writerSlicesConfigs = this.jobWriter.split(readerTaskNumber);
        if (writerSlicesConfigs != null && writerSlicesConfigs.size() > 0) {
            LOG.info("DataX Writer.Job [{}] splits to [{}] tasks.", this.writerPluginName, writerSlicesConfigs.size());
            this.classLoaderSwapper.restoreCurrentThreadClassLoader();
            return writerSlicesConfigs;
        } else {
            throw DataXException.asDataXException(FrameworkErrorCode.PLUGIN_SPLIT_ERROR, "The writer split must produce more than 0 tasks");
        }
    }

    private List<Configuration> mergeReaderAndWriterTaskConfigs(List<Configuration> readerTasksConfigs, List<Configuration> writerTasksConfigs) {
        return this.mergeReaderAndWriterTaskConfigs(readerTasksConfigs, writerTasksConfigs, (List) null);
    }

    private List<Configuration> mergeReaderAndWriterTaskConfigs(List<Configuration> readerTasksConfigs, List<Configuration> writerTasksConfigs, List<Configuration> transformerConfigs) {
        if (readerTasksConfigs.size() != writerTasksConfigs.size()) {
            throw DataXException.asDataXException(FrameworkErrorCode.PLUGIN_SPLIT_ERROR, String.format("The number of reader tasks [%d] does not equal the number of writer tasks [%d].", readerTasksConfigs.size(), writerTasksConfigs.size()));
        } else {
            List<Configuration> contentConfigs = new ArrayList();

            for (int i = 0; i < readerTasksConfigs.size(); ++i) {
                Configuration taskConfig = Configuration.newDefault();
                taskConfig.set("reader.name", this.readerPluginName);
                taskConfig.set("reader.parameter", readerTasksConfigs.get(i));
                taskConfig.set("writer.name", this.writerPluginName);
                taskConfig.set("writer.parameter", writerTasksConfigs.get(i));
                if (transformerConfigs != null && transformerConfigs.size() > 0) {
                    taskConfig.set("transformer", transformerConfigs);
                }

                taskConfig.set("taskId", i);
                contentConfigs.add(taskConfig);
            }

            return contentConfigs;
        }
    }

    private List<Configuration> distributeTasksToTaskGroup(int averTaskPerChannel, int channelNumber, int channelsPerTaskGroup) {
        Validate.isTrue(averTaskPerChannel > 0 && channelNumber > 0 && channelsPerTaskGroup > 0, "averTaskPerChannel, channelNumber and channelsPerTaskGroup must all be positive");
        List<Configuration> taskConfigs = this.configuration.getListConfiguration("job.content");
        int taskGroupNumber = channelNumber / channelsPerTaskGroup;
        int leftChannelNumber = channelNumber % channelsPerTaskGroup;
        if (leftChannelNumber > 0) {
            ++taskGroupNumber;
        }

        if (taskGroupNumber == 1) {
            final Configuration taskGroupConfig = this.configuration.clone();
            taskGroupConfig.set("job.content", this.configuration.getListConfiguration("job.content"));
            taskGroupConfig.set("core.container.taskGroup.channel", channelNumber);
            taskGroupConfig.set("core.container.taskGroup.id", 0);
            return new ArrayList<Configuration>() {
                {
                    this.add(taskGroupConfig);
                }
            };
        } else {
            List<Configuration> taskGroupConfigs = new ArrayList();

            int taskConfigIndex;
            for (taskConfigIndex = 0; taskConfigIndex < taskGroupNumber; ++taskConfigIndex) {
                Configuration taskGroupConfig = this.configuration.clone();
                List<Configuration> taskGroupJobContent = taskGroupConfig.getListConfiguration("job.content");
                taskGroupJobContent.clear();
                taskGroupConfig.set("job.content", taskGroupJobContent);
                taskGroupConfigs.add(taskGroupConfig);
            }

            taskConfigIndex = 0;
            int channelIndex = 0;
            int taskGroupConfigIndex = 0;
            List taskGroupJobContent;
            if (leftChannelNumber > 0) {
                Configuration taskGroupConfig;
                for (taskGroupConfig = (Configuration) taskGroupConfigs.get(taskGroupConfigIndex); channelIndex < leftChannelNumber; ++channelIndex) {
                    for (int i = 0; i < averTaskPerChannel; ++i) {
                        taskGroupJobContent = taskGroupConfig.getListConfiguration("job.content");
                        taskGroupJobContent.add(taskConfigs.get(taskConfigIndex++));
                        taskGroupConfig.set("job.content", taskGroupJobContent);
                    }
                }

                taskGroupConfig.set("core.container.taskGroup.channel", leftChannelNumber);
                taskGroupConfig.set("core.container.taskGroup.id", taskGroupConfigIndex++);
            }

            int equalDivisionStartIndex = taskGroupConfigIndex;

            Configuration taskGroupConfig;
            while (taskConfigIndex < taskConfigs.size() && equalDivisionStartIndex < taskGroupConfigs.size()) {
                for (taskGroupConfigIndex = equalDivisionStartIndex; taskGroupConfigIndex < taskGroupConfigs.size() && taskConfigIndex < taskConfigs.size(); ++taskGroupConfigIndex) {
                    taskGroupConfig = (Configuration) taskGroupConfigs.get(taskGroupConfigIndex);
                    taskGroupJobContent = taskGroupConfig.getListConfiguration("job.content");
                    taskGroupJobContent.add(taskConfigs.get(taskConfigIndex++));
                    taskGroupConfig.set("job.content", taskGroupJobContent);
                }
            }

            taskGroupConfigIndex = equalDivisionStartIndex;

            while (taskGroupConfigIndex < taskGroupConfigs.size()) {
                taskGroupConfig = (Configuration) taskGroupConfigs.get(taskGroupConfigIndex);
                taskGroupConfig.set("core.container.taskGroup.channel", channelsPerTaskGroup);
                taskGroupConfig.set("core.container.taskGroup.id", taskGroupConfigIndex++);
            }

            return taskGroupConfigs;
        }
    }

    private void postJobReader() {
        this.classLoaderSwapper.setCurrentThreadClassLoader(LoadUtil.getJarLoader(PluginType.READER, this.readerPluginName));
        LOG.info("DataX Reader.Job [{}] do post work.", this.readerPluginName);
        this.jobReader.post();
        this.classLoaderSwapper.restoreCurrentThreadClassLoader();
    }

    private void postJobWriter() {
        this.classLoaderSwapper.setCurrentThreadClassLoader(LoadUtil.getJarLoader(PluginType.WRITER, this.writerPluginName));
        LOG.info("DataX Writer.Job [{}] do post work.", this.writerPluginName);
        this.jobWriter.post();
        this.classLoaderSwapper.restoreCurrentThreadClassLoader();
    }

    private void checkLimit() {
        Communication communication = super.getContainerCommunicator().collect();
        this.errorLimit.checkRecordLimit(communication);
        this.errorLimit.checkPercentageLimit(communication);
    }

    private void invokeHooks() {
        Communication comm = super.getContainerCommunicator().collect();
        HookInvoker invoker = new HookInvoker(CoreConstant.DATAX_HOME + "/hook", this.configuration, comm.getCounter());
        invoker.invokeAll();
    }

    @Override
    public ZDataxResult startWithResult() {
        LOG.info("DataX jobContainer starts job.");
        boolean hasException = false;
        boolean isDryRun = false;
        boolean var11 = false;
        ZDataxResult result = new ZDataxResult();
        try {
            var11 = true;
            this.startTimeStamp = System.currentTimeMillis();
            isDryRun = this.configuration.getBool("job.setting.dryRun", false);
            if (isDryRun) {
                LOG.info("jobContainer starts to do preCheck ...");
                this.preCheck();
                var11 = false;
            } else {
                this.userConf = this.configuration.clone();
                LOG.debug("jobContainer starts to do preHandle ...");
                this.preHandle();
                LOG.debug("jobContainer starts to do init ...");
                this.init();
                LOG.info("jobContainer starts to do prepare ...");
                this.prepare();
                LOG.info("jobContainer starts to do split ...");
                this.totalStage = this.split();
                LOG.info("jobContainer starts to do schedule ...");
                this.schedule();
                LOG.debug("jobContainer starts to do post ...");
                this.post();
                LOG.debug("jobContainer starts to do postHandle ...");
                this.postHandle();
                LOG.info("DataX jobId [{}] completed successfully.", this.jobId);
                this.invokeHooks();
                var11 = false;
            }
        } catch (Throwable var12) {
            LOG.error("Exception when job run", var12);
            hasException = true;
            if (var12 instanceof OutOfMemoryError) {
                this.destroy();
                System.gc();
            }

            if (super.getContainerCommunicator() == null) {
                AbstractContainerCommunicator tempContainerCollector = new StandAloneJobContainerCommunicator(this.configuration);
                super.setContainerCommunicator(tempContainerCollector);
            }

            Communication communication = super.getContainerCommunicator().collect();
            communication.setThrowable(var12);
            communication.setTimestamp(this.endTimeStamp);
            Communication tempComm = new Communication();
            tempComm.setTimestamp(this.startTransferTimeStamp);
            Communication reportCommunication = CommunicationTool.getReportCommunication(communication, tempComm, this.totalStage);
            super.getContainerCommunicator().report(reportCommunication);
            throw DataXException.asDataXException(FrameworkErrorCode.RUNTIME_ERROR, var12);
        } finally {
            if (var11) {
                if (!isDryRun) {
                    this.destroy();
                    this.endTimeStamp = System.currentTimeMillis();
                    if (!hasException) {
                        VMInfo vmInfo = VMInfo.getVmInfo();
                        if (vmInfo != null) {
                            vmInfo.getDelta(false);
                            LOG.info(vmInfo.totalString());
                        }

                        LOG.info(PerfTrace.getInstance().summarizeNoException());
                        result = this.logStatistics();
                    }
                }
            }
        }

        if (!isDryRun) {
            this.destroy();
            this.endTimeStamp = System.currentTimeMillis();
            if (!hasException) {
                VMInfo vmInfo = VMInfo.getVmInfo();
                if (vmInfo != null) {
                    vmInfo.getDelta(false);
                    LOG.info(vmInfo.totalString());
                }

                LOG.info(PerfTrace.getInstance().summarizeNoException());
                result = this.logStatistics();
            }
        }
        return result;
    }
}

ZEngine

package datax;

import com.alibaba.datax.common.element.ColumnCast;
import com.alibaba.datax.common.exception.DataXException;
import com.alibaba.datax.common.spi.ErrorCode;
import com.alibaba.datax.common.statistics.PerfTrace;
import com.alibaba.datax.common.statistics.VMInfo;
import com.alibaba.datax.common.util.Configuration;
import com.alibaba.datax.common.util.MessageSource;
import com.alibaba.datax.core.AbstractContainer;
import com.alibaba.datax.core.Engine;
import com.alibaba.datax.core.job.JobContainer;
import com.alibaba.datax.core.taskgroup.TaskGroupContainer;
import com.alibaba.datax.core.util.ConfigParser;
import com.alibaba.datax.core.util.ConfigurationValidate;
import com.alibaba.datax.core.util.ExceptionTracker;
import com.alibaba.datax.core.util.FrameworkErrorCode;
import com.alibaba.datax.core.util.container.LoadUtil;
import org.apache.commons.cli.BasicParser;
import org.apache.commons.cli.CommandLine;
import org.apache.commons.cli.Options;
import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.Set;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

public class ZEngine {

    private static final Logger LOG = LoggerFactory.getLogger(Engine.class);
    private static String RUNTIME_MODE;

    public ZEngine() {
    }

    public ZDataxResult start(Configuration allConf) {
        ColumnCast.bind(allConf);
        LoadUtil.bind(allConf);
        boolean isJob = !"taskGroup".equalsIgnoreCase(allConf.getString("core.container.model"));
        int channelNumber = 0;
        int taskGroupId = -1;
        Object container;
        long instanceId;
        if (isJob) {
            allConf.set("core.container.job.mode", RUNTIME_MODE);
            container = new ZJobContainer(allConf);
            instanceId = allConf.getLong("core.container.job.id", 0L);
        } else {
            container = new TaskGroupContainer(allConf);
            instanceId = allConf.getLong("core.container.job.id");
            taskGroupId = allConf.getInt("core.container.taskGroup.id");
            channelNumber = allConf.getInt("core.container.taskGroup.channel");
        }

        boolean traceEnable = allConf.getBool("core.container.trace.enable", true);
        boolean perfReportEnable = allConf.getBool("core.dataXServer.reportPerfLog", true);
        if (instanceId == -1L) {
            perfReportEnable = false;
        }

        int priority = 0;

        try {
            priority = Integer.parseInt(System.getenv("SKYNET_PRIORITY"));
        } catch (NumberFormatException var13) {
            LOG.warn("priority set to 0, because NumberFormatException, the value is: " + System.getProperty("PROIORY"));
        }

        Configuration jobInfoConfig = allConf.getConfiguration("job.jobInfo");
        PerfTrace perfTrace = PerfTrace.getInstance(isJob, instanceId, taskGroupId, priority, traceEnable);
        perfTrace.setJobInfo(jobInfoConfig, perfReportEnable, channelNumber);
        return ((ZAbstractContainer) container).startWithResult();
    }

    public static String filterJobConfiguration(Configuration configuration) {
        Configuration jobConfWithSetting = configuration.getConfiguration("job").clone();
        Configuration jobContent = jobConfWithSetting.getConfiguration("content");
        filterSensitiveConfiguration(jobContent);
        jobConfWithSetting.set("content", jobContent);
        return jobConfWithSetting.beautify();
    }

    public static Configuration filterSensitiveConfiguration(Configuration configuration) {
        Set<String> keys = configuration.getKeys();
        Iterator var2 = keys.iterator();

        while (var2.hasNext()) {
            String key = (String) var2.next();
            boolean isSensitive = StringUtils.endsWithIgnoreCase(key, "password") || StringUtils.endsWithIgnoreCase(key, "accessKey");
            if (isSensitive && configuration.get(key) instanceof String) {
                configuration.set(key, configuration.getString(key).replaceAll(".", "*"));
            }
        }

        return configuration;
    }

    public static ZDataxResult entry(String[] args) throws Throwable {
        Options options = new Options();
        options.addOption("job", true, "Job config.");
        options.addOption("jobid", true, "Job unique id.");
        options.addOption("mode", true, "Job runtime mode.");
        BasicParser parser = new BasicParser();
        CommandLine cl = parser.parse(options, args);
        String jobPath = cl.getOptionValue("job");
        String jobIdString = cl.getOptionValue("jobid");
        RUNTIME_MODE = cl.getOptionValue("mode");
        Configuration configuration = ConfigParser.parse(jobPath);
        MessageSource.init(configuration);
        MessageSource.reloadResourceBundle(Configuration.class);
        long jobId;
        if (!"-1".equalsIgnoreCase(jobIdString)) {
            jobId = Long.parseLong(jobIdString);
        } else {
            String dscJobUrlPatternString = "/instance/(\\d{1,})/config.xml";
            String dsJobUrlPatternString = "/inner/job/(\\d{1,})/config";
            String dsTaskGroupUrlPatternString = "/inner/job/(\\d{1,})/taskGroup/";
            List<String> patternStringList = Arrays.asList(dscJobUrlPatternString, dsJobUrlPatternString, dsTaskGroupUrlPatternString);
            jobId = parseJobIdFromUrl(patternStringList, jobPath);
        }

        boolean isStandAloneMode = "standalone".equalsIgnoreCase(RUNTIME_MODE);
        if (!isStandAloneMode && jobId == -1L) {
            throw DataXException.asDataXException(FrameworkErrorCode.CONFIG_ERROR, "A valid jobId must be provided in the URL when not running in standalone mode.");
        } else {
            configuration.set("core.container.job.id", jobId);
            VMInfo vmInfo = VMInfo.getVmInfo();
            if (vmInfo != null) {
                LOG.info(vmInfo.toString());
            }

            LOG.info("\n" + filterJobConfiguration(configuration) + "\n");
            LOG.debug(configuration.toJSON());
            ConfigurationValidate.doValidate(configuration);
            ZEngine engine = new ZEngine();
            return engine.start(configuration);
        }
    }

    private static long parseJobIdFromUrl(List<String> patternStringList, String url) {
        long result = -1L;
        Iterator var4 = patternStringList.iterator();

        do {
            if (!var4.hasNext()) {
                return result;
            }

            String patternString = (String) var4.next();
            result = doParseJobIdFromUrl(patternString, url);
        } while (result == -1L);

        return result;
    }

    private static long doParseJobIdFromUrl(String patternString, String url) {
        Pattern pattern = Pattern.compile(patternString);
        Matcher matcher = pattern.matcher(url);
        return matcher.find() ? Long.parseLong(matcher.group(1)) : -1L;
    }

    public static void main(String[] args) throws Exception {
        int exitCode = 0;

        try {
            entry(args);
        } catch (Throwable var6) {
            exitCode = 1;
            LOG.error("\n\nDataX's analysis suggests the most likely cause of this job's failure is:\n" + ExceptionTracker.trace(var6));
            if (var6 instanceof DataXException) {
                DataXException tempException = (DataXException) var6;
                ErrorCode errorCode = tempException.getErrorCode();
                if (errorCode instanceof FrameworkErrorCode) {
                    FrameworkErrorCode tempErrorCode = (FrameworkErrorCode) errorCode;
                    exitCode = tempErrorCode.toExitValue();
                }
            }

            System.exit(exitCode);
        }

        System.exit(exitCode);
    }
}

Invocation

import datax.ZDataxResult;
import datax.ZEngine;

public class TestMain {
    public static String getCurrentClasspath() {
        ClassLoader classLoader = Thread.currentThread().getContextClassLoader();
        String currentClasspath = classLoader.getResource("").getPath();
        // Detect the current operating system
        String osName = System.getProperty("os.name");
        if (osName.startsWith("Win")) {
            // On Windows, strip the leading "/" (and the trailing "/") from the resource path
            currentClasspath = currentClasspath.substring(1, currentClasspath.length() - 1);
        }
        return currentClasspath;
    }

    public static void main(String[] args) {
        System.setProperty("datax.home", "D:\\Tools\\BigData\\datax");
        String[] dataxArgs = {"-job", getCurrentClasspath() + "/datax/test.json", "-mode", "standalone", "-jobid", "-1"};
        try {
            ZDataxResult result = ZEngine.entry(dataxArgs);
            System.out.println(result);
        } catch (Throwable e) {
            e.printStackTrace();
        }
    }
}