Flink Development: Table/SQL Operations and User-Defined Functions (UDF)

Preface

https://nightlies.apache.org/flink/flink-docs-release-1.12/dev/table/tableApi.html

https://nightlies.apache.org/flink/flink-docs-release-1.12/zh/dev/table/functions/udfs.html

Versions used in this article:

  • Hive:2.3.9
  • Flink:flink-1.12.7-bin-scala_2.12

Dependencies

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>cn.psvmc</groupId>
    <artifactId>WordCount</artifactId>
    <version>1.0</version>

    <properties>
        <maven.compiler.source>8</maven.compiler.source>
        <maven.compiler.target>8</maven.compiler.target>
        <flink.version>1.12.7</flink.version>
        <hadoop.version>2.7.7</hadoop.version>
        <scala.binary.version>2.12</scala.binary.version>
    </properties>

    <repositories>
        <repository>
            <id>alimaven</id>
            <name>aliyun maven</name>
            <url>https://maven.aliyun.com/repository/public</url>
        </repository>
    </repositories>
    <dependencies>
        <!-- Flink core APIs -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-java</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-clients_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-scala_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-scala_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-common</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-api-scala_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-api-java-bridge_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-api-scala-bridge_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-planner_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-json</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <!-- RocksDB state backend -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-statebackend-rocksdb_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <!-- Hive connector support, only needed at compile time -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-hive_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-planner-blink_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hive</groupId>
            <artifactId>hive-exec</artifactId>
            <version>2.1.1</version>
            <exclusions>
                <exclusion>
                    <groupId>org.apache.calcite</groupId>
                    <artifactId>calcite-core</artifactId>
                </exclusion>
                <exclusion>
                    <groupId>org.apache.calcite</groupId>
                    <artifactId>calcite-avatica</artifactId>
                </exclusion>
                <exclusion>
                    <artifactId>hadoop-hdfs</artifactId>
                    <groupId>org.apache.hadoop</groupId>
                </exclusion>
            </exclusions>
        </dependency>

        <!-- Reading files from Hadoop -->
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-client</artifactId>
            <version>${hadoop.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-hadoop-compatibility_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>
    </dependencies>
    <build>
        <plugins>
            <plugin>
                <groupId>net.alchim31.maven</groupId>
                <artifactId>scala-maven-plugin</artifactId>
                <version>3.4.6</version>
                <executions>
                    <execution>
                        <goals>
                            <goal>compile</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-assembly-plugin</artifactId>
                <version>3.0.0</version>
                <configuration>
                    <descriptorRefs>
                        <descriptorRef>jar-with-dependencies</descriptorRef>
                    </descriptorRefs>
                </configuration>
                <executions>
                    <execution>
                        <id>make-assembly</id>
                        <phase>package</phase>
                        <goals>
                            <goal>single</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>
</project>

Let's start with a simple example

import org.apache.flink.table.api.{$, EnvironmentSettings, FieldExpression, SqlDialect, TableEnvironment}
import org.apache.flink.table.catalog.hive.HiveCatalog

object WordCount {
  case class Student(id: String, name: String, sex: String, age: Int, department: String)

  def main(args: Array[String]): Unit = {
    val settings: EnvironmentSettings = EnvironmentSettings.newInstance().useBlinkPlanner().build()
    val tableEnv: TableEnvironment = TableEnvironment.create(settings)

    val name: String = "hive"
    val defaultDataBase: String = "default"
    val hiveConfDir: String = "/data/tools/bigdata/apache-hive-2.3.9-bin/conf"

    val hive = new HiveCatalog(name, defaultDataBase, hiveConfDir)
    tableEnv.registerCatalog("myHive", hive) // register the catalog
    tableEnv.getConfig.setSqlDialect(SqlDialect.HIVE)
    tableEnv.useCatalog("myHive") // use the registered catalog, otherwise the tables cannot be found
    tableEnv.useDatabase("default") // the database to query
    tableEnv.executeSql("show tables").print()

    // SQL style
    val selectTables_sql = "select id,name,password from t_user"
    val result = tableEnv.sqlQuery(selectTables_sql)
    result.execute().print()

    // Table API style
    val mTable = tableEnv.from("t_user").select($"id", $"name", $"password")
    mTable.execute().print()
  }
}

From the example above we can see:

  • A Table supports the query and transformation methods; calling execute() on a Table returns a TableResult
  • A TableResult can be printed
// returns a Table
tableEnv.sqlQuery(sqlstr)
// returns a TableResult
tableEnv.executeSql(sqlstr)
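
Besides print(), a TableResult can also be consumed programmatically. A minimal sketch, reusing the t_user table from the example above (TableResult.collect() returns a CloseableIterator over the rows, which should be closed when done):

// turn the Table into a TableResult, then iterate over the result rows
val tr = tableEnv.sqlQuery("select id,name,password from t_user").execute()
val it = tr.collect()
while (it.hasNext) {
  println(it.next())
}
it.close()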

Sorting

var mTable = tableEnv.from("t_user").select($"id", $"name", $"password")
mTable = mTable.orderBy($"id".desc)
mTable.execute().print()

The equivalent SQL:

val selectTables_sql = "select id,name,password from t_user order by id desc"
val result = tableEnv.sqlQuery(selectTables_sql)
result
.execute()
.print()

Aliases

val mTable2 = tableEnv.from("t_user").select(call(new MySubstringFunction(), $"name", 0, 5) as ("name2"))
mTable2.execute().print()
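
MySubstringFunction here is the scalar UDF defined later in this article. Aliases work the same way for plain columns; a small sketch against the same t_user table:

// alias a plain column with as(...)
val aliased = tableEnv.from("t_user").select($"name".as("name2"), $"id")
aliased.execute().print()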

Flink Table vs. Flink SQL

// SQL style
val selectTables_sql = "select id,name,password from t_user order by id desc"
val result = tableEnv.sqlQuery(selectTables_sql)
result
  .execute()
  .print()

// Table API style
var mTable = tableEnv.from("t_user").select($"id", $"name", $"password")
mTable = mTable.orderBy($"id".desc)
mTable.execute().print()

Adding a new column: SQL vs. Table API

In SQL:

cast(0 as bigint) as mark_del

In the Table API:

tb01 = tb01.addColumns(call("DefaultValueNumLongUdf", 0L).as("mark_del"));

Register the corresponding function:

tableEnv.createTemporarySystemFunction("DefaultValueNumLongUdf", DefaultValueNumLongUdf.class);

The UDF implementation:

package com.xhkjedu.udf.trans;

import org.apache.flink.table.functions.ScalarFunction;

/**
 * Returns the given value unchanged; used to add a constant default-value column.
 */
public class DefaultValueNumLongUdf extends ScalarFunction {
    public Long eval(Long num) {
        return num;
    }
}
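
For a constant column like this, a UDF is not strictly required. A sketch of an alternative that uses the built-in lit expression instead (assuming tb01 is the table from the snippet above and that lit is imported from org.apache.flink.table.api):

// add a constant BIGINT column without a custom UDF
tb01 = tb01.addColumns(lit(0L).as("mark_del"))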

Renaming columns

Flink Table

tb01 = tb01.renameColumns($("b").as("b2"), $("c").as("c2"));
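
The same effect can be achieved with select plus aliases (which also drops any columns you do not mention), or with aliases in SQL. A sketch using the hypothetical columns a, b and c from the snippet above:

// Table API: keep a, rename b and c
val renamed = tb01.select($"a", $"b".as("b2"), $"c".as("c2"))

// SQL equivalent: select a, b as b2, c as c2 from ...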

Saving Table data

// prefunc and subfunc are UDFs assumed to be registered elsewhere
val tb3 = tb2
  .select(call("prefunc", $"classname", "ba年级").as("classname"), $"id")
  .distinct()
  .select(call("subfunc", $"classname", 0, 3).as("classname"), $"id")
tb3.execute().print()

// Method 1: Table.toString registers the table under a generated name that can be referenced in SQL
val sql = "insert into t_class2(id,classname) select id,classname from " + tb3.toString

Method 2

tableEnv.createTemporaryView("mytable", tb3)
val sql="insert into t_class2(id,classname) select id,classname from mytable"

User-Defined Functions (UDF)

In the Table API, user-defined functions are grouped into the following categories according to the type of data they process and the way they compute:

  • ScalarFunction
  • TableFunction
  • AggregateFunction

A fourth kind, TableAggregateFunction, is covered at the end of this article.

ScalarFunction (Scalar Functions)

A user-defined scalar function maps zero or more scalar values to a single scalar value. Any data type listed in the data types section can be used as a parameter or return type of an evaluation method.

To implement a custom scalar function, extend ScalarFunction in org.apache.flink.table.functions and implement one or more evaluation methods. The behavior of the scalar function is determined by these evaluation methods, which must be declared public and named eval.

package cn.psvmc

import org.apache.flink.table.api._
import org.apache.flink.table.functions.ScalarFunction
import org.apache.flink.table.catalog.hive.HiveCatalog

class MySubstringFunction extends ScalarFunction {
  def eval(s: String, begin: Integer, end: Integer): String = {
    var endValue = end
    if (endValue > s.size) {
      endValue = s.size
    }
    s.substring(begin, endValue)
  }
}

object WordCount {
  def main(args: Array[String]): Unit = {
    case class Student(id: String, name: String, sex: String, age: Int, department: String)
    val settings: EnvironmentSettings = EnvironmentSettings.newInstance().useBlinkPlanner().inBatchMode().build()
    val tableEnv: TableEnvironment = TableEnvironment.create(settings)

    val name: String = "hive"
    val defaultDataBase: String = "default"
    val hiveConfDir: String = "/data/tools/bigdata/apache-hive-2.3.9-bin/conf"

    val hive = new HiveCatalog(name, defaultDataBase, hiveConfDir)
    tableEnv.registerCatalog("myHive", hive) // register the catalog
    tableEnv.getConfig.setSqlDialect(SqlDialect.HIVE)
    tableEnv.useCatalog("myHive") // use the registered catalog, otherwise the tables cannot be found
    tableEnv.useDatabase("default") // the database to query
    tableEnv
      .executeSql("show tables")
      .print()

    // use the function inline
    val mTable2 = tableEnv.from("t_user").select(call(new MySubstringFunction(), $"name", 0, 5))
    mTable2.execute().print()

    // register the function
    tableEnv.createTemporarySystemFunction("MySubstringFunction", classOf[MySubstringFunction])
    // and call it from SQL
    tableEnv.sqlQuery("select id,MySubstringFunction(name,0,5) as name2,password from t_user").execute().print()
  }
}

Two ways to use it

Use it inline

// use the function inline, without registering it
val mTable2 = tableEnv.from("t_user").select(call(new MySubstringFunction(), $"name", 0, 5))
mTable2.execute().print()

Register it, then use it

// register the function
tableEnv.createTemporarySystemFunction("MySubstringFunction", classOf[MySubstringFunction])
// call it from SQL
tableEnv.sqlQuery("select id,MySubstringFunction(name,0,5) as name2,password from t_user").execute().print()

TableFunction (Table Functions)

Like a scalar function, a table function takes zero or more scalar values as input. Unlike a scalar function, however, it can return any number of rows instead of a single value. Each returned row may consist of one or more columns; if an output row contains only one column, the row wrapper is omitted and a scalar value is produced, which the runtime implicitly wraps back into a row.

In other words, a table function takes one or more scalar fields as input and, after processing, returns an arbitrary number of records rather than a single scalar value. The result can contain one or more columns, determined by the table function's return type, so its output is shaped like a table.

import org.apache.flink.table.annotation.DataTypeHint;
import org.apache.flink.table.annotation.FunctionHint;
import org.apache.flink.table.api.*;
import org.apache.flink.table.functions.TableFunction;
import org.apache.flink.types.Row;
import static org.apache.flink.table.api.Expressions.*;

@FunctionHint(output = @DataTypeHint("ROW<word STRING, length INT>"))
public static class SplitFunction extends TableFunction<Row> {

    public void eval(String str) {
        for (String s : str.split(" ")) {
            // use collect(...) to emit a row
            collect(Row.of(s, s.length()));
        }
    }
}

TableEnvironment env = TableEnvironment.create(...);

// call the function "inline" in the Table API, without registering it
env
    .from("MyTable")
    .joinLateral(call(SplitFunction.class, $("myField")))
    .select($("myField"), $("word"), $("length"));
env
    .from("MyTable")
    .leftOuterJoinLateral(call(SplitFunction.class, $("myField")))
    .select($("myField"), $("word"), $("length"));

// rename the function's fields in the Table API
env
    .from("MyTable")
    .leftOuterJoinLateral(call(SplitFunction.class, $("myField")).as("newWord", "newLength"))
    .select($("myField"), $("newWord"), $("newLength"));

// register the function
env.createTemporarySystemFunction("SplitFunction", SplitFunction.class);

// call the registered function in the Table API
env
    .from("MyTable")
    .joinLateral(call("SplitFunction", $("myField")))
    .select($("myField"), $("word"), $("length"));
env
    .from("MyTable")
    .leftOuterJoinLateral(call("SplitFunction", $("myField")))
    .select($("myField"), $("word"), $("length"));

// call the registered function in SQL
env.sqlQuery(
    "SELECT myField, word, length " +
    "FROM MyTable, LATERAL TABLE(SplitFunction(myField))");
env.sqlQuery(
    "SELECT myField, word, length " +
    "FROM MyTable " +
    "LEFT JOIN LATERAL TABLE(SplitFunction(myField)) ON TRUE");

// rename the function's fields in SQL
env.sqlQuery(
    "SELECT myField, newWord, newLength " +
    "FROM MyTable " +
    "LEFT JOIN LATERAL TABLE(SplitFunction(myField)) AS T(newWord, newLength) ON TRUE");

AggregateFunction (Aggregate Functions)

A user-defined aggregate function (UDAGG) aggregates a table (one or more rows, each with one or more columns) into a single scalar value.

The Flink Table API provides User-Defined Aggregate Functions (UDAGGs), whose main purpose is to aggregate one or more rows of data and output one scalar value,

for example computing the maximum or minimum of a given value per key in a data set.

(Figure: UDAGG mechanism)

The following methods must be implemented by every AggregateFunction:

  • createAccumulator() creates the Accumulator, the data structure that stores intermediate results during the computation; the accumulation is performed against this Accumulator
  • accumulate() adds each incoming element to the defined accumulator; it can also be overloaded to handle different input types
  • getValue() returns the final result from the accumulator once all data has been accumulated, completing the AggregateFunction's computation

The following AggregateFunction methods are required in certain scenarios:

  • retract() is required for aggregations over bounded OVER windows.
  • merge() is required for many batch aggregations and for session and hopping window aggregations. It is also helpful for optimizations; for example, two-phase aggregation optimization requires all AggregateFunctions to implement merge().
  • resetAccumulator() is required for many batch aggregations.

To summarize: accumulate() defines how input data is added to the accumulator, getValue() defines how the result is computed from the accumulator, and merge() defines how accumulators are merged.

/**
 * Accumulator for WeightedAvg.
 */
public static class WeightedAvgAccum {
    public long sum = 0;
    public int count = 0;
}

/**
 * Weighted Average user-defined aggregate function.
 */
public static class WeightedAvg extends AggregateFunction<Long, WeightedAvgAccum> {

    @Override
    public WeightedAvgAccum createAccumulator() {
        return new WeightedAvgAccum();
    }

    @Override
    public Long getValue(WeightedAvgAccum acc) {
        if (acc.count == 0) {
            return 0L;
        } else {
            return acc.sum / acc.count;
        }
    }

    public void accumulate(WeightedAvgAccum acc, long iValue, int iWeight) {
        acc.sum += iValue * iWeight;
        acc.count += iWeight;
    }

    public void retract(WeightedAvgAccum acc, long iValue, int iWeight) {
        acc.sum -= iValue * iWeight;
        acc.count -= iWeight;
    }

    public void merge(WeightedAvgAccum acc, Iterable<WeightedAvgAccum> it) {
        Iterator<WeightedAvgAccum> iter = it.iterator();
        while (iter.hasNext()) {
            WeightedAvgAccum a = iter.next();
            acc.count += a.count;
            acc.sum += a.sum;
        }
    }

    public void resetAccumulator(WeightedAvgAccum acc) {
        acc.count = 0;
        acc.sum = 0L;
    }
}

// register function
BatchTableEnvironment tEnv = ...
tEnv.registerFunction("wAvg", new WeightedAvg());

// use function
tEnv.sqlQuery("SELECT user, wAvg(points, level) AS avgPoints FROM userScores GROUP BY user");

TableAggregateFunction (Table Aggregate Functions)

A user-defined table aggregate function (UDTAGG) aggregates a table (one or more rows, each with one or more columns) into another table; the result can contain multiple rows and multiple columns.

(Figure: UDAGG mechanism)

Table aggregation is many-to-many: multiple input rows produce multiple output rows.

  • A user-defined table aggregate function (UDTAGG) can aggregate the data of a table into a result table with multiple rows and multiple columns
  • A user-defined table aggregate function is implemented by extending the TableAggregateFunction abstract class
public class UdfTest_TableAggregateFunction {
    public static void main(String[] args) throws Exception {
        // Note: the windowing TVF (TUMBLE) and toChangelogStream used below require Flink 1.13+,
        // which is newer than the 1.12.7 dependencies listed at the top of this article.
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

        // 1. Define the time attribute directly in the CREATE TABLE DDL
        String creatDDL = "CREATE TABLE clickTable (" +
                "user_name STRING," +
                "url STRING," +
                "ts BIGINT," +
                "et AS TO_TIMESTAMP( FROM_UNIXTIME(ts / 1000))," + // event time; FROM_UNIXTIME() converts seconds to a yyyy-MM-dd HH:mm:ss timestamp
                " WATERMARK FOR et AS et - INTERVAL '1' SECOND " + // watermark with a 1-second delay
                ")WITH(" +
                " 'connector' = 'filesystem'," +
                " 'path' = 'input/clicks.txt'," +
                " 'format' = 'csv'" +
                ")";

        tableEnv.executeSql(creatDDL);

        // 2. Register the custom table aggregate function
        tableEnv.createTemporarySystemFunction("Top2", Top2.class);

        // 3. Use the UDF in a query
        String windowAggQuery = "SELECT user_name,COUNT(url) AS cnt,window_start,window_end " +
                "FROM TABLE(" +
                " TUMBLE(TABLE clickTable,DESCRIPTOR(et),INTERVAL '10' SECOND)" +
                ")" +
                "GROUP BY user_name,window_start,window_end";

        Table aggTable = tableEnv.sqlQuery(windowAggQuery);

        // Flink SQL support for table aggregate functions is limited, so the Table API is used here
        Table resultTable = aggTable.groupBy($("window_end"))
                .flatAggregate(call("Top2", $("cnt")).as("value", "rank"))
                .select($("window_end"), $("value"), $("rank"));

        // 4. Convert to a stream and print
        tableEnv.toChangelogStream(resultTable).print();

        env.execute();
    }

    // A dedicated accumulator type holding the current largest and second largest values
    public static class Top2Accumulator {
        public Long max;
        public Long secondMax;
    }


    // The custom table aggregate function
    public static class Top2 extends TableAggregateFunction<Tuple2<Long, Integer>, Top2Accumulator> {

        @Override
        public Top2Accumulator createAccumulator() {
            // create (initialize) the accumulator
            Top2Accumulator top2Accumulator = new Top2Accumulator();
            top2Accumulator.max = Long.MIN_VALUE;
            top2Accumulator.secondMax = Long.MIN_VALUE;
            return top2Accumulator;
        }

        // update the accumulator for each input value
        public void accumulate(Top2Accumulator accumulate, Long value) {
            if (value > accumulate.max) {
                accumulate.secondMax = accumulate.max;
                accumulate.max = value;
            } else if (value > accumulate.secondMax) {
                accumulate.secondMax = value;
            }
        }

        // emit the result: the current top 2
        public void emitValue(Top2Accumulator accumulator, Collector<Tuple2<Long, Integer>> out) {
            if (accumulator.max != Long.MIN_VALUE) {
                out.collect(Tuple2.of(accumulator.max, 1));
            }
            if (accumulator.secondMax != Long.MIN_VALUE) {
                out.collect(Tuple2.of(accumulator.secondMax, 2));
            }
        }
    }
}

Methods that a TableAggregateFunction must implement:

  • createAccumulator()
  • accumulate()
  • emitValue()

First, like an aggregate function, it needs an accumulator, the data structure that holds the intermediate results of the aggregation; an empty accumulator is created by calling createAccumulator().

Then the function's accumulate() method is called for each input row to update the accumulator.

After all rows have been processed, the function's emitValue() method is called to compute and return the final result.
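
Besides emitValue(), a TableAggregateFunction may also implement emitUpdateWithRetract(), which first retracts outdated results and then emits only the changed values; this can be more efficient when results are updated in retract mode. A rough sketch of its shape in Scala, using a hypothetical accumulator type MyAcc and Row output (RetractableCollector is the nested TableAggregateFunction.RetractableCollector interface; see the official UDF documentation for details):

// optional alternative to emitValue(): retract old results, then emit updated ones
def emitUpdateWithRetract(acc: MyAcc, out: TableAggregateFunction.RetractableCollector[Row]): Unit = {
  // out.retract(previousRow)  // withdraw a previously emitted row (hypothetical value)
  // out.collect(updatedRow)   // emit the new row (hypothetical value)
}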