public void build()

in paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncTableAction.java [134:218]


    /**
     * Wires a MySQL CDC source to a Paimon table sink on the supplied environment.
     *
     * <p>In order, the method: checks that the MySQL table name option is present, builds the
     * MySQL source, merges the schemas of all monitored MySQL tables into one, creates the target
     * database, derives a Paimon schema from the merged MySQL schema, loads the target table (or
     * creates it from the derived schema when the catalog reports it as missing) and finally
     * assembles the sink through {@code CdcSinkBuilder}.
     *
     * @param env the stream execution environment on which the MySQL source is registered
     * @throws Exception if argument validation fails or any MySQL / catalog step throws
     */
    public void build(StreamExecutionEnvironment env) throws Exception {
        String missingTableNameMessage =
                String.format(
                        "mysql-conf [%s] must be specified.", MySqlSourceOptions.TABLE_NAME.key());
        checkArgument(
                mySqlConfig.contains(MySqlSourceOptions.TABLE_NAME), missingTableNameMessage);

        // "<database>\.<table>": the dot is escaped, presumably because the source interprets
        // the table list as a regular expression -- TODO confirm against buildMySqlSource.
        String databaseName = mySqlConfig.get(MySqlSourceOptions.DATABASE_NAME);
        String tableName = mySqlConfig.get(MySqlSourceOptions.TABLE_NAME);
        String tableList = databaseName + "\\." + tableName;
        MySqlSource<String> mySqlSource =
                MySqlActionUtils.buildMySqlSource(mySqlConfig, tableList);

        boolean caseSensitive = catalog.caseSensitive();
        if (!caseSensitive) {
            validateCaseInsensitive();
        }

        // Fold the schemas of every monitored MySQL table into a single schema; having no
        // schema at all means nothing matched the configured names.
        MySqlSchema mergedSchema =
                MySqlActionUtils.getMySqlSchemaList(
                                mySqlConfig, monitorTablePredication(), new ArrayList<>())
                        .stream()
                        .reduce(MySqlSchema::merge)
                        .orElseThrow(
                                () ->
                                        new RuntimeException(
                                                "No table satisfies the given database name and table name"));

        catalog.createDatabase(database, true);

        Identifier identifier = new Identifier(database, table);
        List<ComputedColumn> computedColumns =
                buildComputedColumns(computedColumnArgs, mergedSchema.typeMapping());
        Schema derivedSchema =
                MySqlActionUtils.buildPaimonSchema(
                        mergedSchema,
                        partitionKeys,
                        primaryKeys,
                        computedColumns,
                        tableConfig,
                        caseSensitive);

        // Deliberately not called "table": that name is already used above for the value that
        // goes into the identifier.
        FileStoreTable targetTable;
        try {
            targetTable = (FileStoreTable) catalog.getTable(identifier);
            if (!computedColumns.isEmpty()) {
                List<String> computedFields = new ArrayList<>(computedColumns.size());
                for (ComputedColumn column : computedColumns) {
                    computedFields.add(column.columnName());
                }
                List<String> existingFields = targetTable.schema().fieldNames();
                // Every requested computed column must already be a field of the existing table.
                checkArgument(
                        new HashSet<>(existingFields).containsAll(computedFields),
                        " Exists Table should contain all computed columns %s, but are %s.",
                        computedFields,
                        existingFields);
            }
            MySqlActionUtils.assertSchemaCompatible(targetTable.schema(), derivedSchema);
        } catch (Catalog.TableNotExistException e) {
            // The table is missing: create it from the MySQL-derived schema and load it.
            catalog.createTable(identifier, derivedSchema, false);
            targetTable = (FileStoreTable) catalog.getTable(identifier);
        }

        // Use the configured server time zone when present, otherwise the JVM default zone.
        String serverTimeZone = mySqlConfig.get(MySqlSourceOptions.SERVER_TIME_ZONE);
        ZoneId serverZone =
                serverTimeZone != null ? ZoneId.of(serverTimeZone) : ZoneId.systemDefault();
        Boolean tinyint1AsBool = mySqlConfig.get(MYSQL_CONVERTER_TINYINT1_BOOL);
        EventParser.Factory<String> parserFactory =
                () ->
                        new MySqlDebeziumJsonEventParser(
                                serverZone, caseSensitive, computedColumns, tinyint1AsBool);

        CdcSinkBuilder<String> sinkBuilder =
                new CdcSinkBuilder<String>()
                        .withInput(
                                env.fromSource(
                                        mySqlSource,
                                        WatermarkStrategy.noWatermarks(),
                                        "MySQL Source"))
                        .withParserFactory(parserFactory)
                        .withTable(targetTable)
                        .withIdentifier(identifier)
                        .withCatalogLoader(catalogLoader());

        // Sink parallelism is optional; it is applied only when present in the table config.
        String parallelism = tableConfig.get(FlinkConnectorOptions.SINK_PARALLELISM.key());
        if (parallelism != null) {
            sinkBuilder.withParallelism(Integer.parseInt(parallelism));
        }
        sinkBuilder.build();
    }