前言
本文使用环境版本
- Hive:2.3.9
- Flink:flink-1.12.7-bin-scala_2.12
只能用批处理的情况
以下情况要使用批处理而不能用流处理
操作中有聚合操作的。
有join连接操作的,并且要保存到Hive中的。因为该操作会在运行的过程中执行更新操作,而Hive不支持所以会报错。
错误信息
doesn't support consuming update and delete changes which is produced by node Join
Flink可用于批处理和流处理两种场景,具体取决于您的需求和数据处理方式。
下面是一些适用于批处理场景的情况:
批量数据处理:如果您有大量静态数据需要处理,并且可以等待所有数据到达后再进行计算,那么使用Flink进行批处理非常合适。Flink提供了高吞吐量和可伸缩性,可以处理大规模的数据集。
批量报表生成:当您需要从大量数据中生成报表或汇总统计信息时,Flink可以帮助您高效地进行批量计算和聚合操作。您可以使用Flink的批处理API编写并行化的数据处理逻辑,加速报表生成过程。
数据迁移和转换:如果您需要将数据从一个存储系统迁移到另一个系统,并进行必要的转换和处理,Flink可以提供灵活且可扩展的批处理能力。
您可以使用Flink读取源数据,进行转换和清洗,然后将结果写入目标存储系统。
需要注意的是,Flink也非常适用于流处理场景,特别是对于实时数据处理和流式计算的需求。
使用Flink的流处理API,您可以实时接收和处理数据流,对每个事件进行计算,并基于事件时间或处理时间执行窗口操作、聚合操作和模式匹配等。
总而言之,Flink既适用于批处理场景,也适用于流处理场景。
您可以根据具体的需求和数据处理模式选择适合的API和功能来使用Flink。
批处理
初始化
1 | EnvironmentSettings settings = EnvironmentSettings |
配置失败重试次数
1 | System.out.println("=========================================================="); |
设置jobname
从Flink 1.12开始可以通过pipeline.name
设置jobname。
TableEnvironment设置jobname:
1 | //设置jobname |
和Hive整合
1 | HiveCatalog hive = new HiveCatalog("myhive", dbNameTarget, hiveConfig); |
其中HiveCatalog方法
1 | HiveCatalog(String catalogName, String defaultDatabase, String hiveConfDir) |
其中
hiveConfDir
是Hive配置文件所在路径"/data/tools/bigdata/apache-hive-2.1.0-bin/conf"
流处理
初始化
1 | StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); |
也可简写为
1 | StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); |
配置失败重试次数
1 | System.out.println("=========================================================="); |
设置jobname
StreamExecutionEnvironment
1 | String jobname = "【" + configJson.getType() + "】" + reader.getDbName() + "." + reader.getTableName() + "=>" + writer.getDbName() + "." + writer.getTableName(); |
StreamTableEnvironment
从Flink 1.12开始可以通过pipeline.name
设置jobname。
1 | //设置jobname |
和Hive整合
1 | HiveCatalog hive = new HiveCatalog("myhive", dbNameTarget, hiveConfig); |
其中HiveCatalog方法
1 | HiveCatalog(String catalogName, String defaultDatabase, String hiveConfDir) |
其中
hiveConfDir
是Hive配置文件所在路径"/data/tools/bigdata/apache-hive-2.1.0-bin/conf"
设置并行数
1 | final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); |
设置检查点
只有流任务可以设置检查点,批任务没有类似enableCheckpointing
的方法。
检查点是 Flink 中提供的一种容错机制,可以将流处理程序的状态保存到磁盘上,以便在发生故障时可以恢复程序的执行。
enableCheckpointing
传入的值单位为毫秒。
如下所示,每5秒就设置一下检查点
1 | final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); |