Flink开发-Java使用Table和SQL语法操作Hive

准备数据

以下两种方式使用任何一种都行。

  • Hive是使用Hadoop的MapReduce处理的。

  • FlinkSQL Cli是使用Flink来处理的。

使用Hive准备数据

进入Hive

1
2
hive
use default;

删除已存在的表

1
drop table t_user;

创建表

1
2
3
create table t_user(id int,name string)row format delimited fields terminated by ',';
create table t_user02(id int,name string)row format delimited fields terminated by ',';
create table t_user03(id int,name string,m1 int,d1 int)row format delimited fields terminated by ',';

插入准备数据

1
INSERT INTO t_user(id,name) VALUES (2,'李四');

查询数据

1
select * from t_user;

使用FlinkSQL Cli准备数据

使用FlinkSQL Cli插入数据

启动FlinkSQL Cli,命令如下:

1
$FLINK_HOME/bin/sql-client.sh embedded

接下来,我们可以查看注册的catalog

1
show catalogs;

结果

default_catalog
myhive

使用注册的myhive catalog

1
use catalog myhive;

FlinkSQL操作Hive中的表

设置方言

1
set table.sql-dialect=hive;

创建表

1
2
drop table t_user04;
create table t_user04(id int,name string);

写入数据

1
2
3
show tables;
INSERT INTO t_user(id,name) VALUES (3,'Xiao Hong');
select * from t_user;

退出

1
exit;

连接Hive

1
2
3
4
5
6
7
8
9
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
//构造hive catalog
String hiveCatalogName = "myhive"; // Catalog名称,定义一个唯一的名称表示
HiveCatalog hiveCatalog = new HiveCatalog(hiveCatalogName, dbNameTarget, hiveConfigPath);
tEnv.registerCatalog(hiveCatalogName, hiveCatalog);
tEnv.useCatalog(hiveCatalogName);
tEnv.getConfig().setSqlDialect(SqlDialect.DEFAULT);
tEnv.useDatabase(dbNameTarget);

自定义函数

1
2
3
4
5
6
7
8
import org.apache.flink.table.functions.ScalarFunction;

// 定义函数逻辑
public class SubstringFunction extends ScalarFunction {
public String eval(String s, Integer begin, Integer end) {
return s.substring(begin, end);
}
}

直接使用

1
tableEnv.from("MyTable").select(call(new SubstringFunction(true), $("myField"), 5, 12));

注册后使用

注册

1
2
// 注册函数
tableEnv.createTemporarySystemFunction("SubstringFunction", SubstringFunction.class);

使用

1
tableEnv.from("MyTable").select(call("SubstringFunction", $("myField"),5, 12));

注意

函数中要使用Integer,使用int会因为类型不一致而找不到方法。

插入数据

1
2
String sqlInsert = "insert into t_user03(id,name,m1,d1) select id,name,m1,d1 from " + tb01.toString();
tableEnv.executeSql(sqlInsert).print();

实际代码

1
2
3
4
5
6
String tableName = configJson.getWriter().getTableName();
List<String> fullColumnNameResult = configJson.getWriter().getFullColumnName();
String paraStr = String.join(",", fullColumnNameResult);
String sqlInsert = String.format("insert into %s(%s) select %s from %s", tableName, paraStr, paraStr, tb01.toString());
System.out.println(sqlInsert);
tableEnv.executeSql(sqlInsert).print();

读取项目下的配置文件

1
2
3
4
5
6
String classPath = Thread.currentThread()
.getContextClassLoader()
.getResource("")
.getPath();
String hiveConfDir = classPath + "conf";
System.out.println("hiveConfDir:" + hiveConfDir);

DataStream约束验证

CleanConstraintI

1
2
3
4
5
6
7
public interface CleanConstraintI {
String name = "";

String getName();

boolean check(Object... arguments);
}

StrLengthCons

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
public class StrLengthCons implements CleanConstraintI {
@Override
public String getName() {
return "字符串长度约束";
}

@Override
public boolean check(Object... arguments) {
if (arguments.length != 3) {
return false;
}

String content = (String) arguments[0];
int min = Integer.parseInt(arguments[1].toString() + "");
int max = Integer.parseInt(arguments[2].toString() + "");
if (content.length() >= min && content.length() <= max) {
return true;
}
return false;
}
}

ConsFactory

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
import java.util.HashMap;
import java.util.Map;

public class ConsFactory {
public static ConsFactory instance;
private static final Map<String, CleanConstraintI> consMap = new HashMap<>();

public static ConsFactory getInstance() {
if (instance == null) {
instance = new ConsFactory();
consMap.put("StrLengthCons", new StrLengthCons());
}
return instance;
}

public boolean run(String consName, Object... arguments) {
CleanConstraintI cleanConstraintI = consMap.get(consName);
return cleanConstraintI.check(arguments);
}
}