flink-ml-iteration/flink-ml-iteration-1.15/src/main/java/org/apache/flink/iteration/operator/coordinator/HeadOperatorCoordinator.java [90:117]:
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
    /**
     * Delegates the checkpoint request to the shared progress aligner.
     *
     * <p>The call forwards {@code l}, the parallelism currently reported by the coordinator
     * context, and the supplied future, in that order.
     *
     * @param l value passed as the first argument of {@code requestCheckpoint}; presumably the
     *     checkpoint id (TODO confirm against the overridden interface method)
     * @param completableFuture future handed over to the shared progress aligner; this method
     *     itself neither completes nor inspects it
     */
    @Override
    public void checkpointCoordinator(long l, CompletableFuture<byte[]> completableFuture) {
        this.sharedProgressAligner.requestCheckpoint(
                l,
                this.context.currentParallelism(),
                completableFuture);
    }

    /**
     * Sends the given event through the gateway of every subtask index in
     * {@code [0, context.currentParallelism())}, in ascending index order.
     *
     * @param globallyAlignedEvent the event passed to {@code sendEvent} on each gateway
     */
    public void onAligned(GloballyAlignedEvent globallyAlignedEvent) {
        int subtaskIndex = 0;
        // The parallelism is re-read from the context before every iteration.
        while (subtaskIndex < this.context.currentParallelism()) {
            this.subtaskGateways[subtaskIndex].sendEvent(globallyAlignedEvent);
            subtaskIndex++;
        }
    }

    /**
     * Sends the given checkpoint event through the gateway of every subtask index in
     * {@code [0, context.currentParallelism())}, in ascending index order.
     *
     * @param coordinatorCheckpointEvent the event passed to {@code sendEvent} on each gateway
     */
    @Override
    public void onCheckpointAligned(CoordinatorCheckpointEvent coordinatorCheckpointEvent) {
        int subtaskIndex = 0;
        // The parallelism is re-read from the context before every iteration.
        while (subtaskIndex < this.context.currentParallelism()) {
            this.subtaskGateways[subtaskIndex].sendEvent(coordinatorCheckpointEvent);
            subtaskIndex++;
        }
    }

    /**
     * Calls {@code unregisterListener} on the shared progress aligner, using the operator id
     * obtained from the coordinator context.
     */
    @Override
    public void close() {
        this.sharedProgressAligner.unregisterListener(
                this.context.getOperatorId());
    }

    /**
     * No-op: this implementation performs no work in this callback.
     *
     * @param l unused
     */
    @Override
    public void notifyCheckpointComplete(long l) {
        // Nothing to do.
    }

    /**
     * No-op: this implementation performs no work in this callback.
     *
     * @param i unused
     * @param l unused
     */
    @Override
    public void subtaskReset(int i, long l) {
        // Nothing to do.
    }
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -



flink-ml-iteration/flink-ml-iteration-common/src/main/java/org/apache/flink/iteration/operator/coordinator/HeadOperatorCoordinator.java [81:108]:
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
    /**
     * Delegates the checkpoint request to the shared progress aligner.
     *
     * <p>The call forwards {@code l}, the parallelism currently reported by the coordinator
     * context, and the supplied future, in that order.
     *
     * @param l value passed as the first argument of {@code requestCheckpoint}; presumably the
     *     checkpoint id (TODO confirm against the overridden interface method)
     * @param completableFuture future handed over to the shared progress aligner; this method
     *     itself neither completes nor inspects it
     */
    @Override
    public void checkpointCoordinator(long l, CompletableFuture<byte[]> completableFuture) {
        this.sharedProgressAligner.requestCheckpoint(
                l,
                this.context.currentParallelism(),
                completableFuture);
    }

    /**
     * Sends the given event through the gateway of every subtask index in
     * {@code [0, context.currentParallelism())}, in ascending index order.
     *
     * @param globallyAlignedEvent the event passed to {@code sendEvent} on each gateway
     */
    public void onAligned(GloballyAlignedEvent globallyAlignedEvent) {
        int subtaskIndex = 0;
        // The parallelism is re-read from the context before every iteration.
        while (subtaskIndex < this.context.currentParallelism()) {
            this.subtaskGateways[subtaskIndex].sendEvent(globallyAlignedEvent);
            subtaskIndex++;
        }
    }

    /**
     * Sends the given checkpoint event through the gateway of every subtask index in
     * {@code [0, context.currentParallelism())}, in ascending index order.
     *
     * @param coordinatorCheckpointEvent the event passed to {@code sendEvent} on each gateway
     */
    @Override
    public void onCheckpointAligned(CoordinatorCheckpointEvent coordinatorCheckpointEvent) {
        int subtaskIndex = 0;
        // The parallelism is re-read from the context before every iteration.
        while (subtaskIndex < this.context.currentParallelism()) {
            this.subtaskGateways[subtaskIndex].sendEvent(coordinatorCheckpointEvent);
            subtaskIndex++;
        }
    }

    /**
     * Calls {@code unregisterListener} on the shared progress aligner, using the operator id
     * obtained from the coordinator context.
     */
    @Override
    public void close() {
        this.sharedProgressAligner.unregisterListener(
                this.context.getOperatorId());
    }

    /**
     * No-op: this implementation performs no work in this callback.
     *
     * @param l unused
     */
    @Override
    public void notifyCheckpointComplete(long l) {
        // Nothing to do.
    }

    /**
     * No-op: this implementation performs no work in this callback.
     *
     * @param i unused
     * @param l unused
     */
    @Override
    public void subtaskReset(int i, long l) {
        // Nothing to do.
    }
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -



