public void initializeState()

in flink-ml-iteration/flink-ml-iteration-common/src/main/java/org/apache/flink/iteration/operator/HeadOperator.java [190:281]


    public void initializeState(StateInitializationContext context) throws Exception {
        // Restores this head operator from the snapshot (if any) and wires it into the iteration:
        //   1. rejects a restore where the parallelism differs from the snapshotted one;
        //   2. restores the operator status and picks the matching record processor;
        //   3. restores the record processor's own state, if present;
        //   4. creates the feedback checkpoints and publishes them through CheckpointsBroker;
        //   5. replays the records stored in the raw operator state into the record processor;
        //   6. registers the feedback consumer that runs feedback work on the mailbox executor.
        super.initializeState(context);

        // Union list state hands every subtask the full list on restore, so each subtask can
        // compare the snapshotted parallelism against its current one.
        parallelismState =
                context.getOperatorStateStore()
                        .getUnionListState(
                                new ListStateDescriptor<>("parallelism", IntSerializer.INSTANCE));
        int currentParallelism = getRuntimeContext().getNumberOfParallelSubtasks();
        OperatorStateUtils.getUniqueElement(parallelismState, "parallelism")
                .ifPresent(
                        oldParallelism ->
                                checkState(
                                        oldParallelism == currentParallelism,
                                        "The head operator is recovered with parallelism "
                                                + "changed from %s to %s",
                                        oldParallelism,
                                        currentParallelism));

        // Initialize the status and the record processor. The status is persisted as the ordinal
        // of HeadOperatorStatus; ordinal 0 is used when no status has been snapshotted.
        processorContext = new ContextImpl();
        statusState =
                context.getOperatorStateStore()
                        .getListState(new ListStateDescriptor<>("status", Integer.class));
        int statusOrdinal = OperatorStateUtils.getUniqueElement(statusState, "status").orElse(0);
        HeadOperatorStatus[] allStatuses = HeadOperatorStatus.values();
        // Fail with a descriptive message instead of an ArrayIndexOutOfBoundsException if the
        // restored ordinal does not match any known status (e.g. corrupt/incompatible state).
        checkState(
                statusOrdinal >= 0 && statusOrdinal < allStatuses.length,
                "Unknown head operator status ordinal %s in the restored state",
                statusOrdinal);
        status = allStatuses[statusOrdinal];
        if (status == HeadOperatorStatus.RUNNING) {
            recordProcessor = new RegularHeadOperatorRecordProcessor(processorContext);
        } else {
            recordProcessor = new TerminatingHeadOperatorRecordProcessor(processorContext);
        }

        // Recover the record processor's state if it exists.
        processorState =
                context.getOperatorStateStore()
                        .getListState(
                                new ListStateDescriptor<>(
                                        "processorState", HeadOperatorState.TYPE_INFO));

        OperatorStateUtils.getUniqueElement(processorState, "processorState")
                .ifPresent(
                        headOperatorState ->
                                recordProcessor.initializeState(
                                        headOperatorState, context.getRawOperatorStateInputs()));

        checkpointAligner = new HeadOperatorCheckpointAligner();

        // Initialize the checkpoints, backed by files created under the data cache path, and
        // publish them through the CheckpointsBroker under this subtask's feedback key.
        Path dataCachePath =
                OperatorUtils.getDataCachePath(
                        getRuntimeContext().getTaskManagerRuntimeInfo().getConfiguration(),
                        getContainingTask()
                                .getEnvironment()
                                .getIOManager()
                                .getSpillingDirectoriesPaths());
        this.checkpoints =
                new Checkpoints<>(
                        // Deserialize the output serializer with the user-code class loader: it
                        // may reference user classes that are invisible to the class loader of
                        // this operator class (e.g. when the library is loaded from Flink's lib/).
                        config.getTypeSerializerOut(getUserCodeClassloader()),
                        dataCachePath.getFileSystem(),
                        OperatorUtils.createDataCacheFileGenerator(
                                dataCachePath, "header-cp", getOperatorConfig().getOperatorID()));
        CheckpointsBroker.get()
                .setCheckpoints(
                        OperatorUtils.<IterationRecord<?>>createFeedbackKey(
                                        iterationId, feedbackIndex)
                                .withSubTaskIndex(
                                        getRuntimeContext().getIndexOfThisSubtask(),
                                        getRuntimeContext().getAttemptNumber()),
                        checkpoints);

        // Replay the records stored in the raw operator state back into the record processor
        // as feedback elements.
        try {
            for (StatePartitionStreamProvider rawStateInput : context.getRawOperatorStateInputs()) {
                DataCacheSnapshot.replay(
                        rawStateInput.getStream(),
                        checkpoints.getTypeSerializer(),
                        (record) ->
                                recordProcessor.processFeedbackElement(new StreamRecord<>(record)));
            }
        } catch (Exception e) {
            throw new FlinkRuntimeException("Failed to replay the records", e);
        }

        // Every Runnable handed to the feedback consumer is executed on this operator's mailbox
        // executor; once the status is TERMINATED, the Runnable is dropped instead.
        // NOTE(review): `status` is read from whichever thread invokes the consumer; confirm it is
        // safely published if that is not the mailbox thread.
        registerFeedbackConsumer(
                (Runnable runnable) -> {
                    if (status != HeadOperatorStatus.TERMINATED) {
                        mailboxExecutor.execute(runnable::run, "Head feedback");
                    }
                });
    }