前言
https://nightlies.apache.org/flink/flink-docs-release-1.12/dev/table/tableApi.html
https://nightlies.apache.org/flink/flink-docs-release-1.12/zh/dev/table/functions/udfs.html
本文使用环境版本
- Hive:2.3.9
- Flink:flink-1.12.7-bin-scala_2.12
依赖
1 |
|
先看一个简单的例子
1 | import org.apache.flink.table.api.{$, EnvironmentSettings, FieldExpression, SqlDialect, TableEnvironment} |
如上我们可以看到
- Table 可以调用计算处理相关方法 Table调用execute返回TableResult
- TableResult 可以用来打印
1 | //返回Table |
排序
1 | var mTable = tableEnv.from("t_user").select($"id", $"name", $"password") |
对应的SQL模式
1 | val selectTables_sql = "select id,name,password from t_user order by id desc" |
别名
1 | val mTable2 = tableEnv.from("t_user").select(call(new MySubstringFunction(), $"name", 0, 5) as ("name2")) |
Flink Table与Flink SQL
1 | val selectTables_sql = "select id,name,password from t_user order by id desc" |
添加新字段对比
Flink SQL
1 | cast(0 as bigint)as mark_del |
Flink Table
1 | tb01 = tb01.addColumns(call("DefaultValueNumLongUdf",0L).as("mark_del")); |
添加对应的方法
1 | tableEnv.createTemporarySystemFunction("DefaultValueNumLongUdf", DefaultValueNumLongUdf.class); |
方法
1 | package com.xhkjedu.udf.trans; |
替换字段
Flink Table
1 | tb01 = tb01.renameColumns($("b").as("b2"), $("c").as("c2")); |
Table数据保存
1 | val tb3 = tb2 |
方式2
1 | tableEnv.createTemporaryView("mytable", tb3) |
自定义函数(UDF)
在 Table API
中,根处理的数据类型以及计算方式的不同将自定义函数一共分为三种类别,分别为 :
- ScalarFunction
- TableFunction
- AggregationFunction
ScalarFunction标量函数
自定义标量函数可以把 0 到多个标量值映射成 1 个标量值,数据类型里列出的任何数据类型都可作为求值方法的参数和返回值类型。
想要实现自定义标量函数,你需要扩展 org.apache.flink.table.functions
里面的 ScalarFunction
并且实现一个或者多个求值方法。标量函数的行为取决于你写的求值方法。求值方法必须是 public
的,而且名字必须是 eval
。
1 | package cn.psvmc |
两种方式
直接使用
1 | //直接使用 |
注册后使用
1 | //注册 |
TableFunction表值函数
跟自定义标量函数一样,自定义表值函数的输入参数也可以是 0 到多个标量。但是跟标量函数只能返回一个值不同的是,它可以返回任意多行。返回的每一行可以包含 1 到多列,如果输出行只包含 1 列,会省略结构化信息并生成标量值,这个标量值在运行阶段会隐式地包装进行里。
和 Scalar Function
不同,Table Function
:将一个或多个标量字段作为输入参数,且经过计算和处理后返回的是任意数量的记录,不再是单独的一个标量指标,且返回结果中可以含有一列或多列指标,根据自定义 Table Funciton
函数返回值确定,因此从形式上看更像是 Table
结构数据 。
1 | import org.apache.flink.table.annotation.DataTypeHint; |
AggregationFunction聚合函数
自定义聚合函数(UDAGG)是把一个表(一行或者多行,每行可以有一列或者多列)聚合成一个标量值。
Flink Table API
中提供了User-Defined Aggregate Functions (UDAGGs)
,其主要功能是将一行或多行数据进行聚合然后输出一个标量值,
例如在数据集中根据 Key
求取指定Value
的最大值或最小值。
下面几个方法是每个 AggregateFunction
必须要实现的:
createAccumulator()
主要用于创建Accumulator
,以用于存储计算过程中读取的中间数据,同时在Accumulator
中完成数据的累加操作accumulate()
将每次接入的数据元素累加到定义的accumulator
中,另外accumulate()
方法也可以通过方法复载的方式处理不同类型的数据getValue()
当完成所有的数据累加操作结束后,最后通过 getValue() 方法返回函数的统计结果,最终完成整个AggregateFunction
的计算流程
AggregateFunction
的以下方法在某些场景下是必须实现的:
retract()
在bounded OVER
窗口中是必须实现的。merge()
在许多批式聚合和会话以及滚动窗口聚合中是必须实现的。除此之外,这个方法对于优化也很多帮助。例如,两阶段聚合优化就需要所有的AggregateFunction
都实现merge
方法。resetAccumulator()
在许多批式聚合中是必须实现的
AggregateFunction 接口中定义了三个 需要复写的方法,其中 add()定义数据的添加逻辑,getResult 定义了根据 accumulator 计 算结果的逻辑,merge 方法定义合并 accumulator 的逻辑。
1 | /** |
TableAggregateFunction表值聚合函数
自定义表值聚合函数(UDTAGG)可以把一个表(一行或者多行,每行有一列或者多列)聚合成另一张表,结果中可以有多行多列。
表聚合,多对多,多行输入多行输出
- 用户定义的表聚合函数(User-Defined Table Aggregate Functions,UDTAF),可以把一个表中数据,聚合为具有多行和多列的结果表
- 用户定义表聚合函数,是通过继承 TableAggregateFunction 抽象类来实现的
1 | public class UdfTest_TableAggregateFunction { |
TableAggregateFunction 要求必须实现的方法
createAccumulator()
accumulate()
emitValue()
首先,它同样需要一个累加器(Accumulator),它是保存聚合中间结果的数据结构。通过调用 createAccumulator()
方法可以创建空累加器
随后,对每个输入行调用函数的 accumulate()
方法来更新累加器
处理完所有行后,将调用函数的 emitValue()
方法来计算并返回最终结果