in flink-cdc-composer/src/main/java/org/apache/flink/cdc/composer/flink/FlinkPipelineComposer.java [109:225]
private void translate(StreamExecutionEnvironment env, PipelineDef pipelineDef) {
Configuration pipelineDefConfig = pipelineDef.getConfig();
int parallelism = pipelineDefConfig.get(PipelineOptions.PIPELINE_PARALLELISM);
SchemaChangeBehavior schemaChangeBehavior =
pipelineDefConfig.get(PipelineOptions.PIPELINE_SCHEMA_CHANGE_BEHAVIOR);
boolean isBatchMode =
RuntimeExecutionMode.BATCH.equals(
pipelineDefConfig.get(PipelineOptions.PIPELINE_EXECUTION_RUNTIME_MODE));
if (isBatchMode) {
env.setRuntimeMode(org.apache.flink.api.common.RuntimeExecutionMode.BATCH);
} else {
env.setRuntimeMode(org.apache.flink.api.common.RuntimeExecutionMode.STREAMING);
}
// Initialize translators
DataSourceTranslator sourceTranslator = new DataSourceTranslator();
TransformTranslator transformTranslator = new TransformTranslator();
PartitioningTranslator partitioningTranslator = new PartitioningTranslator();
SchemaOperatorTranslator schemaOperatorTranslator =
new SchemaOperatorTranslator(
schemaChangeBehavior,
pipelineDefConfig.get(PipelineOptions.PIPELINE_SCHEMA_OPERATOR_UID),
pipelineDefConfig.get(PipelineOptions.PIPELINE_SCHEMA_OPERATOR_RPC_TIMEOUT),
pipelineDefConfig.get(PipelineOptions.PIPELINE_LOCAL_TIME_ZONE));
DataSinkTranslator sinkTranslator = new DataSinkTranslator();
// Construct the schema operator ID generator and the source/sink instances
OperatorIDGenerator schemaOperatorIDGenerator =
new OperatorIDGenerator(schemaOperatorTranslator.getSchemaOperatorUid());
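// The OperatorID generated above is later passed to the partitioning and sink translators
// so that their operators can reference the schema operator.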
DataSource dataSource =
sourceTranslator.createDataSource(pipelineDef.getSource(), pipelineDefConfig, env);
DataSink dataSink =
sinkTranslator.createDataSink(pipelineDef.getSink(), pipelineDefConfig, env);
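// Determines which topology is built below: distributed (for sources with distributed
// tables) or regular.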
boolean isParallelMetadataSource = dataSource.isParallelMetadataSource();
// O ---> Source
DataStream<Event> stream =
sourceTranslator.translate(pipelineDef.getSource(), dataSource, env, parallelism);
// Source ---> PreTransform
stream =
transformTranslator.translatePreTransform(
stream,
pipelineDef.getTransforms(),
pipelineDef.getUdfs(),
pipelineDef.getModels(),
dataSource.supportedMetadataColumns(),
!isParallelMetadataSource && !isBatchMode);
// PreTransform ---> PostTransform
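// PostTransform applies the configured transform rules using the pipeline's local time
// zone, UDFs, models, and the source's supported metadata columns.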
stream =
transformTranslator.translatePostTransform(
stream,
pipelineDef.getTransforms(),
pipelineDefConfig.get(PipelineOptions.PIPELINE_LOCAL_TIME_ZONE),
pipelineDef.getUdfs(),
pipelineDef.getModels(),
dataSource.supportedMetadataColumns());
if (isParallelMetadataSource) {
// Translate a distributed topology for sources with distributed tables
// PostTransform -> Partitioning
DataStream<PartitioningEvent> partitionedStream =
partitioningTranslator.translateDistributed(
stream,
parallelism,
parallelism,
dataSink.getDataChangeEventHashFunctionProvider(parallelism));
// Partitioning -> Schema Operator
stream =
schemaOperatorTranslator.translateDistributed(
partitionedStream,
parallelism,
dataSink.getMetadataApplier()
.setAcceptedSchemaEvolutionTypes(
pipelineDef
.getSink()
.getIncludedSchemaEvolutionTypes()),
pipelineDef.getRoute());
} else {
// Translate a regular topology for sources without distributed tables
// PostTransform ---> Schema Operator
stream =
schemaOperatorTranslator.translateRegular(
stream,
parallelism,
isBatchMode,
dataSink.getMetadataApplier()
.setAcceptedSchemaEvolutionTypes(
pipelineDef
.getSink()
.getIncludedSchemaEvolutionTypes()),
pipelineDef.getRoute());
// Schema Operator ---(shuffled)---> Partitioning
stream =
partitioningTranslator.translateRegular(
stream,
parallelism,
parallelism,
isBatchMode,
schemaOperatorIDGenerator.generate(),
dataSink.getDataChangeEventHashFunctionProvider(parallelism));
}
// Schema Operator -> Sink -> X
sinkTranslator.translate(
pipelineDef.getSink(),
stream,
dataSink,
isBatchMode,
schemaOperatorIDGenerator.generate());
}
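
A minimal usage sketch (not part of the file above) showing how translate() is typically reached: compose() builds the StreamExecutionEnvironment, invokes translate(), and returns a PipelineExecution that submits the job. The "values" source/sink identifiers and the PipelineDef constructor shape below follow the composer's test fixtures and are illustrative; they may differ between Flink CDC versions.

import java.util.Collections;

import org.apache.flink.cdc.common.configuration.Configuration;
import org.apache.flink.cdc.common.pipeline.PipelineOptions;
import org.apache.flink.cdc.composer.PipelineExecution;
import org.apache.flink.cdc.composer.definition.PipelineDef;
import org.apache.flink.cdc.composer.definition.SinkDef;
import org.apache.flink.cdc.composer.definition.SourceDef;
import org.apache.flink.cdc.composer.flink.FlinkPipelineComposer;

public class ComposerUsageSketch {
    public static void main(String[] args) throws Exception {
        // Pipeline-level settings read by translate(), e.g. parallelism and schema change behavior.
        Configuration pipelineConfig = new Configuration();
        pipelineConfig.set(PipelineOptions.PIPELINE_PARALLELISM, 1);

        PipelineDef pipelineDef =
                new PipelineDef(
                        new SourceDef("values", "source", new Configuration()),
                        new SinkDef("values", "sink", new Configuration()),
                        Collections.emptyList(), // routes
                        Collections.emptyList(), // transforms
                        Collections.emptyList(), // UDFs
                        pipelineConfig);

        // compose() calls the translate() method above and wraps the job for submission.
        PipelineExecution execution =
                FlinkPipelineComposer.ofMiniCluster().compose(pipelineDef);
        execution.execute();
    }
}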