准备数据
以下两种方式使用任何一种都行。
Hive是使用Hadoop的MapReduce处理的。
FlinkSQL Cli是使用Flink来处理的。
使用Hive准备数据
进入Hive
1 | hive |
删除已存在的表
1 | drop table t_user; |
创建表
1 | create table t_user(id int,name string)row format delimited fields terminated by ','; |
插入准备数据
1 | INSERT INTO t_user(id,name) VALUES (2,'李四'); |
查询数据
1 | select * from t_user; |
使用FlinkSQL Cli准备数据
使用FlinkSQL Cli插入数据
启动FlinkSQL Cli,命令如下:
1 | $FLINK_HOME/bin/sql-client.sh embedded |
接下来,我们可以查看注册的catalog
1 | show catalogs; |
结果
default_catalog
myhive
使用注册的myhive catalog
1 | use catalog myhive; |
FlinkSQL操作Hive中的表
设置方言
1 | set table.sql-dialect=hive; |
创建表
1 | drop table t_user04; |
写入数据
1 | show tables; |
退出
1 | exit; |
连接Hive
1 | final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); |
自定义函数
1 | import org.apache.flink.table.functions.ScalarFunction; |
直接使用
1 | tableEnv.from("MyTable").select(call(new SubstringFunction(true), $("myField"), 5, 12)); |
注册后使用
注册
1 | // 注册函数 |
使用
1 | tableEnv.from("MyTable").select(call("SubstringFunction", $("myField"),5, 12)); |
注意
函数中要使用Integer,使用int会因为类型不一致而找不到方法。
插入数据
1 | String sqlInsert = "insert into t_user03(id,name,m1,d1) select id,name,m1,d1 from " + tb01.toString(); |
实际代码
1 | String tableName = configJson.getWriter().getTableName(); |
读取项目下的配置文件
1 | String classPath = Thread.currentThread() |
DataStream约束验证
CleanConstraintI
1 | public interface CleanConstraintI { |
StrLengthCons
1 | public class StrLengthCons implements CleanConstraintI { |
ConsFactory
1 | import java.util.HashMap; |