Big Data Development: Using ChunJun

Preface

Project repositories

https://github.com/DTStack/chunjun

https://gitee.com/dtstack_dev_0/chunjun

Downloads

https://github.com/DTStack/chunjun/releases

JSON configuration examples

https://dtstack.github.io/chunjun/examples/json/binlog@binlog_hive

https://dtstack.github.io/chunjun-web/docs/chunjunDocs/connectors/hive/hive-sink

Installation

Extract

rm -rf /data/tools/bigdata/chunjun
mkdir -p /data/tools/bigdata/chunjun
tar -zxvf chunjun-dist-1.12-SNAPSHOT.tar.gz -C /data/tools/bigdata/chunjun

Configure environment variables

Create the profile script

vi /etc/profile.d/chunjun.sh

Add the following:

export CHUNJUN_HOME=/data/tools/bigdata/chunjun
export PATH=$CHUNJUN_HOME/bin:$PATH

Apply the configuration immediately

source /etc/profile

Verify CHUNJUN_HOME

cd $CHUNJUN_HOME
pwd

Submitting jobs

Local

Local submission

Local mode depends on neither a Flink environment nor a Hadoop environment; it starts a JVM process in the local environment to run the ChunJun job.

bash $CHUNJUN_HOME/bin/chunjun-local.sh -job $CHUNJUN_HOME/chunjun-examples/json/stream/stream.json
bash $CHUNJUN_HOME/bin/chunjun-local.sh -job /root/001.json

Standalone

Standalone mode depends on a Flink Standalone environment but does not depend on Hadoop.

Copy the dependency files into the Flink lib directory, for example:

cp -r chunjun-dist $FLINK_HOME/lib

Note: this copy must be performed on every machine in the Flink cluster; otherwise some jobs will fail with class-not-found errors.
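The directory can be distributed to the remaining nodes in one step, for example with a small loop like the sketch below (the host names hadoop02/hadoop03 are taken from the distribution step later in this article; passwordless SSH and an identical FLINK_HOME path on every node are assumed):

# Copy chunjun-dist into $FLINK_HOME/lib on every other Flink node
# (assumes passwordless SSH and the same FLINK_HOME on all machines)
for host in hadoop02 hadoop03; do
  scp -r chunjun-dist "$host:$FLINK_HOME/lib/"
done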

Start the Flink Standalone cluster

sh $FLINK_HOME/bin/start-cluster.sh

Run

sh bin/chunjun-standalone.sh -job chunjun-examples/json/stream/stream.json

Yarn Session

Yarn Session mode depends on both the Flink and Hadoop environments; HADOOP_HOME and FLINK_HOME must be set on the submitting machine in advance.
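For example, the two variables can be exported the same way as CHUNJUN_HOME above. The paths below are placeholders; replace them with your actual installation directories:

# Placeholder paths -- replace with your real Hadoop and Flink installations
export HADOOP_HOME=/data/tools/bigdata/hadoop
export FLINK_HOME=/data/tools/bigdata/flink
export PATH=$FLINK_HOME/bin:$HADOOP_HOME/bin:$PATH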

Stop any previous Yarn session

yarn application -list
yarn application -kill application_1674972891007_0009

We need to upload chunjun-dist with the yarn-session -t option:

$FLINK_HOME/bin/yarn-session.sh -t $CHUNJUN_HOME -d -nm chunjun_session

Submit the job

Find the application corresponding to the session ($SESSION_APPLICATION_ID) in the Yarn web UI, change into the local chunjun-dist directory, and run the submit command.

yarn.application.id can also be set in flink-conf.yaml. After a successful submission, the job can be monitored in the Yarn web UI.
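For example, a sketch of setting it in flink-conf.yaml so it does not have to be passed with -confProp on every submission (the application ID is the one printed when the session was started, shown in the log below):

# Record the session's application ID in the Flink configuration
echo "yarn.application.id: application_1672710362889_2595" >> $FLINK_HOME/conf/flink-conf.yaml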

On success, output like the following is printed:

JobManager Web Interface: http://hadoop01:45685
2022-11-14 14:17:52,433 INFO org.apache.flink.yarn.cli.FlinkYarnSessionCli [] - The Flink YARN session cluster has been started in detached mode. In order to stop Flink gracefully, use the following command:
$ echo "stop" | ./bin/yarn-session.sh -id application_1672710362889_2595
If this should not be possible, then you can also kill Flink via YARN's web interface or via:
$ yarn application -kill application_1672710362889_2595
Note that killing Flink might not clean up all job artifacts and temporary files.

From the output we can see the JobManager web interface address.

The session can also be viewed in Yarn.

Yarn monitoring page:

http://192.168.7.102:8088/cluster

Run

bash $CHUNJUN_HOME/bin/chunjun-yarn-session.sh -job $CHUNJUN_HOME/chunjun-examples/json/stream/stream.json -confProp {\"yarn.application.id\":\"application_1672710362889_2595\"}

If a job is already running, it can be killed like this:

yarn application -kill application_1672710362889_2595

Test

bash $CHUNJUN_HOME/bin/chunjun-yarn-session.sh -job /root/002.json -confProp {\"yarn.application.id\":\"application_1672710362889_2595\"}

Yarn Per-Job

Yarn Per-Job mode depends on both the Flink and Hadoop environments; HADOOP_HOME and FLINK_HOME must be set on the submitting machine in advance.

Submission steps

With Yarn Per-Job, the job can be submitted as soon as the configuration is correct.

Change into the local chunjun-dist directory and run the command to submit the job.

bash $CHUNJUN_HOME/bin/chunjun-yarn-perjob.sh -job $CHUNJUN_HOME/chunjun-examples/json/stream/stream.json

After a successful submission, the job can be monitored in the Yarn web UI.
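Besides the web UI, the submitted application can also be checked from the command line with the standard YARN CLI, for example:

# List only running YARN applications; the per-job submission should show up here
yarn application -list -appStates RUNNING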

Building from source

Detailed steps

Modify the dependencies

Add the classifier attribute:

<dependency>
    <groupId>org.apache.hive</groupId>
    <artifactId>hive-jdbc</artifactId>
    <version>2.1.0</version>
    <classifier>standalone</classifier>
</dependency>

Format the code

mvn spotless:apply

Build

mvn clean package -DskipTests -P default

After the build, the chunjun-dist-1.12-SNAPSHOT.tar.gz package is produced.

If you built the package on Windows, run sed -i "s/\r//g" bin/*.sh before running jobs on Linux to strip the '\r' characters from the shell scripts.

Extract the package

mkdir chunjun
tar -zxvf chunjun-dist-1.12-SNAPSHOT.tar.gz -C ./chunjun

Delete the original archive

rm -rf chunjun-dist-1.12-SNAPSHOT.tar.gz

Fix the line endings

cd chunjun && sed -i "s/\r//g" bin/*.sh
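To double-check the fix, a quick sanity check can be run from the same chunjun directory (a minimal sketch):

# Print any script that still contains a carriage return; otherwise report that none were found
grep -l $'\r' bin/*.sh || echo "no CRLF line endings found"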

Repackage

tar -czvf ../chunjun-dist-1.12-SNAPSHOT.tar.gz ./*

Verify

cd ../ && rm -rf chunjun && mkdir chunjun
tar -xzvf chunjun-dist-1.12-SNAPSHOT.tar.gz -C ./chunjun

Common tar options

Operation mode (choose one; a combined example follows these lists):

  • -c: create a new tar archive
  • -x: extract a tar archive
  • -t: list the contents of a tar archive
  • -r: append files to an existing tar archive

Compression mode (choose one):

  • -z: compress/decompress with gzip
  • -j: compress/decompress with bzip2
  • -Z: compress/decompress with compress

Other options:

  • -v: show the files being processed
  • -f: specify the archive file name
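For instance, combining the modes above, -t together with -z and -v lists a gzip archive without extracting it:

# List the contents of the repackaged archive without extracting it
tar -tzvf chunjun-dist-1.12-SNAPSHOT.tar.gz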

Find files

find /data -name hive-jdbc*

Distribute to the other nodes

ssh hadoop02 "rm -rf $CHUNJUN_HOME"
ssh hadoop03 "rm -rf $CHUNJUN_HOME"
ha-fenfa.sh $CHUNJUN_HOME

ha-fenfa.sh /etc/profile.d/chunjun.sh
ssh hadoop02 "source /etc/profile"
ssh hadoop03 "source /etc/profile"

Maven dependencies

If some dependencies cannot be downloaded through Maven

Install them into the local Maven repository

First jar

mvn install:install-file -Dfile="D:\Jars\chunjun-connector-hdfs.jar" -DgroupId="com.dtstack.chunjun" -DartifactId="chunjun-connector-hdfs" -Dversion="1.12-SNAPSHOT" -Dpackaging=jar

Second jar

mvn install:install-file -Dfile="D:\Jars\chunjun-core.jar" -DgroupId="com.dtstack.chunjun" -DartifactId="chunjun-core" -Dversion="1.12-SNAPSHOT" -Dpackaging=jar

Remove the installed jars

mvn dependency:purge-local-repository -DmanualInclude="com.dtstack.chunjun:chunjun-connector-hdfs"
mvn dependency:purge-local-repository -DmanualInclude="com.dtstack.chunjun:chunjun-core"

Configuration

mysql to hive

{
  "job": {
    "content": [
      {
        "reader": {
          "name": "mysqlreader",
          "parameter": {
            "column": [
              {
                "name": "ID",
                "type": "varchar"
              },
              {
                "name": "DIR_ID",
                "type": "varchar"
              },
              {
                "name": "UNAME",
                "type": "varchar"
              },
              {
                "name": "UPWD",
                "type": "varchar"
              },
              {
                "name": "RNAME",
                "type": "varchar"
              },
              {
                "name": "MOBILE",
                "type": "varchar"
              },
              {
                "name": "EMAIL",
                "type": "varchar"
              },
              {
                "name": "LOGIN_DENIED",
                "type": "int"
              },
              {
                "name": "STATUS",
                "type": "int"
              },
              {
                "name": "CREATED_BY",
                "type": "varchar"
              },
              {
                "name": "CREATED_TIME",
                "type": "datetime"
              },
              {
                "name": "UPDATED_BY",
                "type": "varchar"
              },
              {
                "name": "UPDATED_TIME",
                "type": "datetime"
              },
              {
                "name": "REVISION",
                "type": "int"
              }
            ],
            "username": "root",
            "password": "123456",
            "connection": [
              {
                "jdbcUrl": [
                  "jdbc:mysql://110.110.110.110:3306/yxdp?useSSL=false"
                ],
                "table": [
                  "sys_user"
                ]
              }
            ]
          }
        },
        "writer": {
          "name": "hivewriter",
          "parameter": {
            "jdbcUrl": "jdbc:hive2://192.168.7.101:10000/default",
            "username": "",
            "password": "",
            "fileType": "text",
            "fieldDelimiter": ",",
            "writeMode": "overwrite",
            "compress": "",
            "charsetName": "UTF-8",
            "maxFileSize": 1073741824,
            "analyticalRules": "paste_source_${schema}_${table}",
            "schema": "myHive",
            "tablesColumn": "{\"sys_user\":[{\"comment\":\"\",\"type\":\"varchar\",\"key\":\"ID\"},{\"comment\":\"\",\"type\":\"varchar\",\"key\":\"DIR_ID\"},{\"comment\":\"\",\"type\":\"varchar\",\"key\":\"UNAME\"},{\"comment\":\"\",\"type\":\"varchar\",\"key\":\"UPWD\"},{\"comment\":\"\",\"type\":\"varchar\",\"key\":\"RNAME\"},{\"comment\":\"\",\"type\":\"varchar\",\"key\":\"MOBILE\"},{\"comment\":\"\",\"type\":\"varchar\",\"key\":\"EMAIL\"},{\"comment\":\"\",\"type\":\"int\",\"key\":\"LOGIN_DENIED\"},{\"comment\":\"\",\"type\":\"int\",\"key\":\"STATUS\"},{\"comment\":\"\",\"type\":\"varchar\",\"key\":\"CREATED_BY\"},{\"comment\":\"\",\"type\":\"datetime\",\"key\":\"CREATED_TIME\"},{\"comment\":\"\",\"type\":\"varchar\",\"key\":\"UPDATED_BY\"},{\"comment\":\"\",\"type\":\"datetime\",\"key\":\"UPDATED_TIME\"},{\"comment\":\"\",\"type\":\"int\",\"key\":\"REVISION\"}]}",
            "partition": "pt",
            "partitionType": "MINUTE",
            "defaultFS": "hdfs://hdfsns",
            "hadoopConfig": {
              "dfs.ha.namenodes.ns": "nn1,nn2",
              "fs.defaultFS": "hdfs://hdfsns",
              "dfs.namenode.rpc-address.hdfsns.nn2": "192.168.7.102:9000",
              "dfs.client.failover.proxy.provider.ns": "org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider",
              "dfs.namenode.rpc-address.hdfsns.nn1": "192.168.7.101:9000",
              "dfs.nameservices": "hdfsns",
              "fs.hdfs.impl.disable.cache": "true",
              "hadoop.user.name": "root",
              "fs.hdfs.impl": "org.apache.hadoop.hdfs.DistributedFileSystem"
            }
          }
        }
      }
    ],
    "setting": {
      "speed": {
        "channel": 1,
        "bytes": 0
      }
    }
  }
}

mysql2hdfs

ChunJun reported an error with the approach above, so we can use a different approach instead.

Hive data is ultimately stored in HDFS, so after creating the table with Hive we can write directly to the table's HDFS directory and bypass Hive itself.

mysql2hdfs.json

{
  "job": {
    "content": [
      {
        "reader": {
          "parameter": {
            "sliceRecordCount": ["10"],
            "column": [
              {
                "name": "id",
                "type": "id"
              },
              {
                "name": "name",
                "type": "string",
                "value": "psvmc"
              }
            ]
          },
          "name": "streamreader"
        },
        "writer": {
          "name": "hdfswriter",
          "parameter": {
            "path": "hdfs://hacluster/user/hive/warehouse/t_zdb",
            "defaultFS": "hdfs://hacluster",
            "column": [
              {
                "name": "id",
                "type": "int"
              },
              {
                "name": "name",
                "type": "string"
              }
            ],
            "fileType": "text",
            "maxFileSize": 10485760,
            "nextCheckRows": 20000,
            "fieldDelimiter": "\u0001",
            "encoding": "utf-8",
            "writeMode": "overwrite",
            "hadoopConfig": {
              "fs.defaultFS": "hdfs://hacluster",
              "dfs.ha.namenodes.ns": "nn1,nn2",
              "dfs.client.failover.proxy.provider.ns": "org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider",
              "dfs.namenode.rpc-address.hacluster.nn1": "hadoop01:9000",
              "dfs.namenode.rpc-address.hacluster.nn2": "hadoop02:9000",
              "dfs.nameservices": "hacluster",
              "fs.hdfs.impl.disable.cache": "true",
              "fs.hdfs.impl": "org.apache.hadoop.hdfs.DistributedFileSystem"
            }
          }
        }
      }
    ],
    "setting": {
      "speed": {
        "channel": 1,
        "bytes": 0
      }
    }
  }
}

Note

  • The field delimiter is Hive's default delimiter \u0001.

Create the table

CREATE TABLE default.t_zdb (
id INT,
name STRING
);
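To confirm that the table's storage directory matches the path configured for hdfswriter above, the table details can be inspected (a sketch using the Hive CLI; beeline against the jdbc:hive2 URL shown earlier works the same way):

# Show the table's storage details; the Location line should point to
# /user/hive/warehouse/t_zdb, the same path as "path" in mysql2hdfs.json
hive -e "DESC FORMATTED default.t_zdb;"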

Test

Check the application ID

yarn application -list

Stop

yarn application -kill application_1674972891007_0017

Start

$FLINK_HOME/bin/yarn-session.sh -t $CHUNJUN_HOME -d -nm chunjun_session

Run

bash $CHUNJUN_HOME/bin/chunjun-yarn-session.sh -job /root/mysql2hdfs.json -confProp {\"yarn.application.id\":\"application_1674972891007_0018\"}

With that, the data has been written into Hive.
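A quick way to verify the result (a sketch; the table and paths are the ones created above):

# The generated data files should appear under the table's HDFS directory
hadoop fs -ls /user/hive/warehouse/t_zdb

# And the rows should be visible through Hive
hive -e "SELECT * FROM default.t_zdb LIMIT 10;"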

Delete the data

hadoop fs -rmr /user/hive/warehouse/t_zdb/*