private void translate(StreamExecutionEnvironment env, PipelineDef pipelineDef)

in flink-cdc-composer/src/main/java/org/apache/flink/cdc/composer/flink/FlinkPipelineComposer.java [109:225]

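Translates a logical PipelineDef into a physical Flink topology on the given StreamExecutionEnvironment. The wiring order is: source, pre-transform, post-transform, then either partitioning before the schema operator (parallel-metadata sources) or the schema operator before partitioning (regular sources), and finally the sink.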

    private void translate(StreamExecutionEnvironment env, PipelineDef pipelineDef) {
        Configuration pipelineDefConfig = pipelineDef.getConfig();
        int parallelism = pipelineDefConfig.get(PipelineOptions.PIPELINE_PARALLELISM);
        SchemaChangeBehavior schemaChangeBehavior =
                pipelineDefConfig.get(PipelineOptions.PIPELINE_SCHEMA_CHANGE_BEHAVIOR);

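        // Map the pipeline-level runtime mode onto Flink's engine-level execution
        // mode; the fully-qualified enum distinguishes it from the CDC-level
        // RuntimeExecutionMode used in the comparison.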
        boolean isBatchMode =
                RuntimeExecutionMode.BATCH.equals(
                        pipelineDefConfig.get(PipelineOptions.PIPELINE_EXECUTION_RUNTIME_MODE));
        if (isBatchMode) {
            env.setRuntimeMode(org.apache.flink.api.common.RuntimeExecutionMode.BATCH);
        } else {
            env.setRuntimeMode(org.apache.flink.api.common.RuntimeExecutionMode.STREAMING);
        }

        // Initialize translators
        DataSourceTranslator sourceTranslator = new DataSourceTranslator();
        TransformTranslator transformTranslator = new TransformTranslator();
        PartitioningTranslator partitioningTranslator = new PartitioningTranslator();
        SchemaOperatorTranslator schemaOperatorTranslator =
                new SchemaOperatorTranslator(
                        schemaChangeBehavior,
                        pipelineDefConfig.get(PipelineOptions.PIPELINE_SCHEMA_OPERATOR_UID),
                        pipelineDefConfig.get(PipelineOptions.PIPELINE_SCHEMA_OPERATOR_RPC_TIMEOUT),
                        pipelineDefConfig.get(PipelineOptions.PIPELINE_LOCAL_TIME_ZONE));
        DataSinkTranslator sinkTranslator = new DataSinkTranslator();

        // Construct the required components
        OperatorIDGenerator schemaOperatorIDGenerator =
                new OperatorIDGenerator(schemaOperatorTranslator.getSchemaOperatorUid());
        DataSource dataSource =
                sourceTranslator.createDataSource(pipelineDef.getSource(), pipelineDefConfig, env);
        DataSink dataSink =
                sinkTranslator.createDataSink(pipelineDef.getSink(), pipelineDefConfig, env);

        boolean isParallelMetadataSource = dataSource.isParallelMetadataSource();
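
        // This flag selects which of the two topologies further below is built:
        // distributed (partition first, then apply schema changes) or regular
        // (apply schema changes first, then partition).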

        // (start) ---> Source
        DataStream<Event> stream =
                sourceTranslator.translate(pipelineDef.getSource(), dataSource, env, parallelism);

        // Source ---> PreTransform
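        // Broadly, PreTransform rewrites schemas ahead of any per-row work, e.g.
        // dropping columns that no transform rule references. Note the trailing
        // flag below: it is true only for regular (non-distributed) streaming
        // topologies.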
        stream =
                transformTranslator.translatePreTransform(
                        stream,
                        pipelineDef.getTransforms(),
                        pipelineDef.getUdfs(),
                        pipelineDef.getModels(),
                        dataSource.supportedMetadataColumns(),
                        !isParallelMetadataSource && !isBatchMode);

        // PreTransform ---> PostTransform
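        // PostTransform handles the row-level side of the transform rules
        // (projection and filter expressions); the local time zone is threaded in
        // so time-dependent expressions evaluate consistently.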
        stream =
                transformTranslator.translatePostTransform(
                        stream,
                        pipelineDef.getTransforms(),
                        pipelineDefConfig.get(PipelineOptions.PIPELINE_LOCAL_TIME_ZONE),
                        pipelineDef.getUdfs(),
                        pipelineDef.getModels(),
                        dataSource.supportedMetadataColumns());

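        // Both branches assemble the same stages; only the relative order of
        // schema handling and partitioning differs, since a parallel-metadata
        // source may emit schema changes for one table from several subtasks.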
        if (isParallelMetadataSource) {
            // Translate a distributed topology for sources with distributed tables
            // PostTransform ---> Partitioning
            DataStream<PartitioningEvent> partitionedStream =
                    partitioningTranslator.translateDistributed(
                            stream,
                            parallelism,
                            parallelism,
                            dataSink.getDataChangeEventHashFunctionProvider(parallelism));

            // Partitioning ---> Schema Operator
            stream =
                    schemaOperatorTranslator.translateDistributed(
                            partitionedStream,
                            parallelism,
                            dataSink.getMetadataApplier()
                                    .setAcceptedSchemaEvolutionTypes(
                                            pipelineDef
                                                    .getSink()
                                                    .getIncludedSchemaEvolutionTypes()),
                            pipelineDef.getRoute());

        } else {
            // Translate a regular topology for sources without distributed tables
            // PostTransform ---> Schema Operator
            stream =
                    schemaOperatorTranslator.translateRegular(
                            stream,
                            parallelism,
                            isBatchMode,
                            dataSink.getMetadataApplier()
                                    .setAcceptedSchemaEvolutionTypes(
                                            pipelineDef
                                                    .getSink()
                                                    .getIncludedSchemaEvolutionTypes()),
                            pipelineDef.getRoute());

            // Schema Operator ---(shuffled)---> Partitioning
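            // The hash function comes from the sink so events are keyed the way
            // the sink expects (typically table identifier plus primary key),
            // preserving per-key ordering across the shuffle.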
            stream =
                    partitioningTranslator.translateRegular(
                            stream,
                            parallelism,
                            parallelism,
                            isBatchMode,
                            schemaOperatorIDGenerator.generate(),
                            dataSink.getDataChangeEventHashFunctionProvider(parallelism));
        }

        // Partitioning / Schema Operator ---> Sink ---> X
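        // The schema operator's deterministic OperatorID is handed to the sink so
        // its writers can reach the operator for schema-evolution coordination.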
        sinkTranslator.translate(
                pipelineDef.getSink(),
                stream,
                dataSink,
                isBatchMode,
                schemaOperatorIDGenerator.generate());
    }
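
Note that schemaOperatorIDGenerator.generate() is invoked twice, once for partitioning and once for the sink, and both call sites must agree on the resulting ID; OperatorIDGenerator therefore derives it deterministically from the configured UID string. A toy sketch of that idea follows, assuming nothing beyond the JDK: the hash algorithm and the UID value are illustrative, not the CDC implementation.

    import java.math.BigInteger;
    import java.nio.charset.StandardCharsets;
    import java.security.MessageDigest;

    final class UidHashSketch {
        // Derive a stable 16-byte identifier from a UID string: equal UIDs always
        // map to equal IDs, so independent call sites agree without sharing state.
        static byte[] idFor(String uid) throws Exception {
            return MessageDigest.getInstance("MD5")
                    .digest(uid.getBytes(StandardCharsets.UTF_8));
        }

        public static void main(String[] args) throws Exception {
            // "schema-operator-uid" is a made-up example value.
            System.out.printf("%032x%n", new BigInteger(1, idFor("schema-operator-uid")));
        }
    }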