前言 https://nightlies.apache.org/flink/flink-docs-release-1.12/zh/dev/table/functions/udfs.html
https://nightlies.apache.org/flink/flink-docs-release-1.12/zh/dev/table/tableApi.html
内置的聚合方法 Flink Table 内置的聚合方法包括:
sum():求和
count():计数
avg():平均值
min():最小值
max():最大值
stddevPop():计算整体标准偏差
stddevSamp():计算样本数据的标准偏差
varPop():计算整体的方差
varSamp():计算样本数据的方差
另外,Flink Table 还支持自定义聚合方法。
注意
在做聚合操作时,select的字段如果没有聚合方法,则必须在groupBy中。
示例
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 package com.xhkjedu.test;import org.apache.flink.api.common.typeinfo.BasicTypeInfo;import org.apache.flink.api.common.typeinfo.TypeInformation;import org.apache.flink.api.java.typeutils.RowTypeInfo;import org.apache.flink.streaming.api.datastream.DataStream;import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;import org.apache.flink.table.api.Table;import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;import org.apache.flink.types.Row;import java.util.ArrayList;import java.util.List;import static org.apache.flink.table.api.Expressions.*;public class TableApiExample { public static class SensorData { public String id; public long timestamp; public double temperature; public SensorData (String id, long timestamp, double temperature) { this .id = id; this .timestamp = timestamp; this .temperature = temperature; } } public static void main (String[] args) throws Exception { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env); List<Row> list = new ArrayList <Row>(); list.add(Row.of("sensor1" , 1613624800000L , 25.0 )); list.add(Row.of("sensor2" , 1613624800000L , 30.0 )); list.add(Row.of("sensor1" , 1613625600000L , 26.8 )); list.add(Row.of("sensor1" , 1613626200000L , 27.3 )); list.add(Row.of("sensor2" , 1613624800000L , 29.8 )); list.add(Row.of("sensor1" , 1613626500000L , 27.3 )); list.add(Row.of("sensor2" , 1613624800000L , 30.0 )); list.add(Row.of("sensor2" , 1613626000000L , 28.5 )); list.add(Row.of("sensor1" , 1613627100000L , 27.9 )); list.add(Row.of("sensor1" , 1613627700000L , 26.2 )); RowTypeInfo rowTypeInfo = getRowTypeInfo(list.get(0 )); DataStream<Row> ds = 
env.fromCollection(list, rowTypeInfo); tableEnv.registerDataStream("table01" , ds); Table resultTable = tableEnv.from("table01" ) .groupBy($("f0" ), $("f1" )) .select( $("f0" ), $("f1" ), $("f2" ).count().as("cnt" ) ); resultTable.execute().print(); } private static RowTypeInfo getRowTypeInfo (Row row) { TypeInformation[] types = new TypeInformation [row.getArity()]; String[] fieldNames = new String [row.getArity()]; for (int i = 0 ; i < row.getArity(); i++) { Object field = row.getField(i); if (field instanceof Integer) { types[i] = BasicTypeInfo.INT_TYPE_INFO; } else if (field instanceof Long) { types[i] = BasicTypeInfo.LONG_TYPE_INFO; } else if (field instanceof Float) { types[i] = BasicTypeInfo.FLOAT_TYPE_INFO; } else if (field instanceof Double) { types[i] = BasicTypeInfo.DOUBLE_TYPE_INFO; } else { types[i] = BasicTypeInfo.STRING_TYPE_INFO; } fieldNames[i] = "f" + i; } return new RowTypeInfo (types, fieldNames); } }
新旧写法 旧写法
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 package com.xhkjedu.test;import org.apache.flink.api.java.tuple.Tuple2;import org.apache.flink.streaming.api.datastream.DataStream;import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;import org.apache.flink.table.api.Table;import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;import static org.apache.flink.table.api.Expressions.$;public class TableApiExample { public static void main (String[] args) throws Exception { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env); DataStream<Tuple2<String, Long>> input = env.fromElements( new Tuple2 <>("a" , 3L ), new Tuple2 <>("a" , 5L ), new Tuple2 <>("b" , 4L ), new Tuple2 <>("b" , 2L ), new Tuple2 <>("b" , 1L ) ); Table result = tableEnv.fromDataStream(input, $("a" ), $("b" )) .groupBy("a" ) .select("a, avg(b) as avg,min(b) as min,max(b) as max" ); result.execute().print(); } }
新写法
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 package com.xhkjedu.test;import org.apache.flink.api.java.tuple.Tuple2;import org.apache.flink.streaming.api.datastream.DataStream;import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;import org.apache.flink.table.api.Table;import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;import static org.apache.flink.table.api.Expressions.$;public class TableApiExample { public static void main (String[] args) throws Exception { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env); DataStream<Tuple2<String, Long>> input = env.fromElements( new Tuple2 <>("a" , 3L ), new Tuple2 <>("b" , 4L ), new Tuple2 <>("a" , 5L ), new Tuple2 <>("b" , 2L ), new Tuple2 <>("b" , 1L ) ); Table result = tableEnv.fromDataStream(input, $("a" ), $("b" )) .groupBy($("a" )) .select($("a" ), $("b" ).avg().as("avg" ), $("b" ).min().as("min" ), $("b" ).max().as("max" )); result.execute().print(); } }
聚合方法-中位数 在 Flink Table 中自定义聚合方法,需要定义一个继承自 org.apache.flink.table.functions.AggregateFunction 的类,并实现其中的方法。
以下是一个计算中位数的自定义聚合方法示例,用Java实现。
中位数聚合方法 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 package com.xhkjedu.test;import org.apache.flink.table.functions.AggregateFunction;import java.util.ArrayList;import java.util.Collections;import java.util.List;public class MedianAggregateFunction extends AggregateFunction <Double, MedianAggregateFunction.Accumulator> { public static class Accumulator { public List<Long> values = new ArrayList <Long>(); } @Override public Accumulator createAccumulator () { return new Accumulator (); } public void accumulate (Accumulator acc, Long input) { acc.values.add(input); } public void retract (Accumulator acc, Long input) { acc.values.remove(input); } @Override public Double getValue (Accumulator acc) { Collections.sort(acc.values); List<Long> values = acc.values; if (values.size() > 0 ) { if (values.size() % 2 == 1 ) { return (double ) values.get(values.size() / 2 ); } else { return (values.get(values.size() / 2 - 1 ) + values.get(values.size() / 2 )) / 2.0 ; } } else { return null ; } } }
然后在 Flink Table 中使用该自定义聚合方法,示例代码如下:
使用StreamExecutionEnvironment测试 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 package com.xhkjedu.test;import org.apache.flink.api.java.tuple.Tuple2;import org.apache.flink.streaming.api.datastream.DataStream;import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;import org.apache.flink.table.api.Table;import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;import static org.apache.flink.table.api.Expressions.$;public class TableApiExample { public static void main (String[] args) throws Exception { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env); tableEnv.createTemporarySystemFunction("median" , MedianAggregateFunction.class); DataStream<Tuple2<String, Long>> ds = env.fromElements( new Tuple2 <>("a" , 3L ), new Tuple2 <>("b" , 4L ), new Tuple2 <>("a" , 5L ), new Tuple2 <>("b" , 2L ), new Tuple2 <>("b" , 1L ) ); Table result = tableEnv.fromDataStream(ds, $("a" ), $("b" )); result.printSchema(); result = result .groupBy("a" ) .select("a, avg(b) as avg,min(b) as min,max(b) as max,median(b) as median" ); result.execute().print(); } }
以上代码将对数据流表中的 b 字段进行中位数聚合操作,得到每个 a 对应的中位数。在查询语句中使用 median(b) 对 b 字段进行聚合,其中 median 是注册的自定义聚合方法的名称。
使用TableEnvironment测试 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 package com.xhkjedu.test;import org.apache.flink.table.api.DataTypes;import org.apache.flink.table.api.EnvironmentSettings;import org.apache.flink.table.api.Table;import org.apache.flink.table.api.TableEnvironment;import static org.apache.flink.table.api.Expressions.row;public class TableApiExample { public static void main (String[] args) throws Exception { EnvironmentSettings settings = EnvironmentSettings .newInstance() .useBlinkPlanner() .inBatchMode() .build(); TableEnvironment tableEnv = TableEnvironment.create(settings); tableEnv.createTemporarySystemFunction("median" , MedianAggregateFunction.class); Table result = tableEnv.fromValues( DataTypes.ROW( DataTypes.FIELD("name" , DataTypes.STRING()), DataTypes.FIELD("value" , DataTypes.BIGINT()) ), row("a" , 3L ), row("b" , 4L ), row("a" , 5L ), row("b" , 2L ), row("b" , 1L ) ); result.printSchema(); result = result .groupBy("name" ) .select("name, avg(value) as avg,min(value) as min,max(value) as max,median(value) as median" ); result.execute().print(); } }
根据权重去重 聚合类 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 package com.xhkjedu.test;import org.apache.flink.table.functions.AggregateFunction;import java.util.ArrayList;import java.util.List;public class DuplicateAggregateFunction extends AggregateFunction <Long, DuplicateAggregateFunction.Accumulator> { public static class WeightMoodel { public long value; public long weight; public WeightMoodel (long value, long weight) { this .value = value; this .weight = weight; } public WeightMoodel () { } } public static class Accumulator { public List<WeightMoodel> values = new ArrayList <>(); } @Override public Accumulator createAccumulator () { return new Accumulator (); } public void accumulate (Accumulator acc, Long input, Long weight) { acc.values.add(new WeightMoodel (input, weight)); } @Override public Long getValue (Accumulator acc) { List<WeightMoodel> values = acc.values; if (values.size() > 0 ) { long weight = values.get(0 ).weight; long value = values.get(0 ).value; for (WeightMoodel weightMoodel : values) { if (weightMoodel.weight >= weight) { value = weightMoodel.value; } } return value; } else { return null ; } } }
测试类 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 package com.xhkjedu.test;import org.apache.flink.table.api.DataTypes;import org.apache.flink.table.api.EnvironmentSettings;import org.apache.flink.table.api.Table;import org.apache.flink.table.api.TableEnvironment;import static org.apache.flink.table.api.Expressions.row;public class TableApiExample { public static void main (String[] args) throws Exception { EnvironmentSettings settings = EnvironmentSettings .newInstance() .useBlinkPlanner() .inBatchMode() .build(); TableEnvironment tableEnv = TableEnvironment.create(settings); tableEnv.createTemporarySystemFunction("duplicate" , DuplicateAggregateFunction.class); Table result = tableEnv.fromValues( DataTypes.ROW( DataTypes.FIELD("name" , DataTypes.STRING()), DataTypes.FIELD("value" , DataTypes.BIGINT()), DataTypes.FIELD("weight" , DataTypes.BIGINT()) ), row("a" , 3L , 1 ), row("a" , 5L , 2 ), row("b" , 4L , 2 ), row("b" , 2L , 1 ), row("b" , 1L , 3 ) ); result.printSchema(); result = result .groupBy("name" ) .select("name, duplicate(value,weight) as duplicate" ); result.execute().print(); } }
结果
添加重复标记 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 package com.xhkjedu.test;import org.apache.flink.table.api.DataTypes;import org.apache.flink.table.api.EnvironmentSettings;import org.apache.flink.table.api.Table;import org.apache.flink.table.api.TableEnvironment;import static org.apache.flink.table.api.Expressions.$;import static org.apache.flink.table.api.Expressions.row;public class TableApiExample { public static void main (String[] args) throws Exception { EnvironmentSettings settings = EnvironmentSettings .newInstance() .useBlinkPlanner() .inBatchMode() .build(); TableEnvironment tableEnv = TableEnvironment.create(settings); Table result = tableEnv.fromValues( DataTypes.ROW( DataTypes.FIELD("name" , DataTypes.STRING()), DataTypes.FIELD("value" , DataTypes.BIGINT()), DataTypes.FIELD("weight" , DataTypes.BIGINT()) ), row("a" , 3L , 1 ), row("a" , 5L , 2 ), row("b" , 4L , 2 ), row("b" , 2L , 1 ), row("b" , 1L , 3 ) ); result.printSchema(); Table countTable = result .groupBy($("name" )) .select($("name" ).as("name2" ), $("value" ).count().as("count" )); result = result .leftOuterJoin( countTable, $("name" ).isEqual($("name2" )) ) .select($("name" ), $("value" ), $("count" ), $("count" ).isGreater(1 ).as("duplicate" )) .orderBy($("name" )); result.execute().print(); } }
结果
BIGINT和BIGINT NOT NULL无法匹配 Flink保存报错
Exception in thread "main" java.lang.AssertionError: Conversion to relational algebra failed to preserve datatypes: validated type: RecordType(BIGINT hole_key) NOT NULL converted type: RecordType(BIGINT NOT NULL hole_key) NOT NULL
原因是
在Hive中建表时,字段类型设置为BIGINT,这时候存的值是可以为空的,但是Flink的count操作返回的结果是不为空的BIGINT NOT NULL,
这对于Flink来说是两种类型,所以我们要转换Flink做count之后的数据类型:
$(prop).count().cast(DataTypes.BIGINT()).as(target)
其中
prop是原字段
target是目标字段
cast是转换类型的方法,如果要转为不为空,代码为cast(DataTypes.BIGINT().notNull())