Flink任务重启策略设置

重启策略分类

Flink支持不同的重启策略,可以控制在发生故障时如何重启新启动作业。

重启策略 重启策略值 描述 默认值
Fixed delay fixed-delay 尝试一个给定的次数来重启Job,如果超过了最大的重启次数,Job最终将失败。 启用 checkpointing,但没有配置重启策略,则使用固定间隔 (fixed-delay) 策略,默认值为Integer.MAX_VALUE
Failure rate failure-rate 失败率重启策略在Job失败后会重启,但是超过失败率后,Job会最终被认定失败。
No restart None Job直接失败,不会尝试进行重启 没有启用 checkpointing,则使用无重启 (no restart) 策略

重启策略核心点

1)重启策略,都有重试次数和重试之间等待时间的规定,不同点在于,分别限定了最大的失败次数和规定时间内失败次数。具体根据场景设置

2)重启策略开启后,如果程序有异常出现,多数情况会出现与第三方交互的地方连接异常情况,类似mysql kafka等连接失败,没有一定经验不好定位问题。

重启策略设置

配置文件中设置

全局配置 flink-conf.yaml

固定间隔策略

全局配置 flink-conf.yaml,表示每10s重试一次,最多重试3次

1
2
3
restart-strategy: fixed-delay 
restart-strategy.fixed-delay.attempts: 3
restart-strategy.fixed-delay.delay: 10 s

失败率策略

全局配置 flink-conf.yaml,5分钟内若失败了3次则认为该job失败,重试间隔为10s

1
2
3
4
restart-strategy: failure-rate  
restart-strategy.failure-rate.max-failures-per-interval: 3
restart-strategy.failure-rate.failure-rate-interval: 5 min
restart-strategy.failure-rate.delay: 10 s

无策略

不重试

1
restart-strategy: none

代码中设置

固定间隔策略

表示每10s重试一次,最多重试3次

1
2
3
4
5
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setRestartStrategy(RestartStrategies.fixedDelayRestart(
3, // 尝试重启的次数
Time.of(10, TimeUnit.SECONDS)) // 间隔
);

对TableEnvironment配置

1
2
3
4
5
6
7
8
9
10
11
EnvironmentSettings settings = EnvironmentSettings
.newInstance()
.useBlinkPlanner()
.inBatchMode()
.build();

TableEnvironment tEnv = TableEnvironment.create(settings);
Configuration configuration = tEnv.getConfig().getConfiguration();
configuration.set(RestartStrategyOptions.RESTART_STRATEGY, "fixed-delay");
configuration.set(RestartStrategyOptions.RESTART_STRATEGY_FIXED_DELAY_ATTEMPTS, 3);
configuration.set(RestartStrategyOptions.RESTART_STRATEGY_FIXED_DELAY_DELAY, Duration.ofSeconds(10));

失败率策略

5分钟内若失败了3次则认为该job失败,重试间隔为10s

1
2
3
4
5
6
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setRestartStrategy(RestartStrategies.failureRateRestart(
3, //一个时间段内的最大失败次数
Time.of(5, TimeUnit.MINUTES), // 衡量失败次数的是时间段
Time.of(10, TimeUnit.SECONDS)) // 间隔
);

对TableEnvironment配置

1
2
3
4
5
6
7
8
9
10
11
12
EnvironmentSettings settings = EnvironmentSettings
.newInstance()
.useBlinkPlanner()
.inBatchMode()
.build();

TableEnvironment tEnv = TableEnvironment.create(settings);
Configuration configuration = tEnv.getConfig().getConfiguration();
configuration.set(RestartStrategyOptions.RESTART_STRATEGY, "failure-rate");
configuration.set(RestartStrategyOptions.RESTART_STRATEGY_FAILURE_RATE_MAX_FAILURES_PER_INTERVAL, 3);
configuration.set(RestartStrategyOptions.RESTART_STRATEGY_FAILURE_RATE_FAILURE_RATE_INTERVAL, Duration.ofSeconds(300));
configuration.set(RestartStrategyOptions.RESTART_STRATEGY_FAILURE_RATE_DELAY, Duration.ofSeconds(10));

无策略

不重试

1
2
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
env.setRestartStrategy(RestartStrategies.noRestart());

对TableEnvironment配置

1
2
3
4
5
6
7
8
9
EnvironmentSettings settings = EnvironmentSettings
.newInstance()
.useBlinkPlanner()
.inBatchMode()
.build();

TableEnvironment tEnv = TableEnvironment.create(settings);
Configuration configuration = tEnv.getConfig().getConfiguration();
configuration.set(RestartStrategyOptions.RESTART_STRATEGY, "none");

设置并行度

StreamExecutionEnvironment配置

1
2
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);

对TableEnvironment配置

1
2
3
4
5
6
7
8
9
10
EnvironmentSettings settings = EnvironmentSettings
.newInstance()
.useBlinkPlanner()
.inBatchMode()
.build();

TableEnvironment tEnv = TableEnvironment.create(settings);
Configuration configuration = tEnv.getConfig().getConfiguration();

configuration.setInteger("parallelism.default", 1);

流/批的两种方式

旧写法

批处理

使用的是默认的Planner。

1
2
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
BatchTableEnvironment tEnv = BatchTableEnvironment.create(env);

流处理

使用了基于Blink的Planner。

1
2
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);

注意

StreamTableEnvironment虽然使用旧写法但是依旧使用了基于Blink的Planner,和新写法完全相同。

但是BatchTableEnvironment使用新旧两种方式使用的是不同的Planner。

新写法

使用Blink引擎

Blink 是一种 Apache Flink 查询引擎,它采用了新的架构来支持包括 SQL、批处理和流式数据处理等不同类型的查询和任务。相对传统 Flink 批处理引擎,Blink 引擎的优势在于具有更高的性能和更好的稳定性、可扩展性。Blink 引擎主要采用了以下几种技术:

  • 混合批流处理引擎:Blink 引擎采用了一种混合批流处理引擎的架构,充分利用了批处理和流处理的各自优势,从而在处理不同类型的数据时具有更好的性能和效率。
  • 优化器:Blink 引擎采用了 Acrticus 优化器,它具有更高的优化能力,在查询处理过程中性能更好,并且可以应对更为复杂的查询场景。
  • 统一查询接口:Blink 引擎具有更为统一的 SQL 查询接口,能够支持更多种类的查询和任务,同时也更加适合与其他开源组件集成使用。

总的来说,Blink 引擎是 Flink 的一个重要组件,可以让 Flink 在处理大规模数据时具有更好的性能和扩展性。

批处理

使用了基于Blink的Planner。

1
2
3
4
5
6
7
EnvironmentSettings settings = EnvironmentSettings
.newInstance()
.useBlinkPlanner()
.inBatchMode()
.build();

TableEnvironment tEnv = TableEnvironment.create(settings);

流处理

1
2
3
4
5
6
7
EnvironmentSettings settings = EnvironmentSettings
.newInstance()
.useBlinkPlanner()
.inStreamingMode()
.build();

TableEnvironment tEnv = TableEnvironment.create(settings);

也可以改为下面的写法,是完全等效的。

1
2
3
4
5
6
7
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
EnvironmentSettings settings = EnvironmentSettings
.newInstance()
.useBlinkPlanner()
.inStreamingMode()
.build();
StreamTableEnvironment tEnv = StreamTableEnvironment.create(env, settings);

1
2
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);