Preparing the Data
Either of the following two approaches works.
Preparing data with Hive
Enter Hive
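The Hive CLI can typically be started like this (assuming HIVE_HOME points at the Hive installation; the command is a common convention, not taken from the original):

```bash
$HIVE_HOME/bin/hive
```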
Drop existing tables
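For example, the sample tables created in the next step can be removed first if they already exist (a sketch; only the table names come from the original):

```sql
drop table if exists t_user;
drop table if exists t_user02;
drop table if exists t_user03;
```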
Create the tables
```sql
create table t_user(id int, name string) row format delimited fields terminated by ',';
create table t_user02(id int, name string) row format delimited fields terminated by ',';
create table t_user03(id int, name string, m1 int, d1 int) row format delimited fields terminated by ',';
```
Insert the sample data
```sql
INSERT INTO t_user(id,name) VALUES (2,'李四');
```
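If the other sample tables also need rows, similar statements work; the values here are illustrative, not from the original setup:

```sql
INSERT INTO t_user02(id,name) VALUES (1,'Zhang San');
INSERT INTO t_user03(id,name,m1,d1) VALUES (1,'Zhang San',10,20);
```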
Query the data
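A quick check that the row landed (a sketch using the table created above):

```sql
select * from t_user;
```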
Preparing data with the Flink SQL CLI
Inserting data with the Flink SQL CLI
Start the Flink SQL CLI with the following command:
```bash
$FLINK_HOME/bin/sql-client.sh embedded
```
Next, we can list the registered catalogs.
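In the SQL client this is done with:

```sql
show catalogs;
```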
Result:

```
default_catalog
myhive
```
Use the registered myhive catalog
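Switching catalogs uses:

```sql
use catalog myhive;
```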
Working with Hive tables from Flink SQL
Set the dialect
```sql
set table.sql-dialect=hive;
```
Create a table
```sql
drop table t_user04;
create table t_user04(id int, name string);
```
Write data
```sql
show tables;
INSERT INTO t_user(id,name) VALUES (3,'Xiao Hong');
select * from t_user;
```
Exit
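The SQL client is left with either of its built-in exit commands:

```sql
quit;
-- or: exit;
```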
Connecting to Hive
```java
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.SqlDialect;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.table.catalog.hive.HiveCatalog;

final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);

// dbNameTarget is the target Hive database and hiveConfigPath the directory
// containing hive-site.xml; both are assumed to be defined elsewhere
String hiveCatalogName = "myhive";
HiveCatalog hiveCatalog = new HiveCatalog(hiveCatalogName, dbNameTarget, hiveConfigPath);
tEnv.registerCatalog(hiveCatalogName, hiveCatalog);
tEnv.useCatalog(hiveCatalogName);
tEnv.getConfig().setSqlDialect(SqlDialect.DEFAULT);
tEnv.useDatabase(dbNameTarget);
```
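Once the catalog is registered, Hive tables can be queried straight through the table environment; a minimal sketch using the t_user table from earlier:

```java
// assumes the registration code above has already run
tEnv.executeSql("select * from t_user").print();
```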
User-defined functions
```java
import org.apache.flink.table.functions.ScalarFunction;

public class SubstringFunction extends ScalarFunction {
    public String eval(String s, Integer begin, Integer end) {
        return s.substring(begin, end);
    }
}
```
Use directly
```java
// note: the class above has no boolean constructor, so the instance is created with no arguments
tableEnv.from("MyTable").select(call(new SubstringFunction(), $("myField"), 5, 12));
```
Use after registering
Register
```java
tableEnv.createTemporarySystemFunction("SubstringFunction", SubstringFunction.class);
```
Use
```java
tableEnv.from("MyTable").select(call("SubstringFunction", $("myField"), 5, 12));
```
Note
The eval method must declare its numeric parameters as Integer. Declaring them as the primitive int makes the call fail to resolve the method, because the argument types no longer match.
Insert data
```java
// tb01 is a Table object produced earlier in the job
String sqlInsert = "insert into t_user03(id,name,m1,d1) select id,name,m1,d1 from " + tb01.toString();
tableEnv.executeSql(sqlInsert).print();
```
Actual code
```java
String tableName = configJson.getWriter().getTableName();
List<String> fullColumnNameResult = configJson.getWriter().getFullColumnName();
String paraStr = String.join(",", fullColumnNameResult);
String sqlInsert = String.format("insert into %s(%s) select %s from %s",
        tableName, paraStr, paraStr, tb01.toString());
System.out.println(sqlInsert);
tableEnv.executeSql(sqlInsert).print();
```
Reading a configuration file from the project
```java
String classPath = Thread.currentThread()
        .getContextClassLoader()
        .getResource("")
        .getPath();
String hiveConfDir = classPath + "conf";
System.out.println("hiveConfDir:" + hiveConfDir);
```
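The resolved directory can then be handed to the HiveCatalog constructor from the connection snippet above; the catalog and database names here are placeholders:

```java
// hypothetical wiring: "myhive" and "default" are illustrative names
HiveCatalog hiveCatalog = new HiveCatalog("myhive", "default", hiveConfDir);
```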
DataStream constraint validation
CleanConstraintI
```java
public interface CleanConstraintI {

    // human-readable name of the constraint
    String getName();

    // returns true when the given arguments satisfy the constraint
    boolean check(Object... arguments);
}
```
StrLengthCons
```java
public class StrLengthCons implements CleanConstraintI {

    @Override
    public String getName() {
        return "string length constraint";
    }

    // expects three arguments: content, min length, max length
    @Override
    public boolean check(Object... arguments) {
        if (arguments.length != 3) {
            return false;
        }
        String content = (String) arguments[0];
        int min = Integer.parseInt(arguments[1].toString());
        int max = Integer.parseInt(arguments[2].toString());
        return content.length() >= min && content.length() <= max;
    }
}
```
ConsFactory
```java
import java.util.HashMap;
import java.util.Map;

public class ConsFactory {

    public static ConsFactory instance;
    private static final Map<String, CleanConstraintI> consMap = new HashMap<>();

    // lazily creates the singleton and registers the built-in constraints
    public static ConsFactory getInstance() {
        if (instance == null) {
            instance = new ConsFactory();
            consMap.put("StrLengthCons", new StrLengthCons());
        }
        return instance;
    }

    // looks up a constraint by name and runs it against the given arguments
    public boolean run(String consName, Object... arguments) {
        CleanConstraintI cleanConstraintI = consMap.get(consName);
        return cleanConstraintI.check(arguments);
    }
}
```
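A minimal sketch of how the factory might be wired into a DataStream job; the class name, sample data, and the length bounds of 2 to 8 are all assumptions for illustration:

```java
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class ConstraintDemo {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // sample input; in practice this would come from a real source
        DataStream<String> names = env.fromElements("Tom", "a", "Alexander");

        // keep only values that pass the registered length constraint (min 2, max 8)
        names.filter(value -> ConsFactory.getInstance().run("StrLengthCons", value, 2, 8))
             .print();

        env.execute("constraint-validation-demo");
    }
}
```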