Task Failure Recovery

When a task failure happens, Flink needs to restart the failed task and other affected tasks to recover the job to a normal state.

Restart strategies and failover strategies control how tasks are restarted. Restart strategies decide whether and when the failed and affected tasks can be restarted. Failover strategies decide which tasks should be restarted to recover the job.

Restart Strategies

The cluster can be started with a default restart strategy, which is always used when no job-specific restart strategy has been defined. If a job is submitted with its own restart strategy, that strategy overrides the cluster’s default setting.

The default restart strategy is set via Flink’s configuration file flink-conf.yaml. The configuration parameter restart-strategy defines which strategy is used. If checkpointing is not enabled, the “no restart” strategy is used. If checkpointing is enabled and no restart strategy has been configured, the fixed-delay strategy is used with Integer.MAX_VALUE restart attempts. See the following list of available restart strategies for the supported values.

Each restart strategy comes with its own set of parameters which control its behaviour. These values are also set in the configuration file. The description of each restart strategy contains more information about the respective configuration values.

Key: restart-strategy
Default: (none)
Type: String
Description: Defines the restart strategy to use in case of job failures. Accepted values are:
  • none, off, disable: No restart strategy.
  • fixeddelay, fixed-delay: Fixed delay restart strategy. See Fixed Delay Restart Strategy below.
  • failurerate, failure-rate: Failure rate restart strategy. See Failure Rate Restart Strategy below.
  • exponentialdelay, exponential-delay: Exponential delay restart strategy. See Exponential Delay Restart Strategy below.
If checkpointing is disabled, the default value is none. If checkpointing is enabled, the default value is fixed-delay with Integer.MAX_VALUE restart attempts and a 1 s delay.
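The precedence described above (job-specific strategy first, then the cluster’s restart-strategy, then a default that depends on checkpointing) can be summarized in a few lines. The Python sketch below is not Flink code: the names ALIASES, Strategy, normalize and resolve are hypothetical, and the sketch only models the selection logic and the accepted alias values listed in the table.

```python
from dataclasses import dataclass
from typing import Optional

# Hypothetical alias table built from the accepted values of restart-strategy.
ALIASES = {
    "none": "none", "off": "none", "disable": "none",
    "fixeddelay": "fixed-delay", "fixed-delay": "fixed-delay",
    "failurerate": "failure-rate", "failure-rate": "failure-rate",
    "exponentialdelay": "exponential-delay",
    "exponential-delay": "exponential-delay",
}

INTEGER_MAX_VALUE = 2**31 - 1  # Java's Integer.MAX_VALUE


@dataclass(frozen=True)
class Strategy:
    """Hypothetical record of the chosen strategy and its default parameters."""
    name: str
    attempts: Optional[int] = None
    delay_s: Optional[float] = None


def normalize(value: str) -> str:
    """Map an accepted alias such as 'off' or 'fixeddelay' to a canonical name."""
    if value not in ALIASES:
        raise ValueError(f"unsupported restart-strategy value: {value!r}")
    return ALIASES[value]


def resolve(job_strategy: Optional[str],
            cluster_strategy: Optional[str],
            checkpointing_enabled: bool) -> Strategy:
    """Pick the effective strategy: job setting, then cluster setting, then default."""
    if job_strategy is not None:
        return Strategy(normalize(job_strategy))
    if cluster_strategy is not None:
        return Strategy(normalize(cluster_strategy))
    if checkpointing_enabled:
        # Default with checkpointing: fixed-delay, Integer.MAX_VALUE attempts, 1 s delay.
        return Strategy("fixed-delay", attempts=INTEGER_MAX_VALUE, delay_s=1.0)
    return Strategy("none")


print(resolve(None, None, True))
```

For example, a job that sets "off" while the cluster configures fixed-delay ends up with no restarts, because the job-specific setting wins.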

In addition to the cluster-wide default, each Flink job can define its own restart strategy. This restart strategy is set programmatically by calling the setRestartStrategy method on the StreamExecutionEnvironment.

The following example shows how to set a fixed delay restart strategy for a job. If a failure occurs, the system tries to restart the job up to 3 times and waits 10 seconds between successive restart attempts.

Java

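    import org.apache.flink.api.common.restartstrategy.RestartStrategies;
    import org.apache.flink.api.common.time.Time;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import java.util.concurrent.TimeUnit;

    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    env.setRestartStrategy(RestartStrategies.fixedDelayRestart(
        3, // number of restart attempts
        Time.of(10, TimeUnit.SECONDS) // delay
    ));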

Scala

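    import java.util.concurrent.TimeUnit
    import org.apache.flink.api.common.restartstrategy.RestartStrategies
    import org.apache.flink.api.common.time.Time
    import org.apache.flink.streaming.api.scala._

    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setRestartStrategy(RestartStrategies.fixedDelayRestart(
      3, // number of restart attempts
      Time.of(10, TimeUnit.SECONDS) // delay
    ))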

Python

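    from pyflink.common import RestartStrategies
    from pyflink.datastream import StreamExecutionEnvironment

    env = StreamExecutionEnvironment.get_execution_environment()
    env.set_restart_strategy(RestartStrategies.fixed_delay_restart(
        3,  # number of restart attempts
        10000  # delay between attempts (milliseconds)
    ))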

The following sections describe restart strategy specific configuration options.

Fixed Delay Restart Strategy

The fixed delay restart strategy tries to restart the job a given number of times. If the maximum number of attempts is exceeded, the job eventually fails. Between two consecutive restart attempts, the restart strategy waits a fixed amount of time.

To use this strategy as the cluster default, set the following configuration parameter in flink-conf.yaml:

    restart-strategy: fixed-delay

Key: restart-strategy.fixed-delay.attempts
Default: 1
Type: Integer
Description: The number of times that Flink retries the execution before the job is declared as failed if restart-strategy has been set to fixed-delay.

Key: restart-strategy.fixed-delay.delay
Default: 1 s
Type: Duration
Description: Delay between two consecutive restart attempts if restart-strategy has been set to fixed-delay. Delaying the retries can be helpful when the program interacts with external systems where, for example, connections or pending transactions should reach a timeout before re-execution is attempted. It can be specified using notation: “1 min”, “20 s”

For example:

    restart-strategy.fixed-delay.attempts: 3
    restart-strategy.fixed-delay.delay: 10 s
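To make the counting concrete, the following Python sketch (a hypothetical helper, not part of Flink) lists the wait before each restart for a sequence of failures under the fixed delay strategy, using None to mark the failure at which the job is declared failed. It assumes that each failure consumes one restart attempt, so with attempts set to 3 the fourth failure fails the job.

```python
from typing import List, Optional


def fixed_delay_schedule(attempts: int, delay_s: float,
                         failures: int) -> List[Optional[float]]:
    """Return the delay before each restart; None means the job is declared failed."""
    decisions: List[Optional[float]] = []
    for failure_number in range(1, failures + 1):
        if failure_number <= attempts:
            # Still within the allowed number of attempts: wait, then restart.
            decisions.append(delay_s)
        else:
            # Attempts exhausted: no further restarts are made.
            decisions.append(None)
            break
    return decisions


# With the example configuration above: 3 attempts, 10 s delay, 5 failures.
print(fixed_delay_schedule(3, 10.0, 5))  # [10.0, 10.0, 10.0, None]
```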

The fixed delay restart strategy can also be set programmatically:

Java

    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    env.setRestartStrategy(RestartStrategies.fixedDelayRestart(
        3, // number of restart attempts
        Time.of(10, TimeUnit.SECONDS) // delay
    ));

Scala

    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setRestartStrategy(RestartStrategies.fixedDelayRestart(
      3, // number of restart attempts
      Time.of(10, TimeUnit.SECONDS) // delay
    ))

Python

    env = StreamExecutionEnvironment.get_execution_environment()
    env.set_restart_strategy(RestartStrategies.fixed_delay_restart(
        3,  # number of restart attempts
        10000  # delay between attempts (milliseconds)
    ))

Exponential Delay Restart Strategy

The exponential delay restart strategy restarts the job indefinitely, so the job never fails. The delay between two consecutive restart attempts grows exponentially until it reaches the configured maximum; after that, the delay stays at the maximum.

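When the job runs without failure for long enough, the delay resets to its initial value; this threshold is configurable. To use this strategy as the cluster default, set the following configuration parameter in flink-conf.yaml: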

    restart-strategy: exponential-delay

Key: restart-strategy.exponential-delay.backoff-multiplier
Default: 2.0
Type: Double
Description: The backoff value is multiplied by this value after every failure, until max backoff is reached, if restart-strategy has been set to exponential-delay.

Key: restart-strategy.exponential-delay.initial-backoff
Default: 1 s
Type: Duration
Description: Starting duration between restarts if restart-strategy has been set to exponential-delay. It can be specified using notation: “1 min”, “20 s”

Key: restart-strategy.exponential-delay.jitter-factor
Default: 0.1
Type: Double
Description: Jitter specified as a portion of the backoff if restart-strategy has been set to exponential-delay. It represents how large a random value may be added to or subtracted from the backoff. This is useful when you want to avoid restarting multiple jobs at the same time.

Key: restart-strategy.exponential-delay.max-backoff
Default: 5 min
Type: Duration
Description: The highest possible duration between restarts if restart-strategy has been set to exponential-delay. It can be specified using notation: “1 min”, “20 s”

Key: restart-strategy.exponential-delay.reset-backoff-threshold
Default: 1 h
Type: Duration
Description: Threshold at which the backoff is reset to its initial value if restart-strategy has been set to exponential-delay. It specifies how long the job must be running without failure to reset the exponentially increasing backoff to its initial value. It can be specified using notation: “1 min”, “20 s”

For example:

    restart-strategy.exponential-delay.initial-backoff: 10 s
    restart-strategy.exponential-delay.max-backoff: 2 min
    restart-strategy.exponential-delay.backoff-multiplier: 2.0
    restart-strategy.exponential-delay.reset-backoff-threshold: 10 min
    restart-strategy.exponential-delay.jitter-factor: 0.1
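The interaction of these five options can be illustrated with a small simulation. The Python sketch below is hypothetical and simplified, not Flink’s implementation: it treats the running time as the time between the previous restart and the next failure, draws jitter uniformly from ±jitter-factor × backoff, and clamps the result to [0, max-backoff]; Flink may handle these details differently. With the example values above and jitter disabled, the delays grow 10 s, 20 s, 40 s, 80 s and then stay at 2 min until the job has run for 10 min without failure.

```python
import random
from typing import List, Optional


def exponential_delays(failure_times_s: List[float],
                       initial_s: float = 10.0,
                       max_s: float = 120.0,
                       multiplier: float = 2.0,
                       reset_threshold_s: float = 600.0,
                       jitter_factor: float = 0.0,
                       rng: Optional[random.Random] = None) -> List[float]:
    """Return the delay chosen before each restart, given failure timestamps (seconds)."""
    rng = rng or random.Random()
    delays: List[float] = []
    backoff = initial_s
    last_restart: Optional[float] = None
    for t in failure_times_s:
        if last_restart is not None:
            if t - last_restart >= reset_threshold_s:
                # The job ran long enough without failure: reset to the initial backoff.
                backoff = initial_s
            else:
                # Otherwise grow the backoff, but never beyond the maximum.
                backoff = min(backoff * multiplier, max_s)
        # Random jitter of up to +/- jitter_factor * backoff (assumed uniform).
        jitter = rng.uniform(-jitter_factor, jitter_factor) * backoff
        delay = min(max(backoff + jitter, 0.0), max_s)
        delays.append(delay)
        last_restart = t + delay  # the job is restarted after waiting `delay`
    return delays


print(exponential_delays([0, 30, 80, 200, 400, 600, 2000]))
# [10.0, 20.0, 40.0, 80.0, 120.0, 120.0, 10.0]
```

The last failure arrives more than 10 min after the previous restart, so the sketch starts again from the initial 10 s backoff.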

The exponential delay restart strategy can also be set programmatically:

Java

    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    env.setRestartStrategy(RestartStrategies.exponentialDelayRestart(
        Time.milliseconds(1), // initial delay between restarts
        Time.milliseconds(1000), // maximum delay between restarts
        1.1, // exponential multiplier
        Time.milliseconds(2000), // threshold duration to reset delay to its initial value
        0.1 // jitter
    ));

Scala

    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setRestartStrategy(RestartStrategies.exponentialDelayRestart(
      Time.of(1, TimeUnit.MILLISECONDS), // initial delay between restarts
      Time.of(1000, TimeUnit.MILLISECONDS), // maximum delay between restarts
      1.1, // exponential multiplier
      Time.of(2, TimeUnit.SECONDS), // threshold duration to reset delay to its initial value
      0.1 // jitter
    ))

Python

The exponential delay restart strategy is not yet supported in the Python API.

Failure Rate Restart Strategy

The failure rate restart strategy restarts the job after a failure, but once the failure rate (failures per time interval) is exceeded, the job eventually fails. Between two consecutive restart attempts, the restart strategy waits a fixed amount of time.

To use this strategy as the cluster default, set the following configuration parameter in flink-conf.yaml:

    restart-strategy: failure-rate

Key: restart-strategy.failure-rate.delay
Default: 1 s
Type: Duration
Description: Delay between two consecutive restart attempts if restart-strategy has been set to failure-rate. It can be specified using notation: “1 min”, “20 s”

Key: restart-strategy.failure-rate.failure-rate-interval
Default: 1 min
Type: Duration
Description: Time interval for measuring failure rate if restart-strategy has been set to failure-rate. It can be specified using notation: “1 min”, “20 s”

Key: restart-strategy.failure-rate.max-failures-per-interval
Default: 1
Type: Integer
Description: Maximum number of restarts in given time interval before failing a job if restart-strategy has been set to failure-rate.

For example:

    restart-strategy.failure-rate.max-failures-per-interval: 3
    restart-strategy.failure-rate.failure-rate-interval: 5 min
    restart-strategy.failure-rate.delay: 10 s
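A sliding-window check conveys the idea behind this strategy. The Python sketch below is a hypothetical simplification, not Flink’s implementation: it assumes the job fails as soon as more than max-failures-per-interval failures fall within the trailing failure-rate-interval, and its handling of failures that lie exactly on the window boundary is an arbitrary choice that may not match Flink.

```python
from collections import deque
from typing import Deque, List, Optional


def failure_rate_decisions(failure_times_s: List[float], max_failures: int,
                           interval_s: float, delay_s: float) -> List[Optional[float]]:
    """Return the delay before each restart; None means the failure rate was exceeded."""
    window: Deque[float] = deque()
    decisions: List[Optional[float]] = []
    for t in failure_times_s:
        window.append(t)
        # Forget failures that are older than the measuring interval.
        while t - window[0] > interval_s:
            window.popleft()
        if len(window) > max_failures:
            decisions.append(None)  # too many failures in the interval: job fails
            break
        decisions.append(delay_s)
    return decisions


# Example configuration above: 3 failures per 5 min, 10 s delay.
print(failure_rate_decisions([0, 60, 120, 180], 3, 300.0, 10.0))
# [10.0, 10.0, 10.0, None]
print(failure_rate_decisions([0, 60, 120, 400, 460], 3, 300.0, 10.0))
# [10.0, 10.0, 10.0, 10.0, 10.0]
```

In the second run, the early failures have slid out of the 5 min window by the time the later ones occur, so the job keeps restarting.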

The failure rate restart strategy can also be set programmatically:

Java

    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    env.setRestartStrategy(RestartStrategies.failureRateRestart(
        3, // max failures per interval
        Time.of(5, TimeUnit.MINUTES), // time interval for measuring failure rate
        Time.of(10, TimeUnit.SECONDS) // delay
    ));

Scala

    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setRestartStrategy(RestartStrategies.failureRateRestart(
      3, // max failures per interval
      Time.of(5, TimeUnit.MINUTES), // time interval for measuring failure rate
      Time.of(10, TimeUnit.SECONDS) // delay
    ))

Python

    env = StreamExecutionEnvironment.get_execution_environment()
    env.set_restart_strategy(RestartStrategies.failure_rate_restart(
        3,  # max failures per interval
        300000,  # interval for measuring failure rate (milliseconds)
        10000  # delay between attempts (milliseconds)
    ))

No Restart Strategy

The job fails immediately and no restart is attempted.

    restart-strategy: none

The no restart strategy can also be set programmatically:

Java

    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    env.setRestartStrategy(RestartStrategies.noRestart());

Scala

    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setRestartStrategy(RestartStrategies.noRestart())

Python

    env = StreamExecutionEnvironment.get_execution_environment()
    env.set_restart_strategy(RestartStrategies.no_restart())

Fallback Restart Strategy

The restart strategy defined for the cluster is used. This is helpful for streaming programs that enable checkpointing. By default, a fixed delay restart strategy is chosen if no other restart strategy is defined.

Failover Strategies

Flink supports different failover strategies which can be configured via the configuration parameter jobmanager.execution.failover-strategy in Flink’s configuration file flink-conf.yaml.

Failover Strategy         | Value for jobmanager.execution.failover-strategy
Restart all               | full
Restart pipelined region  | region

Restart All Failover Strategy

This strategy restarts all tasks in the job to recover from a task failure.

Restart Pipelined Region Failover Strategy

This strategy groups tasks into disjoint regions. When a task failure is detected, this strategy computes the smallest set of regions that must be restarted to recover from the failure. For some jobs, this can result in fewer tasks being restarted than with the Restart All Failover Strategy.

A region is a set of tasks that communicate via pipelined data exchanges. Batch data exchanges therefore mark the boundaries of a region.

  • All data exchanges in a DataStream job or Streaming Table/SQL job are pipelined.
  • All data exchanges in a Batch Table/SQL job are batched by default.
  • The data exchange types in a DataSet job are determined by the ExecutionMode which can be set through ExecutionConfig.
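Because only pipelined exchanges connect tasks within a region, regions are the connected components of the task graph once batch exchanges are ignored. The Python sketch below computes them with a union-find structure; the task names and the "pipelined"/"batch" labels are hypothetical, and the sketch does not reproduce Flink’s own region-building code.

```python
from typing import Dict, FrozenSet, List, Set, Tuple


def build_regions(tasks: List[str],
                  edges: List[Tuple[str, str, str]]) -> Set[FrozenSet[str]]:
    """Group tasks into regions; edges are (producer, consumer, 'pipelined' | 'batch')."""
    parent: Dict[str, str] = {t: t for t in tasks}

    def find(x: str) -> str:
        # Follow parent links to the representative, compressing the path as we go.
        while parent[x] != x:
            parent[x] = parent[parent[x]]
            x = parent[x]
        return x

    for producer, consumer, exchange in edges:
        if exchange == "pipelined":
            # Pipelined exchanges merge both endpoints into one region.
            parent[find(producer)] = find(consumer)
        # Batch exchanges are region boundaries, so they are skipped.

    groups: Dict[str, Set[str]] = {}
    for t in tasks:
        groups.setdefault(find(t), set()).add(t)
    return {frozenset(g) for g in groups.values()}


regions = build_regions(
    ["source", "map", "aggregate", "sink"],
    [("source", "map", "pipelined"),
     ("map", "aggregate", "batch"),
     ("aggregate", "sink", "pipelined")])
print(sorted(sorted(r) for r in regions))
# [['aggregate', 'sink'], ['map', 'source']]
```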

The regions to restart are determined as follows:

  1. The region containing the failed task will be restarted.
  2. If a result partition is not available while it is required by a region that will be restarted, the region producing the result partition will be restarted as well.
  3. If a region is to be restarted, all of its consumer regions will also be restarted. This guarantees data consistency, because nondeterministic processing or partitioning can produce different result partitions when a region is re-executed.
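Taken together, the three rules amount to a graph traversal over regions. The Python sketch below applies them with a work queue; the region names, the list of (producer region, consumer region) pairs and the set of unavailable result partitions are hypothetical inputs chosen for illustration, and the sketch does not reproduce Flink’s scheduler code.

```python
from collections import defaultdict, deque
from typing import Deque, Dict, List, Set, Tuple


def regions_to_restart(failed_region: str,
                       edges: List[Tuple[str, str]],
                       unavailable: Set[Tuple[str, str]]) -> Set[str]:
    """Apply the three rules; an edge (p, c) means region c consumes a result of p."""
    producers: Dict[str, Set[str]] = defaultdict(set)
    consumers: Dict[str, Set[str]] = defaultdict(set)
    for p, c in edges:
        producers[c].add(p)
        consumers[p].add(c)

    result: Set[str] = set()
    queue: Deque[str] = deque([failed_region])  # rule 1: the failed task's region
    while queue:
        region = queue.popleft()
        if region in result:
            continue
        result.add(region)
        for p in producers[region]:
            if (p, region) in unavailable:
                queue.append(p)  # rule 2: regenerate a missing input partition
        queue.extend(consumers[region])  # rule 3: restart all consumer regions
    return result


edges = [("R1", "R2"), ("R4", "R2"), ("R2", "R3")]
print(sorted(regions_to_restart("R2", edges, set())))           # ['R2', 'R3']
print(sorted(regions_to_restart("R2", edges, {("R1", "R2")})))  # ['R1', 'R2', 'R3']
```

Region R4 is never restarted here because its result partition stays available, which is exactly the saving the region strategy offers over restarting all tasks.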