in flink-ml-iteration/flink-ml-iteration-common/src/main/java/org/apache/flink/iteration/operator/HeadOperator.java [190:281]
/**
 * Restores the head operator's state and connects it to the iteration's feedback path.
 *
 * <p>In order, this method:
 *
 * <ol>
 *   <li>checks that the parallelism recorded in the {@code "parallelism"} union list state, if
 *       present, equals the current number of parallel subtasks;
 *   <li>restores the operator status from the {@code "status"} list state (the first {@link
 *       HeadOperatorStatus} constant when nothing was stored). It then creates a {@code
 *       RegularHeadOperatorRecordProcessor} when the status is {@code RUNNING}, and a {@code
 *       TerminatingHeadOperatorRecordProcessor} otherwise;
 *   <li>lets the record processor restore itself from the {@code "processorState"} list state,
 *       if present;
 *   <li>creates the {@link Checkpoints} for this operator. It registers them with the {@link
 *       CheckpointsBroker} under this subtask's feedback key;
 *   <li>replays the records cached in every raw operator state partition into the record
 *       processor as feedback elements;
 *   <li>registers a feedback consumer that schedules feedback on the mailbox while the
 *       operator is not {@code TERMINATED}.
 * </ol>
 *
 * <p>Fails through {@code checkState} in two cases: the parallelism changed since the
 * checkpoint, or the restored status ordinal does not name a {@link HeadOperatorStatus}
 * constant.
 *
 * @param context the state initialization context supplying managed and raw operator state
 * @throws FlinkRuntimeException if replaying the cached feedback records fails
 * @throws Exception if the parent initialization or a state access fails
 */
public void initializeState(StateInitializationContext context) throws Exception {
    super.initializeState(context);

    // Reject a restore whose parallelism differs from the one recorded in the checkpoint.
    parallelismState =
            context.getOperatorStateStore()
                    .getUnionListState(
                            new ListStateDescriptor<>("parallelism", IntSerializer.INSTANCE));
    OperatorStateUtils.getUniqueElement(parallelismState, "parallelism")
            .ifPresent(
                    oldParallelism -> {
                        int currentParallelism =
                                getRuntimeContext().getNumberOfParallelSubtasks();
                        checkState(
                                oldParallelism == currentParallelism,
                                "The head operator is recovered with parallelism changed from "
                                        + oldParallelism
                                        + " to "
                                        + currentParallelism);
                    });

    // Initialize the status and the record processor.
    processorContext = new ContextImpl();
    statusState =
            context.getOperatorStateStore()
                    .getListState(new ListStateDescriptor<>("status", Integer.class));
    // The status is persisted as an enum ordinal. Validate it so that an out-of-range value
    // (e.g. state written with a different set of statuses) fails with a descriptive message
    // instead of an ArrayIndexOutOfBoundsException.
    int statusOrdinal = OperatorStateUtils.getUniqueElement(statusState, "status").orElse(0);
    HeadOperatorStatus[] knownStatuses = HeadOperatorStatus.values();
    checkState(
            statusOrdinal >= 0 && statusOrdinal < knownStatuses.length,
            "The head operator is recovered with an unknown status ordinal " + statusOrdinal);
    status = knownStatuses[statusOrdinal];
    if (status == HeadOperatorStatus.RUNNING) {
        recordProcessor = new RegularHeadOperatorRecordProcessor(processorContext);
    } else {
        recordProcessor = new TerminatingHeadOperatorRecordProcessor(processorContext);
    }

    // Recover the processor state if it exists. The raw operator state inputs are handed to
    // the processor here and are iterated again below to replay the cached records.
    processorState =
            context.getOperatorStateStore()
                    .getListState(
                            new ListStateDescriptor<>(
                                    "processorState", HeadOperatorState.TYPE_INFO));
    OperatorStateUtils.getUniqueElement(processorState, "processorState")
            .ifPresent(
                    headOperatorState ->
                            recordProcessor.initializeState(
                                    headOperatorState, context.getRawOperatorStateInputs()));

    checkpointAligner = new HeadOperatorCheckpointAligner();

    // Create the checkpoints and publish them through the CheckpointsBroker under this
    // subtask's feedback key (subtask index plus attempt number).
    Path dataCachePath =
            OperatorUtils.getDataCachePath(
                    getRuntimeContext().getTaskManagerRuntimeInfo().getConfiguration(),
                    getContainingTask()
                            .getEnvironment()
                            .getIOManager()
                            .getSpillingDirectoriesPaths());
    this.checkpoints =
            new Checkpoints<>(
                    config.getTypeSerializerOut(getClass().getClassLoader()),
                    dataCachePath.getFileSystem(),
                    OperatorUtils.createDataCacheFileGenerator(
                            dataCachePath, "header-cp", getOperatorConfig().getOperatorID()));
    CheckpointsBroker.get()
            .setCheckpoints(
                    OperatorUtils.<IterationRecord<?>>createFeedbackKey(
                                    iterationId, feedbackIndex)
                            .withSubTaskIndex(
                                    getRuntimeContext().getIndexOfThisSubtask(),
                                    getRuntimeContext().getAttemptNumber()),
                    checkpoints);

    // Replay the records cached in every raw operator state partition into the record
    // processor as feedback elements.
    try {
        for (StatePartitionStreamProvider rawStateInput : context.getRawOperatorStateInputs()) {
            DataCacheSnapshot.replay(
                    rawStateInput.getStream(),
                    checkpoints.getTypeSerializer(),
                    (record) -> recordProcessor.processFeedbackElement(new StreamRecord<>(record)));
        }
    } catch (Exception e) {
        throw new FlinkRuntimeException("Failed to replay the records", e);
    }

    // Route incoming feedback into the mailbox. The status field is read each time feedback
    // arrives, so feedback is no longer scheduled once the operator has reached TERMINATED.
    registerFeedbackConsumer(
            (Runnable runnable) -> {
                if (status != HeadOperatorStatus.TERMINATED) {
                    mailboxExecutor.execute(runnable::run, "Head feedback");
                }
            });
}