State Processor API
Apache Flink’s State Processor API provides powerful functionality for reading, writing, and modifying savepoints and checkpoints using Flink’s batch DataSet API. Due to the interoperability of DataSet and Table API, you can even use relational Table API or SQL queries to analyze and process state data.
For example, you can take a savepoint of a running stream processing application and analyze it with a DataSet batch program to verify that the application behaves correctly. Or you can read a batch of data from any store, preprocess it, and write the result to a savepoint that you use to bootstrap the state of a streaming application. It is also possible to fix inconsistent state entries. Finally, the State Processor API opens up many ways to evolve a stateful application that were previously blocked by parameter and design choices that could not be changed after the application was started without losing all of its state. For example, you can now arbitrarily modify the data types of states, adjust the maximum parallelism of operators, split or merge operator state, re-assign operator UIDs, and so on.
To get started with the State Processor API, include the following library in your application.
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-state-processor-api_2.11</artifactId>
<version>1.12.0</version>
<scope>provided</scope>
</dependency>
Mapping Application State to DataSets
The State Processor API maps the state of a streaming application to one or more data sets that can be processed separately. In order to be able to use the API, you need to understand how this mapping works.
But let us first have a look at what a stateful Flink job looks like. A Flink job is composed of operators; typically one or more source operators, a few operators for the actual processing, and one or more sink operators. Each operator runs in parallel in one or more tasks and can work with different types of state. An operator can have zero, one, or more “operator states” which are organized as lists that are scoped to the operator’s tasks. If the operator is applied on a keyed stream, it can also have zero, one, or more “keyed states” which are scoped to a key that is extracted from each processed record. You can think of keyed state as a distributed key-value map.
The following figure shows the application “MyApp” which consists of three operators called “Src”, “Proc”, and “Snk”. Src has one operator state (os1), Proc has one operator state (os2) and two keyed states (ks1, ks2) and Snk is stateless.
A savepoint or checkpoint of MyApp consists of the data of all states, organized in a way that the states of each task can be restored. When processing the data of a savepoint (or checkpoint) with a batch job, we need a mental model that maps the data of the individual tasks’ states into data sets or tables. In fact, we can think of a savepoint as a database. Every operator (identified by its UID) represents a namespace. Each operator state of an operator is mapped to a dedicated table in the namespace with a single column that holds the state’s data of all tasks. All keyed states of an operator are mapped to a single table consisting of a column for the key, and one column for each keyed state. The following figure shows how a savepoint of MyApp is mapped to a database.
The figure shows how the values of Src’s operator state are mapped to a table with one column and five rows, one row for each of the list entries across all parallel tasks of Src. Operator state os2 of the operator “Proc” is similarly mapped to an individual table. The keyed states ks1 and ks2 are combined into a single table with three columns: one for the key, one for ks1, and one for ks2. The keyed table holds one row for each distinct key of both keyed states. Since the operator “Snk” does not have any state, its namespace is empty.
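To make the mapping concrete, here is a minimal sketch in plain Java (no Flink dependency) that models a savepoint as a set of namespaces keyed by operator UID. The class SavepointModel and its methods are hypothetical and exist only for illustration, and the state values are invented because the figure's data is not reproduced here. Each operator state becomes a single-column table that concatenates the entries of all parallel tasks, and all keyed states of an operator share one table with one row per distinct key.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Toy model of "a savepoint is a database"; it does not use any Flink API.
public class SavepointModel {

    // One namespace per operator, identified by its UID.
    static final class Namespace {
        // Operator state name -> single-column table holding the entries of all tasks.
        final Map<String, List<Object>> operatorStateTables = new LinkedHashMap<>();
        // Keyed table: key -> (keyed state name -> value), one row per distinct key.
        final Map<Object, Map<String, Object>> keyedTable = new LinkedHashMap<>();

        // Appends the list entries that one parallel task holds for an operator state.
        void addTaskOperatorState(String stateName, List<?> taskEntries) {
            operatorStateTables.computeIfAbsent(stateName, n -> new ArrayList<>()).addAll(taskEntries);
        }

        // Fills one keyed-state column of the row for the given key.
        void putKeyedState(Object key, String stateName, Object value) {
            keyedTable.computeIfAbsent(key, k -> new LinkedHashMap<>()).put(stateName, value);
        }
    }

    final Map<String, Namespace> namespaces = new LinkedHashMap<>();

    // Returns the namespace of an operator, creating an empty one on first access.
    Namespace operator(String uid) {
        return namespaces.computeIfAbsent(uid, u -> new Namespace());
    }

    public static void main(String[] args) {
        SavepointModel myApp = new SavepointModel();
        // Src: operator state os1, spread over two parallel tasks (invented values).
        myApp.operator("Src").addTaskOperatorState("os1", Arrays.asList(1, 2, 3));
        myApp.operator("Src").addTaskOperatorState("os1", Arrays.asList(4, 5));
        // Proc: operator state os2 plus keyed states ks1 and ks2.
        myApp.operator("Proc").addTaskOperatorState("os2", Arrays.asList("a", "b"));
        myApp.operator("Proc").putKeyedState("k1", "ks1", 10);
        myApp.operator("Proc").putKeyedState("k1", "ks2", 20);
        myApp.operator("Proc").putKeyedState("k2", "ks1", 30);
        // Snk: stateless, so its namespace stays empty.
        myApp.operator("Snk");

        System.out.println(myApp.operator("Src").operatorStateTables); // {os1=[1, 2, 3, 4, 5]}
        System.out.println(myApp.operator("Proc").keyedTable); // {k1={ks1=10, ks2=20}, k2={ks1=30}}
    }
}
```

A key that only appears in ks1 (k2 above) simply has no ks2 entry in its row, which matches the rule that the keyed table holds one row per distinct key across both keyed states.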
Reading State
Reading state begins by specifying the path to a valid savepoint or checkpoint along with the StateBackend that should be used to restore the data. The compatibility guarantees for restoring state are identical to those when restoring a DataStream application.
ExecutionEnvironment bEnv = ExecutionEnvironment.getExecutionEnvironment();
ExistingSavepoint savepoint = Savepoint.load(bEnv, "hdfs://path/", new MemoryStateBackend());
val bEnv = ExecutionEnvironment.getExecutionEnvironment
val savepoint = Savepoint.load(bEnv, "hdfs://path/", new MemoryStateBackend)
Operator State
Operator state is any non-keyed state in Flink. This includes, but is not limited to, any use of CheckpointedFunction or BroadcastState within an application. When reading operator state, users specify the operator uid, the state name, and the type information.
Operator List State
Operator state stored in a CheckpointedFunction using getListState can be read using ExistingSavepoint#readListState. The state name and type information should match those used to define the ListStateDescriptor that declared this state in the DataStream application.
DataSet<Integer> listState = savepoint.readListState(
"my-uid",
"list-state",
Types.INT);
val listState = savepoint.readListState(
"my-uid",
"list-state",
Types.INT)
Operator Union List State
Operator state stored in a CheckpointedFunction using getUnionListState can be read using ExistingSavepoint#readUnionState. The state name and type information should match those used to define the ListStateDescriptor that declared this state in the DataStream application. The framework will return a single copy of the state, equivalent to restoring a DataStream with parallelism 1.
DataSet<Integer> listState = savepoint.readUnionState(
"my-uid",
"union-state",
Types.INT);
val listState = savepoint.readUnionState(
"my-uid",
"union-state",
Types.INT)
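The difference between readListState and readUnionState follows from how the two kinds of operator list state are redistributed when a DataStream job is restored. The following sketch is a toy model in plain Java, not Flink code: the class ListStateRedistribution is hypothetical, and the exact assignment of entries to tasks in Flink may differ from the contiguous split used here. Even-split state hands each new task a disjoint slice of all entries, while union state hands every task the complete list; with parallelism 1 both yield exactly one copy of each entry.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Toy model (no Flink API) of how operator list state is redistributed on restore.
public class ListStateRedistribution {

    // Concatenates the entries of all tasks: the logical content of the state.
    static <T> List<T> concat(List<List<T>> perTask) {
        List<T> all = new ArrayList<>();
        for (List<T> task : perTask) {
            all.addAll(task);
        }
        return all;
    }

    // Even-split redistribution (getListState): each new task receives a disjoint slice.
    static <T> List<List<T>> evenSplit(List<List<T>> perTask, int newParallelism) {
        List<T> all = concat(perTask);
        List<List<T>> result = new ArrayList<>();
        int base = all.size() / newParallelism;
        int remainder = all.size() % newParallelism;
        int start = 0;
        for (int i = 0; i < newParallelism; i++) {
            int size = base + (i < remainder ? 1 : 0);
            result.add(new ArrayList<>(all.subList(start, start + size)));
            start += size;
        }
        return result;
    }

    // Union redistribution (getUnionListState): every new task receives the full list.
    static <T> List<List<T>> union(List<List<T>> perTask, int newParallelism) {
        List<T> all = concat(perTask);
        List<List<T>> result = new ArrayList<>();
        for (int i = 0; i < newParallelism; i++) {
            result.add(new ArrayList<>(all));
        }
        return result;
    }

    public static void main(String[] args) {
        // State written by two parallel tasks.
        List<List<Integer>> snapshot = Arrays.asList(Arrays.asList(1, 2, 3), Arrays.asList(4, 5));
        System.out.println(evenSplit(snapshot, 2)); // [[1, 2, 3], [4, 5]]
        System.out.println(union(snapshot, 2));     // [[1, 2, 3, 4, 5], [1, 2, 3, 4, 5]]
        // With parallelism 1, union state yields one copy of every entry.
        System.out.println(union(snapshot, 1));     // [[1, 2, 3, 4, 5]]
    }
}
```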
Broadcast State
BroadcastState can be read using ExistingSavepoint#readBroadcastState. The state name and type information should match those used to define the MapStateDescriptor that declared this state in the DataStream application. The framework will return a single copy of the state, equivalent to restoring a DataStream with parallelism 1.
DataSet<Tuple2<Integer, Integer>> broadcastState = savepoint.readBroadcastState(
"my-uid",
"broadcast-state",
Types.INT,
Types.INT);
val broadcastState = savepoint.readBroadcastState(
"my-uid",
"broadcast-state",
Types.INT,
Types.INT)
Using Custom Serializers
Each of the operator state readers supports using a custom TypeSerializer if one was used to define the StateDescriptor that wrote out the state.
DataSet<Integer> listState = savepoint.readListState(
"uid",
"list-state",
Types.INT,
new MyCustomIntSerializer());
val listState = savepoint.readListState(
"uid",
"list-state",
Types.INT,
new MyCustomIntSerializer)
Keyed State
Keyed state, or partitioned state, is any state that is partitioned relative to a key. When reading a keyed state, users specify the operator uid and a KeyedStateReaderFunction<KeyType, OutputType>.
The KeyedStateReaderFunction allows users to read arbitrary columns and complex state types such as ListState, MapState, and AggregatingState. This means if an operator contains a stateful process function such as:
public class StatefulFunctionWithTime extends KeyedProcessFunction<Integer, Integer, Void> {
ValueState<Integer> state;
ListState<Long> updateTimes;
@Override
public void open(Configuration parameters) {
ValueStateDescriptor<Integer> stateDescriptor = new ValueStateDescriptor<>("state", Types.INT);
state = getRuntimeContext().getState(stateDescriptor);
ListStateDescriptor<Long> updateDescriptor = new ListStateDescriptor<>("times", Types.LONG);
updateTimes = getRuntimeContext().getListState(updateDescriptor);
}
@Override
public void processElement(Integer value, Context ctx, Collector<Void> out) throws Exception {
state.update(value + 1);
updateTimes.add(System.currentTimeMillis());
}
}
class StatefulFunctionWithTime extends KeyedProcessFunction[Int, Int, Void] {
var state: ValueState[Int] = _
var updateTimes: ListState[Long] = _
@throws[Exception]
override def open(parameters: Configuration): Unit = {
val stateDescriptor = new ValueStateDescriptor("state", createTypeInformation[Int])
state = getRuntimeContext().getState(stateDescriptor)
val updateDescriptor = new ListStateDescriptor("times", createTypeInformation[Long])
updateTimes = getRuntimeContext().getListState(updateDescriptor)
}
@throws[Exception]
override def processElement(value: Int, ctx: KeyedProcessFunction[Int, Int, Void]#Context, out: Collector[Void]): Unit = {
state.update(value + 1)
updateTimes.add(System.currentTimeMillis)
}
}
Then it can be read by defining an output type and a corresponding KeyedStateReaderFunction.
DataSet<KeyedState> keyedState = savepoint.readKeyedState("my-uid", new ReaderFunction());
public class KeyedState {
public int key;
public int value;
public List<Long> times;
}
public class ReaderFunction extends KeyedStateReaderFunction<Integer, KeyedState> {
ValueState<Integer> state;
ListState<Long> updateTimes;
@Override
public void open(Configuration parameters) {
ValueStateDescriptor<Integer> stateDescriptor = new ValueStateDescriptor<>("state", Types.INT);
state = getRuntimeContext().getState(stateDescriptor);
ListStateDescriptor<Long> updateDescriptor = new ListStateDescriptor<>("times", Types.LONG);
updateTimes = getRuntimeContext().getListState(updateDescriptor);
}
@Override
public void readKey(
Integer key,
Context ctx,
Collector<KeyedState> out) throws Exception {
KeyedState data = new KeyedState();
data.key = key;
data.value = state.value();
data.times = StreamSupport
.stream(updateTimes.get().spliterator(), false)
.collect(Collectors.toList());
out.collect(data);
}
}
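import scala.collection.JavaConverters._
case class KeyedState(key: Int, value: Int, times: List[Long])
class ReaderFunction extends KeyedStateReaderFunction[Int, KeyedState] {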
var state: ValueState[Int] = _
var updateTimes: ListState[Long] = _
@throws[Exception]
override def open(parameters: Configuration): Unit = {
val stateDescriptor = new ValueStateDescriptor("state", createTypeInformation[Int])
state = getRuntimeContext().getState(stateDescriptor)
val updateDescriptor = new ListStateDescriptor("times", createTypeInformation[Long])
updateTimes = getRuntimeContext().getListState(updateDescriptor)
}
override def readKey(key: Int, ctx: KeyedStateReaderFunction.Context, out: Collector[KeyedState]): Unit = {
val data = KeyedState(key, state.value, updateTimes.get.asScala.toList)
out.collect(data)
}
}
Along with reading registered state values, each key has access to a Context with metadata such as registered event time and processing time timers.
Note: When using a KeyedStateReaderFunction, all state descriptors must be registered eagerly inside of open. Any attempt to call RuntimeContext#get*State outside of open will result in a RuntimeException.
Window State
The State Processor API supports reading state from a window operator. When reading a window state, users specify the operator uid, window assigner, and aggregation type.
Additionally, a WindowReaderFunction can be specified to enrich each read with additional information, similar to a WindowFunction or ProcessWindowFunction.
Suppose a DataStream application counts the number of clicks per user per minute.
class Click {
public String userId;
public LocalDateTime time;
}
class ClickCounter implements AggregateFunction<Click, Integer, Integer> {
@Override
public Integer createAccumulator() {
return 0;
}
@Override
public Integer add(Click value, Integer accumulator) {
return 1 + accumulator;
}
@Override
public Integer getResult(Integer accumulator) {
return accumulator;
}
@Override
public Integer merge(Integer a, Integer b) {
return a + b;
}
}
DataStream<Click> clicks = . . .
clicks
.keyBy(click -> click.userId)
.window(TumblingEventTimeWindows.of(Time.minutes(1)))
.aggregate(new ClickCounter())
.uid("click-window")
.addSink(new Sink());
import java.lang.{Integer => JInteger}
case class Click(userId: String, time: LocalDateTime)
class ClickCounter extends AggregateFunction[Click, JInteger, JInteger] {
override def createAccumulator(): JInteger = 0
override def add(value: Click, accumulator: JInteger): JInteger = 1 + accumulator
override def getResult(accumulator: JInteger): JInteger = accumulator
override def merge(a: JInteger, b: JInteger): JInteger = a + b
}
val clicks: DataStream[Click] = . . .
clicks
.keyBy(click => click.userId)
.window(TumblingEventTimeWindows.of(Time.minutes(1)))
.aggregate(new ClickCounter())
.uid("click-window")
.addSink(new Sink())
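Conceptually, the window operator's state holds one ClickCounter accumulator per user and per one-minute window. The sketch below models that in plain Java under two assumptions: click times are given as epoch milliseconds instead of LocalDateTime, and the window offset is zero. The helper names (ClickWindowModel, windowStart, countClicks) are hypothetical; the start computation is modeled on the one Flink's tumbling windows use (TimeWindow.getWindowStartWithOffset) with a zero offset, and window ends are exclusive.

```java
import java.util.Map;
import java.util.TreeMap;

// Toy model (no Flink API) of the state kept by the "click-window" operator:
// one ClickCounter accumulator per user and per one-minute tumbling window.
public class ClickWindowModel {

    static final long WINDOW_SIZE_MS = 60_000L;

    // Start of the tumbling window that contains the timestamp (zero offset).
    static long windowStart(long timestampMs) {
        return timestampMs - (timestampMs + WINDOW_SIZE_MS) % WINDOW_SIZE_MS;
    }

    // userId -> (window start -> click count), incremented the way ClickCounter#add does.
    static Map<String, Map<Long, Integer>> countClicks(String[] userIds, long[] timestampsMs) {
        Map<String, Map<Long, Integer>> state = new TreeMap<>();
        for (int i = 0; i < userIds.length; i++) {
            state.computeIfAbsent(userIds[i], u -> new TreeMap<>())
                 .merge(windowStart(timestampsMs[i]), 1, Integer::sum);
        }
        return state;
    }

    public static void main(String[] args) {
        String[] users = {"alice", "alice", "bob", "alice"};
        long[] times = {5_000L, 59_999L, 61_000L, 60_000L};
        // Windows are [start, start + 60000): 59999 still belongs to the first minute.
        System.out.println(countClicks(users, times)); // {alice={0=2, 60000=1}, bob={60000=1}}
    }
}
```

Reading this state back with a WindowReaderFunction would produce one result per entry of the inner maps, since readWindow is invoked once per key and window.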
This state can be read using the code below.
class ClickState {
public String userId;
public int count;
public TimeWindow window;
public Set<Long> triggerTimers;
}
class ClickReader extends WindowReaderFunction<Integer, ClickState, String, TimeWindow> {
@Override
public void readWindow(String key, Context<TimeWindow> context, Iterable<Integer> elements, Collector<ClickState> out) {
ClickState state = new ClickState();
state.userId = key;
state.count = elements.iterator().next();
state.window = context.window();
state.triggerTimers = context.registeredEventTimeTimers();
out.collect(state);
}
}
ExecutionEnvironment batchEnv = ExecutionEnvironment.getExecutionEnvironment();
ExistingSavepoint savepoint = Savepoint.load(batchEnv, "hdfs://checkpoint-dir", new MemoryStateBackend());
savepoint
.window(TumblingEventTimeWindows.of(Time.minutes(1)))
.aggregate("click-window", new ClickCounter(), new ClickReader(), Types.STRING, Types.INT, TypeInformation.of(ClickState.class))
.print();
import java.lang.{Integer => JInteger, Long => JLong}
import java.util.{Set => JSet}
case class ClickState(userId: String, count: JInteger, window: TimeWindow, triggerTimers: JSet[JLong])
class ClickReader extends WindowReaderFunction[JInteger, ClickState, String, TimeWindow] {
override def readWindow(
key: String,
context: WindowReaderFunction.Context[TimeWindow],
elements: java.lang.Iterable[JInteger],
out: Collector[ClickState]): Unit = {
val state = ClickState(
userId = key,
count = elements.iterator().next(),
window = context.window(),
triggerTimers = context.registeredEventTimeTimers())
out.collect(state)
}
}
val batchEnv = ExecutionEnvironment.getExecutionEnvironment
val savepoint = Savepoint.load(batchEnv, "hdfs://checkpoint-dir", new MemoryStateBackend())
savepoint
.window(TumblingEventTimeWindows.of(Time.minutes(1)))
.aggregate("click-window", new ClickCounter(), new ClickReader(), Types.STRING, Types.INT, TypeInformation.of(classOf[ClickState]))
.print()
Additionally, trigger state (from CountTriggers or custom triggers) can be read using the method Context#triggerState inside the WindowReaderFunction.
Writing New Savepoints
Savepoints may also be written, which allows use cases such as bootstrapping state based on historical data. Each savepoint is made up of one or more BootstrapTransformations (explained below), each of which defines the state for an individual operator.
int maxParallelism = 128;
Savepoint
.create(new MemoryStateBackend(), maxParallelism)
.withOperator("uid1", transformation1)
.withOperator("uid2", transformation2)
.write(savepointPath);
val maxParallelism = 128
Savepoint
.create(new MemoryStateBackend(), maxParallelism)
.withOperator("uid1", transformation1)
.withOperator("uid2", transformation2)
.write(savepointPath)
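The maxParallelism passed to Savepoint.create determines how many key groups keyed state is divided into, and a job restored from the savepoint cannot run an operator with a parallelism above that value. The sketch below is modeled on the key-group range arithmetic of Flink's KeyGroupRangeAssignment, but it is a standalone illustration: the class KeyGroupModel is hypothetical and the hashing of keys into key groups is left out.

```java
// Toy model of key-group arithmetic; no Flink dependency.
public class KeyGroupModel {

    // Inclusive [start, end] range of key groups owned by one parallel subtask.
    static int[] keyGroupRange(int maxParallelism, int parallelism, int operatorIndex) {
        if (parallelism < 1 || parallelism > maxParallelism) {
            throw new IllegalArgumentException("parallelism must be in [1, maxParallelism]");
        }
        int start = (operatorIndex * maxParallelism + parallelism - 1) / parallelism;
        int end = ((operatorIndex + 1) * maxParallelism - 1) / parallelism;
        return new int[] {start, end};
    }

    // Index of the subtask that owns a given key group.
    static int operatorIndexForKeyGroup(int maxParallelism, int parallelism, int keyGroup) {
        return keyGroup * parallelism / maxParallelism;
    }

    public static void main(String[] args) {
        int maxParallelism = 128; // same value as in the example above
        int parallelism = 3;
        for (int i = 0; i < parallelism; i++) {
            int[] range = keyGroupRange(maxParallelism, parallelism, i);
            System.out.println("subtask " + i + " -> key groups " + range[0] + ".." + range[1]);
        }
        // subtask 0 -> key groups 0..42
        // subtask 1 -> key groups 43..85
        // subtask 2 -> key groups 86..127
    }
}
```

Because the number of key groups is fixed when the savepoint is written, rescaling only moves whole key groups between subtasks.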
The UIDs associated with each operator must match one to one with the UIDs assigned to the operators in your DataStream application; these are how Flink knows what state maps to which operator.
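Before restoring a written savepoint, it can help to compare its UIDs with those of the target job. The sketch below is a hypothetical pre-flight check in plain Java, not part of the State Processor API, and the UIDs in it are invented. It assumes Flink's default restore behavior: state whose UID matches no operator makes the restore fail unless the job is started with --allowNonRestoredState, and operators without matching state start with empty state.

```java
import java.util.Arrays;
import java.util.Set;
import java.util.TreeSet;

// Hypothetical pre-flight check (not a Flink API) comparing savepoint and job UIDs.
public class UidCheck {

    // UIDs with state in the savepoint but no matching operator in the job.
    // A default restore rejects these unless --allowNonRestoredState is set.
    static Set<String> unmatchedState(Set<String> savepointUids, Set<String> jobUids) {
        Set<String> result = new TreeSet<>(savepointUids);
        result.removeAll(jobUids);
        return result;
    }

    // UIDs of job operators that find no state in the savepoint and start empty.
    static Set<String> startsEmpty(Set<String> savepointUids, Set<String> jobUids) {
        Set<String> result = new TreeSet<>(jobUids);
        result.removeAll(savepointUids);
        return result;
    }

    public static void main(String[] args) {
        Set<String> savepoint = new TreeSet<>(Arrays.asList("uid1", "uid2", "old-uid"));
        Set<String> job = new TreeSet<>(Arrays.asList("uid1", "uid2", "sink"));
        System.out.println(unmatchedState(savepoint, job)); // [old-uid]
        System.out.println(startsEmpty(savepoint, job));    // [sink]
    }
}
```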
Operator State
Simple operator state, using CheckpointedFunction, can be created using the StateBootstrapFunction.
public class SimpleBootstrapFunction extends StateBootstrapFunction<Integer> {
private ListState<Integer> state;
@Override
public void processElement(Integer value, Context ctx) throws Exception {
state.add(value);
}
@Override
public void snapshotState(FunctionSnapshotContext context) throws Exception {
}
@Override
public void initializeState(FunctionInitializationContext context) throws Exception {
state = context.getOperatorState().getListState(new ListStateDescriptor<>("state", Types.INT));
}
}
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
DataSet<Integer> data = env.fromElements(1, 2, 3);
BootstrapTransformation<Integer> transformation = OperatorTransformation
.bootstrapWith(data)
.transform(new SimpleBootstrapFunction());
class SimpleBootstrapFunction extends StateBootstrapFunction[Integer] {
var state: ListState[Integer] = _
@throws[Exception]
override def processElement(value: Integer, ctx: Context): Unit = {
state.add(value)
}
@throws[Exception]
override def snapshotState(context: FunctionSnapshotContext): Unit = {
}
@throws[Exception]
override def initializeState(context: FunctionInitializationContext): Unit = {
state = context.getOperatorState().getListState(new ListStateDescriptor("state", Types.INT))
}
}
val env = ExecutionEnvironment.getExecutionEnvironment
val data = env.fromElements(1, 2, 3)
val transformation = OperatorTransformation
.bootstrapWith(data)
.transform(new SimpleBootstrapFunction)
Broadcast State
BroadcastState can be written using a BroadcastStateBootstrapFunction. Similar to broadcast state in the DataStream API, the full state must fit in memory.
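public class CurrencyRate {
public String currency;
public Double rate;
// A public no-argument constructor keeps CurrencyRate a valid Flink POJO.
public CurrencyRate() {}
public CurrencyRate(String currency, Double rate) {
this.currency = currency;
this.rate = rate;
}
}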
public class CurrencyBootstrapFunction extends BroadcastStateBootstrapFunction<CurrencyRate> {
public static final MapStateDescriptor<String, Double> descriptor =
new MapStateDescriptor<>("currency-rates", Types.STRING, Types.DOUBLE);
@Override
public void processElement(CurrencyRate value, Context ctx) throws Exception {
ctx.getBroadcastState(descriptor).put(value.currency, value.rate);
}
}
DataSet<CurrencyRate> currencyDataSet = bEnv.fromElements(
new CurrencyRate("USD", 1.0), new CurrencyRate("EUR", 1.3));
BootstrapTransformation<CurrencyRate> broadcastTransformation = OperatorTransformation
.bootstrapWith(currencyDataSet)
.transform(new CurrencyBootstrapFunction());
case class CurrencyRate(currency: String, rate: Double)
object CurrencyBootstrapFunction {
val descriptor = new MapStateDescriptor("currency-rates", Types.STRING, Types.DOUBLE)
}
class CurrencyBootstrapFunction extends BroadcastStateBootstrapFunction[CurrencyRate] {
@throws[Exception]
override def processElement(value: CurrencyRate, ctx: Context): Unit = {
ctx.getBroadcastState(CurrencyBootstrapFunction.descriptor).put(value.currency, value.rate)
}
}
val currencyDataSet = bEnv.fromElements(CurrencyRate("USD", 1.0), CurrencyRate("EUR", 1.3))
val broadcastTransformation = OperatorTransformation
.bootstrapWith(currencyDataSet)
.transform(new CurrencyBootstrapFunction)
Keyed State
Keyed state for ProcessFunctions and other RichFunction types can be written using a KeyedStateBootstrapFunction.
public class Account {
public int id;
public double amount;
public long timestamp;
}
public class AccountBootstrapper extends KeyedStateBootstrapFunction<Integer, Account> {
ValueState<Double> state;
@Override
public void open(Configuration parameters) {
ValueStateDescriptor<Double> descriptor = new ValueStateDescriptor<>("total",Types.DOUBLE);
state = getRuntimeContext().getState(descriptor);
}
@Override
public void processElement(Account value, Context ctx) throws Exception {
state.update(value.amount);
}
}
ExecutionEnvironment bEnv = ExecutionEnvironment.getExecutionEnvironment();
DataSet<Account> accountDataSet = bEnv.fromCollection(accounts);
BootstrapTransformation<Account> transformation = OperatorTransformation
.bootstrapWith(accountDataSet)
.keyBy(acc -> acc.id)
.transform(new AccountBootstrapper());
case class Account(id: Int, amount: Double, timestamp: Long)
class AccountBootstrapper extends KeyedStateBootstrapFunction[Integer, Account] {
var state: ValueState[Double] = _
@throws[Exception]
override def open(parameters: Configuration): Unit = {
val descriptor = new ValueStateDescriptor("total",Types.DOUBLE)
state = getRuntimeContext().getState(descriptor)
}
@throws[Exception]
override def processElement(value: Account, ctx: Context): Unit = {
state.update(value.amount)
}
}
val bEnv = ExecutionEnvironment.getExecutionEnvironment()
val accountDataSet = bEnv.fromCollection(accounts)
val transformation = OperatorTransformation
.bootstrapWith(accountDataSet)
.keyBy(acc => acc.id)
.transform(new AccountBootstrapper)
The KeyedStateBootstrapFunction supports setting event time and processing time timers. The timers will not fire inside the bootstrap function and only become active once restored within a DataStream application. If a processing time timer is set but the state is not restored until after that time has passed, the timer will fire immediately upon start.
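The immediate-firing rule can be pictured with a small model. This plain-Java sketch is hypothetical (TimerRestoreModel is not a Flink class) and covers processing-time timers only; event-time timers are left out because they fire when the watermark passes them rather than when the job starts. A timer whose timestamp is at or before the wall-clock time at restore is already due, so it fires as soon as the restored job starts.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Toy model (not Flink code) of processing-time timers written by a bootstrap function.
public class TimerRestoreModel {

    // Timers whose timestamp is at or before the restore time are already due,
    // so they fire as soon as the restored DataStream job starts.
    static List<Long> dueOnRestore(List<Long> timerTimestampsMs, long restoreTimeMs) {
        List<Long> due = new ArrayList<>();
        for (long timestamp : timerTimestampsMs) {
            if (timestamp <= restoreTimeMs) {
                due.add(timestamp);
            }
        }
        return due;
    }

    public static void main(String[] args) {
        List<Long> timers = Arrays.asList(1_000L, 5_000L, 9_000L);
        // Restoring at t = 5000 ms: the first two timers fire immediately,
        // the third waits until the clock reaches 9000 ms.
        System.out.println(dueOnRestore(timers, 5_000L)); // [1000, 5000]
    }
}
```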
Attention: If your bootstrap function creates timers, the state can only be restored using one of the process type functions.
Window State
The State Processor API supports writing state for the window operator. When writing window state, users specify the operator uid, window assigner, evictor, optional trigger, and aggregation type. It is important that the configuration of the bootstrap transformation matches the configuration of the DataStream window.
public class Account {
public int id;
public double amount;
public long timestamp;
}
ExecutionEnvironment bEnv = ExecutionEnvironment.getExecutionEnvironment();
DataSet<Account> accountDataSet = bEnv.fromCollection(accounts);
BootstrapTransformation<Account> transformation = OperatorTransformation
.bootstrapWith(accountDataSet)
// When using event time windows, it is important
// to assign timestamps to each record.
.assignTimestamps(account -> account.timestamp)
.keyBy(acc -> acc.id)
.window(TumblingEventTimeWindows.of(Time.minutes(5)))
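.reduce((left, right) -> {
// Sum the amounts of both records; keeping the later timestamp is an illustrative choice.
Account sum = new Account();
sum.id = left.id;
sum.amount = left.amount + right.amount;
sum.timestamp = Math.max(left.timestamp, right.timestamp);
return sum;
});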
case class Account(id: Int, amount: Double, timestamp: Long)
val bEnv = ExecutionEnvironment.getExecutionEnvironment
val accountDataSet = bEnv.fromCollection(accounts)
val transformation = OperatorTransformation
.bootstrapWith(accountDataSet)
// When using event time windows, it's important
// to assign timestamps to each record.
.assignTimestamps(account => account.timestamp)
.keyBy(acc => acc.id)
.window(TumblingEventTimeWindows.of(Time.minutes(5)))
.reduce((left, right) => Account(left.id, left.amount + right.amount, math.max(left.timestamp, right.timestamp)))
Modifying Savepoints
Besides creating a savepoint from scratch, you can base one on an existing savepoint, for example when bootstrapping a single new operator for an existing job.
Savepoint
.load(bEnv, oldPath, new MemoryStateBackend())
.withOperator("uid", transformation)
.write(newPath);
Savepoint
.load(bEnv, oldPath, new MemoryStateBackend)
.withOperator("uid", transformation)
.write(newPath)
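Conceptually, the result is the old savepoint's operator states plus the state of the newly bootstrapped operator, written to a new path while the original stays untouched. The sketch below models that in plain Java with operator states reduced to string labels; the class ModifySavepointModel, the UIDs src and proc, and the labels are hypothetical. Rejecting a UID that is already present is a choice of this model, motivated by the one-to-one mapping between UIDs and operators.

```java
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.Map;

// Toy model (no Flink API) of deriving a new savepoint from an existing one.
public class ModifySavepointModel {

    // Returns a copy of the existing operator states plus one new operator;
    // the input map (the "old savepoint") is never modified.
    static Map<String, String> withOperator(Map<String, String> existing, String uid, String state) {
        if (existing.containsKey(uid)) {
            throw new IllegalArgumentException("UID already present: " + uid);
        }
        Map<String, String> result = new LinkedHashMap<>(existing);
        result.put(uid, state);
        return Collections.unmodifiableMap(result);
    }

    public static void main(String[] args) {
        Map<String, String> oldSavepoint = new LinkedHashMap<>();
        oldSavepoint.put("src", "offsets");
        oldSavepoint.put("proc", "keyed totals");
        Map<String, String> newSavepoint = withOperator(oldSavepoint, "uid", "bootstrapped state");
        System.out.println(oldSavepoint.keySet()); // [src, proc]
        System.out.println(newSavepoint.keySet()); // [src, proc, uid]
    }
}
```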