前言
Flink 为处理一列转多列的场景提供了两种返回类型 Tuple 和 Row
- Tuple 只支持1~25个字段,且不能为null,不支持拓展
- Row 支持 null,字段数也没有限制;但自定义函数如果要返回 Row,必须重载实现 getResultType 方法来显式声明各字段的类型信息
DataStream=>Table
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48
| import org.apache.flink.api.common.typeinfo.BasicTypeInfo; import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.api.java.typeutils.RowTypeInfo; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.table.api.bridge.java.StreamTableEnvironment; import org.apache.flink.types.Row;
import java.util.ArrayList; import java.util.List;
/**
 * 演示 DataStream => Table:
 * 将 List&lt;Row&gt; 构造成携带 RowTypeInfo 的 DataStream,注册为临时表后打印。
 */
public class TCS002 {
    public static void main(String[] args) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

        List<Row> list = new ArrayList<>();
        list.add(Row.of("张三", 18, "男"));
        list.add(Row.of("xiaohong", 16, "女"));

        // Row 的字段类型无法被自动推断,必须显式提供 RowTypeInfo
        RowTypeInfo rowTypeInfo = getRowTypeInfo(list.get(0));
        DataStream<Row> ds = env.fromCollection(list, rowTypeInfo);
        // registerDataStream 已废弃,改用 createTemporaryView
        tableEnv.createTemporaryView("table01", ds);
        tableEnv.from("table01").execute().print();
    }

    /**
     * 以一条样例 Row 推断每个字段的 TypeInformation,字段命名为 f0..fn。
     * 注意:样例字段为 null 时会落入 String 分支——调用方需保证样例各字段非空。
     *
     * @param row 样例行,用于逐字段探测运行时类型
     * @return 与样例各字段类型对应的 RowTypeInfo
     */
    private static RowTypeInfo getRowTypeInfo(Row row) {
        // 原代码使用原生类型数组 TypeInformation[],这里改为带通配符的 TypeInformation<?>[]
        TypeInformation<?>[] types = new TypeInformation<?>[row.getArity()];
        String[] fieldNames = new String[row.getArity()];
        for (int i = 0; i < row.getArity(); i++) {
            Object field = row.getField(i);
            if (field instanceof Integer) {
                types[i] = BasicTypeInfo.INT_TYPE_INFO;
            } else if (field instanceof Long) {
                types[i] = BasicTypeInfo.LONG_TYPE_INFO;
            } else if (field instanceof Float) {
                types[i] = BasicTypeInfo.FLOAT_TYPE_INFO;
            } else if (field instanceof Double) {
                types[i] = BasicTypeInfo.DOUBLE_TYPE_INFO;
            } else {
                types[i] = BasicTypeInfo.STRING_TYPE_INFO;
            }
            fieldNames[i] = "f" + i;
        }
        return new RowTypeInfo(types, fieldNames);
    }
}
|
其中
1 2
| tableEnv.registerDataStream("table01", ds); Table table01 = tableEnv.from("table01");
|
或者
1
| Table table01 = tableEnv.fromDataStream(ds);
|
Table=>DataStream
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45
| import org.apache.flink.api.common.functions.MapFunction; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.table.api.EnvironmentSettings; import org.apache.flink.table.api.Table; import org.apache.flink.table.api.bridge.java.StreamTableEnvironment; import org.apache.flink.table.catalog.hive.HiveCatalog; import org.apache.flink.types.Row;
import java.util.Objects;
public class TCS005 { public static void main(String[] args) throws Exception { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setParallelism(1); EnvironmentSettings settings = EnvironmentSettings.newInstance().useBlinkPlanner().build(); StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env, settings);
String name = "myhive"; String defaultDatabase = "default"; String classPath = Objects.requireNonNull(Thread.currentThread() .getContextClassLoader() .getResource("")).getPath(); String hiveConfDir = classPath + "conf"; HiveCatalog hive = new HiveCatalog(name, defaultDatabase, hiveConfDir); tableEnv.registerCatalog("myhive", hive); tableEnv.useCatalog("myhive");
String insertSQL = "select * from t_user "; Table table = tableEnv.sqlQuery(insertSQL);
DataStream<Row> rowDataStream = tableEnv.toAppendStream(table, Row.class); rowDataStream.map(new MapFunction<Row, Row>() { @Override public Row map(Row row) throws Exception { System.out.println(row); return row; } }).print("sql==").setParallelism(1); env.execute("test"); } }
|
注意
1 不能使用批处理模式
1 2 3 4
| // 错误:批处理模式下无法进行 Table => DataStream 转换
EnvironmentSettings settings = EnvironmentSettings.newInstance().useBlinkPlanner().inBatchMode().build();
// 正确:使用默认的流模式
EnvironmentSettings settings = EnvironmentSettings.newInstance().useBlinkPlanner().build();
|
2 注意StreamExecutionEnvironment和StreamTableEnvironment
1 2 3 4
| StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setParallelism(1); EnvironmentSettings settings = EnvironmentSettings.newInstance().useBlinkPlanner().build(); StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env, settings);
|
3 转换的方法很简单
1
| DataStream<Row> rowDataStream = tableEnv.toAppendStream(table, Row.class);
|
Table和Row的转换
获取所有的列
1 2
| Table tb01 = tableEnv.from("t_user"); String[] fieldNames = tb01.getSchema().getFieldNames();
|