前言
https://nightlies.apache.org/flink/flink-docs-release-1.12/dev/table/tableApi.html
https://nightlies.apache.org/flink/flink-docs-release-1.12/zh/dev/table/functions/udfs.html
本文使用环境版本
- Hive:2.3.9
- Flink:flink-1.12.7-bin-scala_2.12
依赖
1 |
|
先看一个简单的例子
1 | import org.apache.flink.table.api.{$, EnvironmentSettings, FieldExpression, SqlDialect, TableEnvironment} |
如上我们可以看到
- Table 可以调用计算处理相关方法 Table调用execute返回TableResult
- TableResult 可以用来打印
1 | //返回Table |
Flink Table与Flink SQL
1 | val selectTables_sql = "select id,name,password from t_user order by id desc" |
排序
1 | var mTable = tableEnv.from("t_user").select($"id", $"name", $"password") |
对应的SQL模式
1 | val selectTables_sql = "select id,name,password from t_user order by id desc" |
别名
1 | val mTable2 = tableEnv.from("t_user").select(call(new MySubstringFunction(), $"name", 0, 5) as ("name2")) |
添加字段
Flink SQL
1 | cast(0 as bigint)as mark_del |
Flink Table
1 | tb01 = tb01.addColumns(call("DefaultValueNumLongUdf",0L).as("mark_del")); |
添加对应的方法
1 | tableEnv.createTemporarySystemFunction("DefaultValueNumLongUdf", DefaultValueNumLongUdf.class); |
方法
1 | package com.xhkjedu.udf.trans; |
替换字段
Flink Table
1 | tb01 = tb01.renameColumns($("b").as("b2"), $("c").as("c2")); |
打印Schema
1 | tb01.printSchema(); |
或者
1 | System.out.println(Arrays.toString(tb01.getSchema().getFieldNames())); |
DataType判断
根据字段名称获取类型
1 | public static DataType getTypeByName(Table tb, String name) { |
判断类型
1 | public static Boolean isLong(DataType type) { |
Table数据保存
1 | val tb3 = tb2 |
方式2
1 | tableEnv.createTemporaryView("mytable", tb3) |
类型推断
自动类型推导会检查函数的类和求值方法,派生出函数参数和结果的数据类型, @DataTypeHint
和 @FunctionHint
注解支持自动类型推导。
@DataTypeHint
在 Flink Table API 和 SQL 中,@DataTypeHint
注解可以用来指定自定义函数(UDF)的参数和返回值类型,它有以下作用:
- 帮助 Flink 引擎推断出自定义函数的参数和返回值的数据类型。
- 避免 Flink 自动推断类型时产生错误结果。
- 提高自定义函数的运行时性能。
@DataTypeHint 的使用方式:
设置返回值
1 | // 定义 decimal 的精度和小数位 |
设置参数
1 | public class MyUDF extends ScalarFunction { |
@FunctionHint
1 | // 解耦类型推导与求值方法,类型推导完全取决于 FunctionHint |
所以使用 @DataTypeHint 可以让 Flink 更准确地知道 UDF 的数据类型信息,从而提高 Table API 和 SQL 的运行效率。
定制类型推导
1 | import org.apache.flink.table.api.DataTypes; |
自定义函数(UDF)
在 Table API
中,根处理的数据类型以及计算方式的不同将自定义函数一共分为三种类别,
分别为 :
- ScalarFunction
- TableFunction
- AggregationFunction
- TableAggregateFunction
注意项
类型无法映射
UDF参数如果可能为空 要用Long等封装的类型,不要用long,Long会被映射被BIGINT,而long会被映射为BIGINT NOT NULL
String类型被推断为Char(1)
当我们的Java中传的类型是String
时,在UDF中处理的时候,它会自动推断,比如单字符的字符串就会推断为Char(1)
,导致无法找到对应的处理方法。
解决方式:
指定传入的参数的类型,不进行推断
1 | public class MyUDF extends ScalarFunction { |
ScalarFunction标量函数
自定义标量函数可以把 0 到多个标量值映射成 1 个标量值,数据类型里列出的任何数据类型都可作为求值方法的参数和返回值类型。
简单的说
就是把每行的数据的列进行处理。
想要实现自定义标量函数,你需要扩展 org.apache.flink.table.functions
里面的 ScalarFunction
并且实现一个或者多个求值方法。
标量函数的行为取决于你写的求值方法。求值方法必须是 public
的,而且名字必须是 eval
。
1 | package com.xhkjedu.test; |
TableFunction表值函数
跟自定义标量函数一样,自定义表值函数的输入参数也可以是 0 到多个标量。但是跟标量函数只能返回一个值不同的是,它可以返回任意多行。返回的每一行可以包含 1 到多列,如果输出行只包含 1 列,会省略结构化信息并生成标量值,这个标量值在运行阶段会隐式地包装进行里。
简单的说
一行多列转多行多列。
和 Scalar Function
不同,Table Function
:将一个或多个标量字段作为输入参数,且经过计算和处理后返回的是任意数量的记录,不再是单独的一个标量指标,且返回结果中可以含有一列或多列指标,根据自定义 Table Funciton
函数返回值确定,因此从形式上看更像是 Table
结构数据 。
在Table API中,
表函数
在Scala语言中使用方法如下:.join(Expression)
或者 .leftOuterJoin(Expression)
,
在Java语言中使用方法如下:.join(String)
或者.leftOuterJoin(String)
。
Join操作算子会使用表函数(操作算子右边的表)产生的所有行进行(cross) join 外部表(操作算子左边的表)的每一行。
leftOuterJoin操作算子会使用表函数(操作算子右边的表)产生的所有行进行(cross) join 外部表(操作算子左边的表)的每一行,并且在表函数返回一个空表的情况下会保留所有的outer rows。
在Sql语法中稍微有点区别:
- cross join用法是
LATERAL TABLE(<TableFunction>)
。 - LEFT JOIN用法是在join条件中加入
ON TRUE
。
示例
1 | package com.xhkjedu.test; |
结果
调用注册后的函数
1 | // 注册函数 |
使用SQL
1 | // 注册函数 |
设置别名
1 | // 注册函数 |
AggregationFunction聚合函数
自定义聚合函数(UDAGG)是把一个表(一行或者多行,每行可以有一列或者多列)聚合成一个标量值。
简单的说
多行多列转1行1列。
Flink Table API
中提供了User-Defined Aggregate Functions (UDAGGs)
,其主要功能是将一行或多行数据进行聚合然后输出一个标量值,
例如在数据集中根据 Key
求取指定Value
的最大值或最小值。
下面几个方法是每个 AggregateFunction
必须要实现的:
createAccumulator()
主要用于创建Accumulator
,以用于存储计算过程中读取的中间数据,同时在Accumulator
中完成数据的累加操作accumulate()
将每次接入的数据元素累加到定义的accumulator
中,另外accumulate()
方法也可以通过方法复载的方式处理不同类型的数据getValue()
当完成所有的数据累加操作结束后,最后通过 getValue() 方法返回函数的统计结果,最终完成整个AggregateFunction
的计算流程
AggregateFunction
的以下方法在某些场景下是必须实现的:
retract()
在bounded OVER
窗口中是必须实现的。merge()
在许多批式聚合和会话以及滚动窗口聚合中是必须实现的。除此之外,这个方法对于优化也很多帮助。例如,两阶段聚合优化就需要所有的AggregateFunction
都实现merge
方法。resetAccumulator()
在许多批式聚合中是必须实现的
注意
AggregateFunction<T, ACC>
中T是返回值的类型,ACC是累加器的类型。
accumulate(Accumulator acc, Long input, Long weight)
中第一个参数是累加器的类型,后面的是传入参数的类型,可以传入多个参数。
AggregateFunction 接口中定义了三个 需要复写的方法,其中 add()定义数据的添加逻辑,getResult 定义了根据 accumulator 计 算结果的逻辑,merge 方法定义合并 accumulator 的逻辑。
根据权重获取值
1 | import org.apache.flink.table.functions.AggregateFunction; |
测试
1 | package com.xhkjedu.test; |
结果
TableAggregateFunction表值聚合函数
自定义表值聚合函数(UDTAGG)可以把一个表(一行或者多行,每行有一列或者多列)聚合成另一张表,结果中可以有多行多列。
表聚合,多对多,多行输入多行输出
用户定义的表聚合函数(User-Defined Table Aggregate Functions,UDTAF),可以把一个表中数据,聚合为具有多行和多列的结果表
用户定义表聚合函数,是通过继承 TableAggregateFunction 抽象类来实现的
1 | package com.xhkjedu.test; |
测试
1 | package com.xhkjedu.test; |
TableAggregateFunction 要求必须实现的方法
createAccumulator()
accumulate()
emitValue()
首先,它同样需要一个累加器(Accumulator),它是保存聚合中间结果的数据结构。通过调用 createAccumulator()
方法可以创建空累加器
随后,对每个输入行调用函数的 accumulate()
方法来更新累加器
处理完所有行后,将调用函数的 emitValue()
方法来计算并返回最终结果
常用算法
线性分布
1 | /** |
正态分布
1 | /** |
众数
1 | /** |
中位数
1 | /** |
平均数
1 | /** |
聚合函数示例
注意
流处理不支持聚合操作!!!
流处理不支持聚合操作!!!
流处理不支持聚合操作!!!
空值填充
1 | import org.apache.flink.table.functions.ScalarFunction; |
随机数
正态分布、线性分布的随机数
处理Long
1 | package com.xhkjedu.udf.clean; |
处理Double
1 | package com.xhkjedu.udf.clean; |
测试
1 | package com.xhkjedu.test; |
众数/中位数/平均数
处理Long
1 | package com.xhkjedu.udf.clean; |
处理Double
1 | package com.xhkjedu.udf.clean; |
测试
1 | package com.xhkjedu.test; |