前言
https://nightlies.apache.org/flink/flink-docs-release-1.12/zh/dev/table/functions/udfs.html
https://nightlies.apache.org/flink/flink-docs-release-1.12/zh/dev/table/tableApi.html
内置的聚合方法
Flink Table 内置的聚合方法包括:
- sum():求和
- count():计数
- avg():平均值
- min():最小值
- max():最大值
- stddevPop():计算整体标准偏差
- stddevSamp():计算样本数据的标准偏差
- varPop():计算整体的方差
- varSamp():计算样本数据的方差
另外,Flink Table 还支持自定义聚合方法。
注意
在做聚合操作时,select的字段如果没有聚合方法,则必须在groupBy中。
示例
1 | package com.xhkjedu.test; |
新旧写法
旧写法
1 | package com.xhkjedu.test; |
新写法
1 | package com.xhkjedu.test; |
聚合方法-中位数
在 Flink Table 中自定义聚合方法,需要定义一个继承自 org.apache.flink.table.functions.AggregateFunction
的类,并实现其中的方法。
以下是一个计算中位数的自定义聚合方法示例,用Java实现。
中位数聚合方法
1 | package com.xhkjedu.test; |
然后在 Flink Table 中使用该自定义聚合方法,示例代码如下:
使用StreamExecutionEnvironment测试
1 | package com.xhkjedu.test; |
以上代码将对数据流表中的 b字段进行中位数聚合操作,得到每个 a对应的中位数。在查询语句中使用 median(b)
对 amount 字段进行聚合,其中 median
是注册的自定义聚合方法的名称。
使用TableEnvironment测试
1 | package com.xhkjedu.test; |
根据权重去重
聚合类
1 | package com.xhkjedu.test; |
测试类
1 | package com.xhkjedu.test; |
结果
添加重复标记
1 | package com.xhkjedu.test; |
结果
BIGINT和BIGINT NOT NULL无法匹配
Flink保存报错
Exception in thread “main” java.lang.AssertionError: Conversion to relational algebra failed to preserve datatypes:
validated type:
RecordType(BIGINT hole_key) NOT NULL
converted type:
RecordType(BIGINT NOT NULL hole_key) NOT NULL
原因是
在Hive中建表的字段设置的的时候设置为BIGINT
,这时候存的值是可以为空的,但是Flink的count操作,返回的结果是不为空的BIGINT NOT NULL
,
这对于Flink来说是两种类型,所以我们要转换Flink做count之后的数据类型:
1 | $(prop).count().cast(DataTypes.BIGINT()).as(target) |
其中
prop
是原字段target
是目标字段cast
是转换类型的方法,如果要转为不为空,代码为cast(DataTypes.BIGINT().notNull())