flink-ml-iteration/flink-ml-iteration-1.15/src/main/java/org/apache/flink/iteration/operator/coordinator/HeadOperatorCoordinator.java [120:155]:
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
    public static class HeadOperatorCoordinatorProvider implements Provider {

        private final OperatorID operatorId;

        private final IterationID iterationId;

        private final int totalHeadParallelism;

        public HeadOperatorCoordinatorProvider(
                OperatorID operatorId, IterationID iterationId, int totalHeadParallelism) {
            this.operatorId = operatorId;
            this.iterationId = iterationId;
            this.totalHeadParallelism = totalHeadParallelism;
        }

        @Override
        public OperatorID getOperatorId() {
            return operatorId;
        }

        @Override
        public OperatorCoordinator create(Context context) {
            SharedProgressAligner sharedProgressAligner =
                    SharedProgressAligner.getOrCreate(
                            iterationId,
                            totalHeadParallelism,
                            context,
                            () ->
                                    Executors.newSingleThreadScheduledExecutor(
                                            runnable -> {
                                                Thread thread = new Thread(runnable);
                                                thread.setName(
                                                        "SharedProgressAligner-" + iterationId);
                                                return thread;
                                            }));
            return new HeadOperatorCoordinator(context, sharedProgressAligner);
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -



flink-ml-iteration/flink-ml-iteration-common/src/main/java/org/apache/flink/iteration/operator/coordinator/HeadOperatorCoordinator.java [121:156]:
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
    public static class HeadOperatorCoordinatorProvider implements Provider {

        private final OperatorID operatorId;

        private final IterationID iterationId;

        private final int totalHeadParallelism;

        public HeadOperatorCoordinatorProvider(
                OperatorID operatorId, IterationID iterationId, int totalHeadParallelism) {
            this.operatorId = operatorId;
            this.iterationId = iterationId;
            this.totalHeadParallelism = totalHeadParallelism;
        }

        @Override
        public OperatorID getOperatorId() {
            return operatorId;
        }

        @Override
        public OperatorCoordinator create(Context context) {
            SharedProgressAligner sharedProgressAligner =
                    SharedProgressAligner.getOrCreate(
                            iterationId,
                            totalHeadParallelism,
                            context,
                            () ->
                                    Executors.newSingleThreadScheduledExecutor(
                                            runnable -> {
                                                Thread thread = new Thread(runnable);
                                                thread.setName(
                                                        "SharedProgressAligner-" + iterationId);
                                                return thread;
                                            }));
            return new HeadOperatorCoordinator(context, sharedProgressAligner);
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -



