Flink开发-批处理和流处理(初始化、设置重试次数、设置jobname)

前言

本文使用环境版本

  • Hive:2.3.9
  • Flink:flink-1.12.7-bin-scala_2.12

只能用批处理的情况

以下情况要使用批处理而不能用流处理

  • 操作中有聚合操作的。

  • 有join连接操作的,并且要保存到Hive中的。因为该操作会在运行的过程中执行更新操作,而Hive不支持所以会报错。

    错误信息doesn't support consuming update and delete changes which is produced by node Join

Flink可用于批处理和流处理两种场景,具体取决于您的需求和数据处理方式。

下面是一些适用于批处理场景的情况:

  1. 批量数据处理:如果您有大量静态数据需要处理,并且可以等待所有数据到达后再进行计算,那么使用Flink进行批处理非常合适。Flink提供了高吞吐量和可伸缩性,可以处理大规模的数据集。

  2. 批量报表生成:当您需要从大量数据中生成报表或汇总统计信息时,Flink可以帮助您高效地进行批量计算和聚合操作。您可以使用Flink的批处理API编写并行化的数据处理逻辑,加速报表生成过程。

  3. 数据迁移和转换:如果您需要将数据从一个存储系统迁移到另一个系统,并进行必要的转换和处理,Flink可以提供灵活且可扩展的批处理能力。

    您可以使用Flink读取源数据,进行转换和清洗,然后将结果写入目标存储系统。

需要注意的是,Flink也非常适用于流处理场景,特别是对于实时数据处理和流式计算的需求。

使用Flink的流处理API,您可以实时接收和处理数据流,对每个事件进行计算,并基于事件时间或处理时间执行窗口操作、聚合操作和模式匹配等。

总而言之,Flink既适用于批处理场景,也适用于流处理场景。

您可以根据具体的需求和数据处理模式选择适合的API和功能来使用Flink。

批处理

初始化

1
2
3
4
5
6
7
EnvironmentSettings settings = EnvironmentSettings
.newInstance()
.useBlinkPlanner()
.inBatchMode()
.build();

TableEnvironment tEnv = TableEnvironment.create(settings);

配置失败重试次数

1
2
3
4
5
6
7
System.out.println("==========================================================");
System.out.println("配置的重试次数:" + configJson.getRetryNum());
System.out.println("==========================================================");
Configuration configuration = tEnv.getConfig().getConfiguration();
configuration.set(RestartStrategyOptions.RESTART_STRATEGY, "fixed-delay");
configuration.set(RestartStrategyOptions.RESTART_STRATEGY_FIXED_DELAY_ATTEMPTS, configJson.getRetryNum());
configuration.set(RestartStrategyOptions.RESTART_STRATEGY_FIXED_DELAY_DELAY, Duration.ofSeconds(10));

设置jobname

从Flink 1.12开始可以通过pipeline.name设置jobname。

TableEnvironment设置jobname:

1
2
3
4
//设置jobname
Configuration configuration = tEnv.getConfig().getConfiguration();
String jobname = "【" + configJson.getType() + "】" + reader.getDbName() + "." + reader.getTableName() + "=>" + writer.getDbName() + "." + writer.getTableName();
configuration.setString("pipeline.name", jobname);

和Hive整合

1
2
3
4
5
HiveCatalog hive = new HiveCatalog("myhive", dbNameTarget, hiveConfig);
tEnv.registerCatalog("myhive", hive);
tEnv.useCatalog("myhive");
tEnv.getConfig().setSqlDialect(SqlDialect.HIVE);
tEnv.useDatabase(dbNameTarget);

其中HiveCatalog方法

1
HiveCatalog(String catalogName, @Nullable String defaultDatabase, @Nullable String hiveConfDir)

其中

  • hiveConfDir是Hive配置文件所在路径"/data/tools/bigdata/apache-hive-2.1.0-bin/conf"

流处理

初始化

1
2
3
4
5
6
7
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
EnvironmentSettings settings = EnvironmentSettings
.newInstance()
.useBlinkPlanner()
.inStreamingMode()
.build();
StreamTableEnvironment tEnv = StreamTableEnvironment.create(env, settings);

也可简写为

1
2
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);

配置失败重试次数

1
2
3
4
5
6
7
System.out.println("==========================================================");
System.out.println("配置的重试次数:" + configJson.getRetryNum());
System.out.println("==========================================================");
env.setRestartStrategy(RestartStrategies.fixedDelayRestart(
configJson.getRetryNum(), // 最大重试次数
Time.of(10, TimeUnit.SECONDS) // 重试间隔
));

设置jobname

StreamExecutionEnvironment

1
2
String jobname = "【" + configJson.getType() + "】" + reader.getDbName() + "." + reader.getTableName() + "=>" + writer.getDbName() + "." + writer.getTableName();
env.execute(jobname);

StreamTableEnvironment

从Flink 1.12开始可以通过pipeline.name设置jobname。

1
2
3
4
//设置jobname
Configuration configuration = tEnv.getConfig().getConfiguration();
String jobname = "【" + configJson.getType() + "】" + reader.getDbName() + "." + reader.getTableName() + "=>" + writer.getDbName() + "." + writer.getTableName();
configuration.setString("pipeline.name", jobname);

和Hive整合

1
2
3
4
5
HiveCatalog hive = new HiveCatalog("myhive", dbNameTarget, hiveConfig);
tEnv.registerCatalog("myhive", hive);
tEnv.useCatalog("myhive");
tEnv.getConfig().setSqlDialect(SqlDialect.HIVE);
tEnv.useDatabase(dbNameTarget);

其中HiveCatalog方法

1
HiveCatalog(String catalogName, @Nullable String defaultDatabase, @Nullable String hiveConfDir)

其中

  • hiveConfDir是Hive配置文件所在路径"/data/tools/bigdata/apache-hive-2.1.0-bin/conf"

设置并行数

1
2
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(2);

设置检查点

只有流任务可以设置检查点,批任务没有类似enableCheckpointing的方法。

检查点是 Flink 中提供的一种容错机制,可以将流处理程序的状态保存到磁盘上,以便在发生故障时可以恢复程序的执行。

enableCheckpointing传入的值单位为毫秒。

如下所示,每5秒就设置一下检查点

1
2
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.enableCheckpointing(5 * 1000);