Flink中Table语法的聚合操作

前言

https://nightlies.apache.org/flink/flink-docs-release-1.12/zh/dev/table/functions/udfs.html

https://nightlies.apache.org/flink/flink-docs-release-1.12/zh/dev/table/tableApi.html

内置的聚合方法

Flink Table 内置的聚合方法包括:

  1. sum():求和
  2. count():计数
  3. avg():平均值
  4. min():最小值
  5. max():最大值
  6. stddevPop():计算整体标准偏差
  7. stddevSamp():计算样本数据的标准偏差
  8. varPop():计算整体的方差
  9. varSamp():计算样本数据的方差

另外,Flink Table 还支持自定义聚合方法。

注意

在做聚合操作时,select的字段如果没有聚合方法,则必须在groupBy中。

示例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
package com.xhkjedu.test;

import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.typeutils.RowTypeInfo;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.types.Row;

import java.util.ArrayList;
import java.util.List;

import static org.apache.flink.table.api.Expressions.*;

public class TableApiExample {

    /**
     * Example POJO for sensor readings.
     * NOTE(review): this class is never referenced in this example — the stream
     * below is built from {@code Row} objects. Kept only so the public
     * interface of the snippet is unchanged.
     */
    public static class SensorData {
        public String id;
        public long timestamp;
        public double temperature;

        public SensorData(String id, long timestamp, double temperature) {
            this.id = id;
            this.timestamp = timestamp;
            this.temperature = temperature;
        }
    }

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

        List<Row> list = new ArrayList<>();
        list.add(Row.of("sensor1", 1613624800000L, 25.0));
        list.add(Row.of("sensor2", 1613624800000L, 30.0));
        list.add(Row.of("sensor1", 1613625600000L, 26.8));
        list.add(Row.of("sensor1", 1613626200000L, 27.3));
        list.add(Row.of("sensor2", 1613624800000L, 29.8));
        list.add(Row.of("sensor1", 1613626500000L, 27.3));
        list.add(Row.of("sensor2", 1613624800000L, 30.0));
        list.add(Row.of("sensor2", 1613626000000L, 28.5));
        list.add(Row.of("sensor1", 1613627100000L, 27.9));
        list.add(Row.of("sensor1", 1613627700000L, 26.2));

        // Derive the row schema (types + field names f0..f2) from the first row.
        RowTypeInfo rowTypeInfo = getRowTypeInfo(list.get(0));
        DataStream<Row> ds = env.fromCollection(list, rowTypeInfo);
        // registerDataStream is deprecated; createTemporaryView is the
        // supported replacement and registers the view under the same name.
        tableEnv.createTemporaryView("table01", ds);

        // Group by (f0, f1) and count f2 per group. Every non-aggregated
        // column in select() must also appear in groupBy().
        Table resultTable = tableEnv.from("table01")
                .groupBy($("f0"), $("f1"))
                .select(
                        $("f0"),
                        $("f1"),
                        $("f2").count().as("cnt")
                );

        // Equivalent SQL:
        // Table resultTable = tableEnv.sqlQuery("select f0,f1,count(f2) as cnt from table01 group by f0,f1");

        // Print the result table.
        resultTable.execute().print();
    }

    /**
     * Builds a {@code RowTypeInfo} from a sample row: field types are inferred
     * from the runtime class of each field value, field names are generated as
     * f0..fN.
     *
     * @param row sample row whose field values determine the column types
     * @return a RowTypeInfo describing all fields of the row
     */
    private static RowTypeInfo getRowTypeInfo(Row row) {
        // Wildcard element type instead of the raw type — avoids an unchecked
        // raw-type usage while remaining assignable to the RowTypeInfo ctor.
        TypeInformation<?>[] types = new TypeInformation<?>[row.getArity()];
        String[] fieldNames = new String[row.getArity()];

        for (int i = 0; i < row.getArity(); i++) {
            Object field = row.getField(i);
            if (field instanceof Integer) {
                types[i] = BasicTypeInfo.INT_TYPE_INFO;
            } else if (field instanceof Long) {
                types[i] = BasicTypeInfo.LONG_TYPE_INFO;
            } else if (field instanceof Float) {
                types[i] = BasicTypeInfo.FLOAT_TYPE_INFO;
            } else if (field instanceof Double) {
                types[i] = BasicTypeInfo.DOUBLE_TYPE_INFO;
            } else {
                // Fallback: anything else (including null) is treated as STRING.
                types[i] = BasicTypeInfo.STRING_TYPE_INFO;
            }
            fieldNames[i] = "f" + i;
        }
        return new RowTypeInfo(types, fieldNames);
    }
}

新旧写法

旧写法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
package com.xhkjedu.test;

import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

import static org.apache.flink.table.api.Expressions.$;

public class TableApiExample {

public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

// Build the input stream of (key, value) pairs
DataStream<Tuple2<String, Long>> input = env.fromElements(
new Tuple2<>("a", 3L),
new Tuple2<>("a", 5L),
new Tuple2<>("b", 4L),
new Tuple2<>("b", 2L),
new Tuple2<>("b", 1L)
);

// "Old" string-based Table API syntax: groupBy/select take one
// comma-separated expression string. Note: only BUILT-IN aggregates
// (avg/min/max) are used here — despite the original comment, no custom
// aggregate function is involved. This string syntax is deprecated in
// favor of the $("col") expression DSL shown in the "new style" example.
Table result = tableEnv.fromDataStream(input, $("a"), $("b"))
.groupBy("a")
.select("a, avg(b) as avg,min(b) as min,max(b) as max");

result.execute().print();
}
}

新写法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
package com.xhkjedu.test;

import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

import static org.apache.flink.table.api.Expressions.$;

public class TableApiExample {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

        // Input stream of (key, value) pairs.
        DataStream<Tuple2<String, Long>> pairs = env.fromElements(
                new Tuple2<>("a", 3L),
                new Tuple2<>("b", 4L),
                new Tuple2<>("a", 5L),
                new Tuple2<>("b", 2L),
                new Tuple2<>("b", 1L)
        );

        // "New" expression-DSL syntax: columns are referenced via $("name")
        // and aggregates are method calls on the column expression.
        Table pairTable = tableEnv.fromDataStream(pairs, $("a"), $("b"));
        Table result = pairTable
                .groupBy($("a"))
                .select(
                        $("a"),
                        $("b").avg().as("avg"),
                        $("b").min().as("min"),
                        $("b").max().as("max"));

        result.execute().print();
    }
}

聚合方法-中位数

在 Flink Table 中自定义聚合方法,需要定义一个继承自 org.apache.flink.table.functions.AggregateFunction 的类,并实现其中的方法。

以下是一个计算中位数的自定义聚合方法示例,用Java实现。

中位数聚合方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
package com.xhkjedu.test;

import org.apache.flink.table.functions.AggregateFunction;

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Custom aggregate computing the median of a Long column.
// First type parameter (Double) is the result type; second is the accumulator.
public class MedianAggregateFunction extends AggregateFunction<Double, MedianAggregateFunction.Accumulator> {

    /** Mutable accumulator: collects every value seen in the group. */
    public static class Accumulator {
        public List<Long> values = new ArrayList<Long>();
    }

    @Override
    public Accumulator createAccumulator() {
        return new Accumulator();
    }

    /**
     * Called by Flink for each input row of the group.
     * Null column values are skipped (the original would have added null and
     * later crashed in Collections.sort).
     */
    public void accumulate(Accumulator acc, Long input) {
        if (input != null) {
            acc.values.add(input);
        }
    }

    /** Retraction support for updating (non-append-only) inputs. */
    public void retract(Accumulator acc, Long input) {
        if (input != null) {
            acc.values.remove(input);
        }
    }

    /**
     * Returns the median of the collected values, or null for an empty group.
     */
    @Override
    public Double getValue(Accumulator acc) {
        if (acc.values.isEmpty()) {
            return null;
        }
        // Sort a copy: getValue() may be invoked multiple times and must not
        // mutate the accumulator state handed to it (the original sorted
        // acc.values in place).
        List<Long> sorted = new ArrayList<>(acc.values);
        Collections.sort(sorted);
        int n = sorted.size();
        if (n % 2 == 1) {
            return (double) sorted.get(n / 2);
        }
        return (sorted.get(n / 2 - 1) + sorted.get(n / 2)) / 2.0;
    }
}

然后在 Flink Table 中使用该自定义聚合方法,示例代码如下:

使用StreamExecutionEnvironment测试

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
package com.xhkjedu.test;

import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

import static org.apache.flink.table.api.Expressions.$;

public class TableApiExample {

public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
// Register the custom median aggregate under the name "median" so it can be
// referenced from the Table API expression string below.
tableEnv.createTemporarySystemFunction("median", MedianAggregateFunction.class);
// Build the input stream of (key, value) pairs
DataStream<Tuple2<String, Long>> ds = env.fromElements(
new Tuple2<>("a", 3L),
new Tuple2<>("b", 4L),
new Tuple2<>("a", 5L),
new Tuple2<>("b", 2L),
new Tuple2<>("b", 1L)
);

// Aggregate b per key a: built-ins (avg/min/max) plus the registered
// custom "median" function, using the string-based expression syntax.
Table result = tableEnv.fromDataStream(ds, $("a"), $("b"));
result.printSchema();
result = result
.groupBy("a")
.select("a, avg(b) as avg,min(b) as min,max(b) as max,median(b) as median");

result.execute().print();
}
}

以上代码将对数据流表中的 b 字段进行中位数聚合操作,得到每个 a 对应的中位数。在查询语句中使用 median(b) 对 b 字段进行聚合,其中 median 是注册的自定义聚合方法的名称。

使用TableEnvironment测试

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
package com.xhkjedu.test;

import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableEnvironment;

import static org.apache.flink.table.api.Expressions.row;

public class TableApiExample {
public static void main(String[] args) throws Exception {
// Batch-mode TableEnvironment using the Blink planner — no DataStream
// environment is needed when the data comes from fromValues().
EnvironmentSettings settings = EnvironmentSettings
.newInstance()
.useBlinkPlanner()
.inBatchMode()
.build();

TableEnvironment tableEnv = TableEnvironment.create(settings);

// Register the custom median aggregate under the name "median".
tableEnv.createTemporarySystemFunction("median", MedianAggregateFunction.class);

// Inline test data: rows of (name STRING, value BIGINT).
Table result = tableEnv.fromValues(
DataTypes.ROW(
DataTypes.FIELD("name", DataTypes.STRING()),
DataTypes.FIELD("value", DataTypes.BIGINT())
),
row("a", 3L),
row("b", 4L),
row("a", 5L),
row("b", 2L),
row("b", 1L)
);

result.printSchema();

// Built-in aggregates plus the registered "median" function,
// using the string-based expression syntax.
result = result
.groupBy("name")
.select("name, avg(value) as avg,min(value) as min,max(value) as max,median(value) as median");

result.execute().print();
}
}

根据权重去重

聚合类

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
package com.xhkjedu.test;

import org.apache.flink.table.functions.AggregateFunction;

import java.util.ArrayList;
import java.util.List;

// Custom aggregate for "deduplicate by weight": returns the value whose weight
// is highest within the group (on equal weights, the later-arriving value wins,
// matching the original's use of >=).
// First type parameter (Long) is the result type; second is the accumulator.
public class DuplicateAggregateFunction extends AggregateFunction<Long, DuplicateAggregateFunction.Accumulator> {

    // NOTE(review): keeps the original spelling "WeightMoodel" for interface
    // compatibility; "WeightModel" was presumably intended.
    public static class WeightMoodel {
        public long value;
        public long weight;

        public WeightMoodel(long value, long weight) {
            this.value = value;
            this.weight = weight;
        }

        public WeightMoodel() {
        }
    }

    /** Mutable accumulator: collects every (value, weight) pair of the group. */
    public static class Accumulator {
        public List<WeightMoodel> values = new ArrayList<>();
    }

    @Override
    public Accumulator createAccumulator() {
        return new Accumulator();
    }

    /**
     * Called by Flink per input row. Null values/weights are skipped — the
     * original would have thrown an NPE while auto-unboxing them.
     */
    public void accumulate(Accumulator acc, Long input, Long weight) {
        if (input != null && weight != null) {
            acc.values.add(new WeightMoodel(input, weight));
        }
    }

    /**
     * Returns the value with the maximum weight, or null for an empty group.
     */
    @Override
    public Long getValue(Accumulator acc) {
        List<WeightMoodel> values = acc.values;
        if (values.isEmpty()) {
            return null;
        }
        long bestWeight = values.get(0).weight;
        long bestValue = values.get(0).value;
        for (WeightMoodel m : values) {
            // BUG FIX: the original never updated the running weight, so every
            // element was compared against the FIRST element's weight — the
            // result was the last element reaching that threshold, not the
            // element with the maximum weight (e.g. weights [1,3,2] returned
            // the weight-2 value). Track the running maximum instead.
            if (m.weight >= bestWeight) {
                bestWeight = m.weight;
                bestValue = m.value;
            }
        }
        return bestValue;
    }
}

测试类

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
package com.xhkjedu.test;

import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableEnvironment;

import static org.apache.flink.table.api.Expressions.row;

public class TableApiExample {
public static void main(String[] args) throws Exception {
// Batch-mode TableEnvironment using the Blink planner.
EnvironmentSettings settings = EnvironmentSettings
.newInstance()
.useBlinkPlanner()
.inBatchMode()
.build();

TableEnvironment tableEnv = TableEnvironment.create(settings);

// Register the weight-based dedup aggregate under the name "duplicate".
tableEnv.createTemporarySystemFunction("duplicate", DuplicateAggregateFunction.class);

// Inline test data: rows of (name STRING, value BIGINT, weight BIGINT).
Table result = tableEnv.fromValues(
DataTypes.ROW(
DataTypes.FIELD("name", DataTypes.STRING()),
DataTypes.FIELD("value", DataTypes.BIGINT()),
DataTypes.FIELD("weight", DataTypes.BIGINT())// weight column
),
row("a", 3L, 1),
row("a", 5L, 2),
row("b", 4L, 2),
row("b", 2L, 1),
row("b", 1L, 3)
);


result.printSchema();

// For each name, keep the value with the highest weight (custom aggregate
// registered above; string-based expression syntax).
result = result
.groupBy("name")
.select("name, duplicate(value,weight) as duplicate");

result.execute().print();
}
}

结果

image-20230508171243914

添加重复标记

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
package com.xhkjedu.test;

import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableEnvironment;

import static org.apache.flink.table.api.Expressions.$;
import static org.apache.flink.table.api.Expressions.row;

public class TableApiExample {
    public static void main(String[] args) throws Exception {
        // Batch-mode TableEnvironment using the Blink planner.
        EnvironmentSettings settings = EnvironmentSettings
                .newInstance()
                .useBlinkPlanner()
                .inBatchMode()
                .build();

        TableEnvironment tableEnv = TableEnvironment.create(settings);

        // Source table: rows of (name STRING, value BIGINT, weight BIGINT).
        Table source = tableEnv.fromValues(
                DataTypes.ROW(
                        DataTypes.FIELD("name", DataTypes.STRING()),
                        DataTypes.FIELD("value", DataTypes.BIGINT()),
                        DataTypes.FIELD("weight", DataTypes.BIGINT())// weight column
                ),
                row("a", 3L, 1),
                row("a", 5L, 2),
                row("b", 4L, 2),
                row("b", 2L, 1),
                row("b", 1L, 3));

        source.printSchema();

        // Per-name row counts; the group key is aliased to "name2" so the
        // self-join below has unambiguous column names.
        Table perNameCounts = source
                .groupBy($("name"))
                .select($("name").as("name2"), $("value").count().as("count"));

        // Join the counts back onto the source rows and flag every row whose
        // name occurs more than once as a duplicate.
        Table flagged = source
                .leftOuterJoin(
                        perNameCounts,
                        $("name").isEqual($("name2")))
                .select(
                        $("name"),
                        $("value"),
                        $("count"),
                        $("count").isGreater(1).as("duplicate"))
                .orderBy($("name"));

        flagged.execute().print();
    }
}

结果

image-20230508171059326

BIGINT和BIGINT NOT NULL无法匹配

Flink保存报错

Exception in thread “main” java.lang.AssertionError: Conversion to relational algebra failed to preserve datatypes:
validated type:
RecordType(BIGINT hole_key) NOT NULL
converted type:
RecordType(BIGINT NOT NULL hole_key) NOT NULL

原因是

在Hive中建表时,字段类型设置为BIGINT,这时候存的值是可以为空的;但是Flink的count操作返回的结果是不可为空的BIGINT NOT NULL,

这对于Flink来说是两种类型,所以我们要转换Flink做count之后的数据类型:

1
$(prop).count().cast(DataTypes.BIGINT()).as(target)

其中

  • prop是原字段
  • target是目标字段
  • cast是转换类型的方法,如果要转为不为空,代码为cast(DataTypes.BIGINT().notNull())