in flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mongodb-cdc/src/main/java/org/apache/flink/cdc/connectors/mongodb/MongoDBSource.java [345:464]
/**
 * Assembles the connector properties from the values configured on this builder and wraps
 * them, together with the deserializer, in a {@link DebeziumSourceFunction}.
 *
 * <p>Optional settings are only written when they were explicitly configured (non-null).
 * The startup mode is resolved in this order: {@code startupOptions}, then the legacy
 * {@code copyExisting} flag, and finally an explicit fallback to {@code copy_existing}.
 *
 * @return a source function configured with the assembled properties
 * @throws IllegalStateException if {@code startupOptions} carries a startup mode other than
 *     {@code INITIAL}, {@code LATEST_OFFSET} or {@code TIMESTAMP}
 */
public DebeziumSourceFunction<T> build() {
    Properties props = new Properties();

    // --- connector identity and connection ---
    props.setProperty(
            "connector.class", MongoDBConnectorSourceConnector.class.getCanonicalName());
    props.setProperty("name", "mongodb_cdc_source");
    props.setProperty(
            MongoSourceConfig.CONNECTION_URI_CONFIG,
            buildConnectionString(username, password, scheme, hosts, connectionOptions));

    // --- database / collection filters ---
    if (databaseList != null) {
        props.setProperty(DATABASE_INCLUDE_LIST, String.join(",", databaseList));
    }
    if (collectionList != null) {
        props.setProperty(COLLECTION_INCLUDE_LIST, String.join(",", collectionList));
    }

    // --- full document options ---
    // fullDocumentBeforeChange takes precedence over updateLookup.
    if (fullDocumentBeforeChange) {
        props.setProperty(MongoSourceConfig.FULL_DOCUMENT_CONFIG, FULL_DOCUMENT_REQUIRED);
        props.setProperty(
                MongoSourceConfig.FULL_DOCUMENT_BEFORE_CHANGE_CONFIG,
                FULL_DOCUMENT_REQUIRED);
    } else if (updateLookup) {
        props.setProperty(
                MongoSourceConfig.FULL_DOCUMENT_CONFIG, FULL_DOCUMENT_UPDATE_LOOKUP);
    }

    // --- output format: fixed schema, no inference, full change event ---
    props.setProperty(
            MongoSourceConfig.PUBLISH_FULL_DOCUMENT_ONLY_CONFIG,
            String.valueOf(Boolean.FALSE));
    props.setProperty(MongoSourceConfig.OUTPUT_FORMAT_KEY_CONFIG, OUTPUT_FORMAT_SCHEMA);
    props.setProperty(MongoSourceConfig.OUTPUT_FORMAT_VALUE_CONFIG, OUTPUT_FORMAT_SCHEMA);
    props.setProperty(
            MongoSourceConfig.OUTPUT_SCHEMA_INFER_VALUE_CONFIG,
            String.valueOf(Boolean.FALSE));
    props.setProperty(MongoSourceConfig.OUTPUT_SCHEMA_VALUE_CONFIG, OUTPUT_SCHEMA);

    // --- optional batching / polling settings ---
    if (batchSize != null) {
        props.setProperty(MongoSourceConfig.BATCH_SIZE_CONFIG, String.valueOf(batchSize));
    }
    if (pollAwaitTimeMillis != null) {
        props.setProperty(
                MongoSourceConfig.POLL_AWAIT_TIME_MS_CONFIG,
                String.valueOf(pollAwaitTimeMillis));
    }
    if (pollMaxBatchSize != null) {
        props.setProperty(
                MongoSourceConfig.POLL_MAX_BATCH_SIZE_CONFIG,
                String.valueOf(pollMaxBatchSize));
    }

    // --- startup mode ---
    if (startupOptions != null) {
        switch (startupOptions.startupMode) {
            case INITIAL:
                props.setProperty(MongoSourceConfig.STARTUP_MODE_CONFIG, "copy_existing");
                break;
            case LATEST_OFFSET:
                props.setProperty(MongoSourceConfig.STARTUP_MODE_CONFIG, "latest");
                break;
            case TIMESTAMP:
                props.setProperty(MongoSourceConfig.STARTUP_MODE_CONFIG, "timestamp");
                // mongodb-kafka requires an integer number of seconds since the Epoch
                props.setProperty(
                        STARTUP_MODE_TIMESTAMP_START_AT_OPERATION_TIME_CONFIG,
                        String.valueOf(startupOptions.startupTimestampMillis / 1000));
                break;
            default:
                // Without this branch the startup mode property would stay unset and
                // mongodb-kafka would silently apply its own default (latest), which is
                // not what the caller asked for. Fail fast instead.
                throw new IllegalStateException(
                        "Unsupported startup mode: " + startupOptions.startupMode);
        }
    } else if (copyExisting != null) {
        props.setProperty(
                MongoSourceConfig.STARTUP_MODE_CONFIG,
                copyExisting ? "copy_existing" : "latest");
    } else {
        // explicitly fallback to initial mode
        // since mongodb-kafka's default option is latest
        props.setProperty(MongoSourceConfig.STARTUP_MODE_CONFIG, "copy_existing");
    }

    // --- optional initial snapshot tuning ---
    if (initialSnapshottingMaxThreads != null) {
        props.setProperty(
                STARTUP_MODE_INITIAL_SNAPSHOTTING_MAX_THREADS_CONFIG,
                String.valueOf(initialSnapshottingMaxThreads));
    }
    if (initialSnapshottingQueueSize != null) {
        props.setProperty(
                STARTUP_MODE_INITIAL_SNAPSHOTTING_QUEUE_SIZE_CONFIG,
                String.valueOf(initialSnapshottingQueueSize));
    }
    if (initialSnapshottingPipeline != null) {
        props.setProperty(
                STARTUP_MODE_INITIAL_SNAPSHOTTING_PIPELINE_CONFIG,
                initialSnapshottingPipeline);
    }

    // --- heartbeat ---
    if (heartbeatIntervalMillis != null) {
        props.setProperty(
                MongoSourceConfig.HEARTBEAT_INTERVAL_MS_CONFIG,
                String.valueOf(heartbeatIntervalMillis));
    }
    props.setProperty(MongoSourceConfig.HEARTBEAT_TOPIC_NAME_CONFIG, HEARTBEAT_TOPIC_NAME);
    // Let DebeziumChangeFetcher recognize heartbeat record
    props.setProperty(Heartbeat.HEARTBEAT_TOPICS_PREFIX.name(), HEARTBEAT_TOPIC_NAME);

    // --- error handling: log errors and tolerate none ---
    props.setProperty(
            MongoSourceConfig.ERRORS_LOG_ENABLE_CONFIG, String.valueOf(Boolean.TRUE));
    props.setProperty(
            MongoSourceConfig.ERRORS_TOLERANCE_CONFIG, ErrorTolerance.NONE.value());

    return new DebeziumSourceFunction<>(
            deserializer, props, null, Validator.getDefaultValidator());
}