前言
https://nightlies.apache.org/flink/flink-docs-release-1.12/zh/dev/table/functions/udfs.html
https://nightlies.apache.org/flink/flink-docs-release-1.12/zh/dev/table/tableApi.html
内置的聚合方法
Flink Table 内置的聚合方法包括:
- sum():求和
 - count():计数
 - avg():平均值
 - min():最小值
 - max():最大值
 - stddevPop():计算整体标准偏差
 - stddevSamp():计算样本数据的标准偏差
 - varPop():计算整体的方差
 - varSamp():计算样本数据的方差
 
另外,Flink Table 还支持自定义聚合方法。
注意
在做聚合操作时,select的字段如果没有聚合方法,则必须在groupBy中。
示例
1  | package com.xhkjedu.test;  | 
新旧写法
旧写法
1  | package com.xhkjedu.test;  | 
新写法
1  | package com.xhkjedu.test;  | 
聚合方法-中位数
在 Flink Table 中自定义聚合方法,需要定义一个继承自 org.apache.flink.table.functions.AggregateFunction 的类,并实现其中的方法。
以下是一个计算中位数的自定义聚合方法示例,用Java实现。
中位数聚合方法
1  | package com.xhkjedu.test;  | 
然后在 Flink Table 中使用该自定义聚合方法,示例代码如下:
使用StreamExecutionEnvironment测试
1  | package com.xhkjedu.test;  | 
以上代码将对数据流表中的 b字段进行中位数聚合操作,得到每个 a对应的中位数。在查询语句中使用 median(b) 对 amount 字段进行聚合,其中 median 是注册的自定义聚合方法的名称。
使用TableEnvironment测试
1  | package com.xhkjedu.test;  | 
根据权重去重
聚合类
1  | package com.xhkjedu.test;  | 
测试类
1  | package com.xhkjedu.test;  | 
结果
 
添加重复标记
1  | package com.xhkjedu.test;  | 
结果
 
BIGINT和BIGINT NOT NULL无法匹配
Flink保存报错
Exception in thread “main” java.lang.AssertionError: Conversion to relational algebra failed to preserve datatypes:
validated type:
RecordType(BIGINT hole_key) NOT NULL
converted type:
RecordType(BIGINT NOT NULL hole_key) NOT NULL
原因是
在Hive中建表的字段设置的的时候设置为BIGINT,这时候存的值是可以为空的,但是Flink的count操作,返回的结果是不为空的BIGINT NOT NULL,
这对于Flink来说是两种类型,所以我们要转换Flink做count之后的数据类型:
1  | $(prop).count().cast(DataTypes.BIGINT()).as(target)  | 
其中
prop是原字段target是目标字段cast是转换类型的方法,如果要转为不为空,代码为cast(DataTypes.BIGINT().notNull())