Flink开发-DataStream和Table互相转换

前言

Flink 为处理一列转多列的场景提供了两种返回类型 Tuple 和 Row

  • Tuple 只支持1~25个字段,且不能为null,不支持拓展
  • Row 支持null同时也无限制字段数,但如果需要使用Row,必须重载实现getResultType方法

DataStream=>Table

import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.typeutils.RowTypeInfo;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.types.Row;

import java.util.ArrayList;
import java.util.List;

/**
 * Demonstrates converting a {@code DataStream<Row>} into a Table.
 *
 * <p>Because {@link Row} erases field types at runtime, an explicit
 * {@link RowTypeInfo} must be supplied to {@code fromCollection}, otherwise
 * Flink cannot derive the table schema.
 */
public class TCS002 {

    public static void main(String[] args) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

        List<Row> list = new ArrayList<>();
        list.add(Row.of("张三", 18, "男"));
        list.add(Row.of("xiaohong", 16, "女"));

        // Derive the schema from the first row; all rows are assumed to share it.
        RowTypeInfo rowTypeInfo = getRowTypeInfo(list.get(0));
        DataStream<Row> ds = env.fromCollection(list, rowTypeInfo);
        tableEnv.registerDataStream("table01", ds);
        tableEnv.from("table01").execute().print();
    }

    /**
     * Builds a {@link RowTypeInfo} by inspecting the runtime class of each
     * field of the sample row. Fields are named {@code f0, f1, ...}.
     *
     * <p>NOTE: any field whose value is not Integer/Long/Float/Double —
     * including a {@code null} field — falls into the String branch, so the
     * sample row must have all fields populated with representative values.
     *
     * @param row a sample row whose field types define the schema
     * @return type information describing the row's fields
     */
    private static RowTypeInfo getRowTypeInfo(Row row) {
        // <?> wildcard avoids raw-type usage; RowTypeInfo accepts TypeInformation<?>[].
        TypeInformation<?>[] types = new TypeInformation<?>[row.getArity()];
        String[] fieldNames = new String[row.getArity()];

        for (int i = 0; i < row.getArity(); i++) {
            Object field = row.getField(i);
            if (field instanceof Integer) {
                types[i] = BasicTypeInfo.INT_TYPE_INFO;
            } else if (field instanceof Long) {
                types[i] = BasicTypeInfo.LONG_TYPE_INFO;
            } else if (field instanceof Float) {
                types[i] = BasicTypeInfo.FLOAT_TYPE_INFO;
            } else if (field instanceof Double) {
                types[i] = BasicTypeInfo.DOUBLE_TYPE_INFO;
            } else {
                // Fallback for String and any unrecognized (or null) field.
                types[i] = BasicTypeInfo.STRING_TYPE_INFO;
            }
            fieldNames[i] = "f" + i;
        }
        return new RowTypeInfo(types, fieldNames);
    }
}

其中

tableEnv.registerDataStream("table01", ds);
Table table01 = tableEnv.from("table01");

或者

Table table01 = tableEnv.fromDataStream(ds);

Table=>DataStream

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.table.catalog.hive.HiveCatalog;
import org.apache.flink.types.Row;

import java.util.Objects;

/**
 * Demonstrates converting a Table back into a DataStream.
 *
 * <p>Registers a HiveCatalog, runs a SQL query against it, then converts the
 * resulting Table to a {@code DataStream<Row>} via {@code toAppendStream}.
 * Requires a Hive configuration directory named "conf" on the classpath.
 */
public class TCS005 {
public static void main(String[] args) throws Exception {
// Streaming mode is required: toAppendStream does not work with inBatchMode (see notes below).
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
EnvironmentSettings settings = EnvironmentSettings.newInstance().useBlinkPlanner().build();
// The table environment must be created from the same StreamExecutionEnvironment
// that is executed at the end, so the converted stream runs in it.
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env, settings);

String name = "myhive";
String defaultDatabase = "default";
// Locate the Hive conf dir relative to the classpath root.
// NOTE(review): getResource("").getPath() is not URL-decoded — may break if the
// classpath contains spaces or non-ASCII characters; verify in the target environment.
String classPath = Objects.requireNonNull(Thread.currentThread()
.getContextClassLoader()
.getResource("")).getPath();
String hiveConfDir = classPath + "conf";
HiveCatalog hive = new HiveCatalog(name, defaultDatabase, hiveConfDir);
tableEnv.registerCatalog("myhive", hive);
// Use the registered catalog for subsequent (unqualified) table lookups.
tableEnv.useCatalog("myhive");

// Query Hive data. (Variable is named insertSQL but actually holds a SELECT.)
String insertSQL = "select * from t_user ";
Table table = tableEnv.sqlQuery(insertSQL);

// Table => DataStream: each appended table row becomes one stream Row element.
DataStream<Row> rowDataStream = tableEnv.toAppendStream(table, Row.class);
rowDataStream.map(new MapFunction<Row, Row>() {
@Override
public Row map(Row row) throws Exception {
// Identity map used only to log each row as it passes through.
System.out.println(row);
return row;
}
}).print("sql==").setParallelism(1);
// Start the job — without this call nothing is executed!
env.execute("test");
}
}

注意

1 不能使用批处理模式

// 错误示例
EnvironmentSettings settings = EnvironmentSettings.newInstance().useBlinkPlanner().inBatchMode().build();
// 正确示例
EnvironmentSettings settings = EnvironmentSettings.newInstance().useBlinkPlanner().build();

2 注意 StreamExecutionEnvironment 与 StreamTableEnvironment 必须配套创建和使用

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
EnvironmentSettings settings = EnvironmentSettings.newInstance().useBlinkPlanner().build();
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env, settings);

3 转换的方法很简单

DataStream<Row> rowDataStream = tableEnv.toAppendStream(table, Row.class);

Table和Row的转换

获取所有的列

Table tb01 = tableEnv.from("t_user");
String[] fieldNames = tb01.getSchema().getFieldNames();