public DebeziumSourceFunction build()

in flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mongodb-cdc/src/main/java/org/apache/flink/cdc/connectors/mongodb/MongoDBSource.java [345:464]


        public DebeziumSourceFunction<T> build() {
            Properties props = new Properties();

            props.setProperty(
                    "connector.class", MongoDBConnectorSourceConnector.class.getCanonicalName());
            props.setProperty("name", "mongodb_cdc_source");

            props.setProperty(
                    MongoSourceConfig.CONNECTION_URI_CONFIG,
                    buildConnectionString(username, password, scheme, hosts, connectionOptions));

            if (databaseList != null) {
                props.setProperty(DATABASE_INCLUDE_LIST, String.join(",", databaseList));
            }

            if (collectionList != null) {
                props.setProperty(COLLECTION_INCLUDE_LIST, String.join(",", collectionList));
            }

            if (fullDocumentBeforeChange) {
                props.setProperty(MongoSourceConfig.FULL_DOCUMENT_CONFIG, FULL_DOCUMENT_REQUIRED);
                props.setProperty(
                        MongoSourceConfig.FULL_DOCUMENT_BEFORE_CHANGE_CONFIG,
                        FULL_DOCUMENT_REQUIRED);
            } else if (updateLookup) {
                props.setProperty(
                        MongoSourceConfig.FULL_DOCUMENT_CONFIG, FULL_DOCUMENT_UPDATE_LOOKUP);
            }

            props.setProperty(
                    MongoSourceConfig.PUBLISH_FULL_DOCUMENT_ONLY_CONFIG,
                    String.valueOf(Boolean.FALSE));

            props.setProperty(MongoSourceConfig.OUTPUT_FORMAT_KEY_CONFIG, OUTPUT_FORMAT_SCHEMA);
            props.setProperty(MongoSourceConfig.OUTPUT_FORMAT_VALUE_CONFIG, OUTPUT_FORMAT_SCHEMA);
            props.setProperty(
                    MongoSourceConfig.OUTPUT_SCHEMA_INFER_VALUE_CONFIG,
                    String.valueOf(Boolean.FALSE));
            props.setProperty(MongoSourceConfig.OUTPUT_SCHEMA_VALUE_CONFIG, OUTPUT_SCHEMA);

            if (batchSize != null) {
                props.setProperty(MongoSourceConfig.BATCH_SIZE_CONFIG, String.valueOf(batchSize));
            }

            if (pollAwaitTimeMillis != null) {
                props.setProperty(
                        MongoSourceConfig.POLL_AWAIT_TIME_MS_CONFIG,
                        String.valueOf(pollAwaitTimeMillis));
            }

            if (pollMaxBatchSize != null) {
                props.setProperty(
                        MongoSourceConfig.POLL_MAX_BATCH_SIZE_CONFIG,
                        String.valueOf(pollMaxBatchSize));
            }

            if (startupOptions != null) {
                switch (startupOptions.startupMode) {
                    case INITIAL:
                        props.setProperty(MongoSourceConfig.STARTUP_MODE_CONFIG, "copy_existing");
                        break;
                    case LATEST_OFFSET:
                        props.setProperty(MongoSourceConfig.STARTUP_MODE_CONFIG, "latest");
                        break;
                    case TIMESTAMP:
                        props.setProperty(MongoSourceConfig.STARTUP_MODE_CONFIG, "timestamp");

                        // mongodb-kafka requires an integer number of seconds since the Epoch
                        props.setProperty(
                                STARTUP_MODE_TIMESTAMP_START_AT_OPERATION_TIME_CONFIG,
                                String.valueOf(startupOptions.startupTimestampMillis / 1000));
                        break;
                }
            } else if (copyExisting != null) {
                props.setProperty(
                        MongoSourceConfig.STARTUP_MODE_CONFIG,
                        copyExisting ? "copy_existing" : "latest");
            } else {
                // explicitly fallback to initial mode
                // since mongodb-kafka's default option is latest
                props.setProperty(MongoSourceConfig.STARTUP_MODE_CONFIG, "copy_existing");
            }

            if (initialSnapshottingMaxThreads != null) {
                props.setProperty(
                        STARTUP_MODE_INITIAL_SNAPSHOTTING_MAX_THREADS_CONFIG,
                        String.valueOf(initialSnapshottingMaxThreads));
            }

            if (initialSnapshottingQueueSize != null) {
                props.setProperty(
                        STARTUP_MODE_INITIAL_SNAPSHOTTING_QUEUE_SIZE_CONFIG,
                        String.valueOf(initialSnapshottingQueueSize));
            }

            if (initialSnapshottingPipeline != null) {
                props.setProperty(
                        STARTUP_MODE_INITIAL_SNAPSHOTTING_PIPELINE_CONFIG,
                        initialSnapshottingPipeline);
            }

            if (heartbeatIntervalMillis != null) {
                props.setProperty(
                        MongoSourceConfig.HEARTBEAT_INTERVAL_MS_CONFIG,
                        String.valueOf(heartbeatIntervalMillis));
            }

            props.setProperty(MongoSourceConfig.HEARTBEAT_TOPIC_NAME_CONFIG, HEARTBEAT_TOPIC_NAME);

            // Let DebeziumChangeFetcher recognize heartbeat record
            props.setProperty(Heartbeat.HEARTBEAT_TOPICS_PREFIX.name(), HEARTBEAT_TOPIC_NAME);

            props.setProperty(
                    MongoSourceConfig.ERRORS_LOG_ENABLE_CONFIG, String.valueOf(Boolean.TRUE));
            props.setProperty(
                    MongoSourceConfig.ERRORS_TOLERANCE_CONFIG, ErrorTolerance.NONE.value());

            return new DebeziumSourceFunction<>(
                    deserializer, props, null, Validator.getDefaultValidator());
        }