in inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/mongodb-cdc/src/main/java/org/apache/inlong/sort/cdc/mongodb/DebeziumSourceFunction.java [438:574]
public void run(SourceContext<T> sourceContext) throws Exception {
    // Sets up Flink/InLong metrics, configures and starts the embedded Debezium engine on
    // `executor`, then runs debeziumChangeFetcher's fetch loop on the calling thread.

    // make RuntimeContext#getMetricGroup compatible between Flink 1.13 and Flink 1.14
    final Method getMetricGroupMethod =
            getRuntimeContext().getClass().getMethod("getMetricGroup");
    getMetricGroupMethod.setAccessible(true);
    final MetricGroup metricGroup =
            (MetricGroup) getMetricGroupMethod.invoke(getRuntimeContext());

    // InLong metrics; initial record/byte counts are taken from metricState when it is non-null
    MetricOption metricOption = MetricOption.builder()
            .withInlongLabels(inlongMetric)
            .withAuditAddress(inlongAudit)
            .withInitRecords(metricState != null ? metricState.getMetricValue(NUM_RECORDS_IN) : 0L)
            .withInitBytes(metricState != null ? metricState.getMetricValue(NUM_BYTES_IN) : 0L)
            .withRegisterMetric(RegisteredMetric.ALL)
            .build();
    if (metricOption != null) {
        sourceMetricData = new SourceTableMetricData(metricOption, metricGroup,
                Arrays.asList(Constants.DATABASE_NAME, Constants.COLLECTION_NAME));
        if (migrateAll) {
            // register sub source metric data from metric state
            sourceMetricData.registerSubMetricsGroup(metricState);
        }
    }

    properties.setProperty("name", "engine");
    properties.setProperty("offset.storage", FlinkOffsetBackingStore.class.getCanonicalName());
    if (restoredOffsetState != null) {
        // restored from state
        properties.setProperty(FlinkOffsetBackingStore.OFFSET_STATE_VALUE, restoredOffsetState);
    }
    // DO NOT include schema change, e.g. DDL
    properties.setProperty("include.schema.changes", "false");
    // disable the offset flush totally
    properties.setProperty("offset.flush.interval.ms", String.valueOf(Long.MAX_VALUE));
    // disable tombstones
    properties.setProperty("tombstones.on.delete", "false");
    if (engineInstanceName == null) {
        // not restore from recovery
        engineInstanceName = UUID.randomUUID().toString();
    }
    // history instance name to initialize FlinkDatabaseHistory
    properties.setProperty(
            FlinkDatabaseHistory.DATABASE_HISTORY_INSTANCE_NAME, engineInstanceName);
    // we have to use a persisted DatabaseHistory implementation, otherwise recovery can't
    // continue reading the change stream
    // see
    // https://stackoverflow.com/questions/57147584/debezium-error-schema-isnt-know-to-this-connector
    // and https://debezium.io/blog/2018/03/16/note-on-database-history-topic-configuration/
    properties.setProperty("database.history", determineDatabase().getCanonicalName());

    // we have to filter out the heartbeat events, otherwise the deserializer will fail
    String dbzHeartbeatPrefix =
            properties.getProperty(
                    Heartbeat.HEARTBEAT_TOPICS_PREFIX.name(),
                    Heartbeat.HEARTBEAT_TOPICS_PREFIX.defaultValueAsString());
    this.debeziumChangeFetcher =
            new DebeziumChangeFetcher<>(
                    sourceContext,
                    new DebeziumDeserializationSchema<T>() {

                        @Override
                        public void deserialize(SourceRecord record, Collector<T> out) throws Exception {
                            // do nothing: the fetcher uses the three-argument variant below
                        }

                        @Override
                        public void deserialize(SourceRecord record, Collector<T> out, Boolean isStreamingPhase)
                                throws Exception {
                            if (record != null && MongoRecordUtils.isHeartbeatEvent(record)) {
                                // heartbeats are not forwarded; one seen while streaming is
                                // reported as the INCREASE_PHASE read phase.
                                // Boolean.TRUE.equals avoids an unboxing NPE on a null flag.
                                if (sourceMetricData != null && Boolean.TRUE.equals(isStreamingPhase)) {
                                    sourceMetricData.outputReadPhaseMetrics(ReadPhase.INCREASE_PHASE);
                                }
                                return;
                            }
                            outputRecordMetrics(record);
                            deserializer.deserialize(record, out);
                        }

                        @Override
                        public TypeInformation<T> getProducedType() {
                            return deserializer.getProducedType();
                        }
                    },
                    restoredOffsetState == null, // DB snapshot phase if restore state is null
                    dbzHeartbeatPrefix,
                    handover);

    // Registered only after debeziumChangeFetcher is assigned: the gauges dereference it, so
    // registering them earlier would let a metric reporter poll them while it is still null.
    metricGroup.gauge(
            "currentFetchEventTimeLag",
            (Gauge<Long>) () -> debeziumChangeFetcher.getFetchDelay());
    metricGroup.gauge(
            "currentEmitEventTimeLag",
            (Gauge<Long>) () -> debeziumChangeFetcher.getEmitDelay());
    metricGroup.gauge(
            "sourceIdleTime", (Gauge<Long>) () -> debeziumChangeFetcher.getIdleTime());

    // create the engine with this configuration ...
    this.engine =
            DebeziumEngine.create(Connect.class)
                    .using(properties)
                    .notifying(changeConsumer)
                    .using(OffsetCommitPolicy.always())
                    .using(
                            (success, message, error) -> {
                                if (success) {
                                    // Close the handover and prepare to exit.
                                    handover.close();
                                } else {
                                    handover.reportError(error);
                                }
                            })
                    .build();
    // run the engine asynchronously
    executor.execute(engine);
    debeziumStarted = true;
    // start the real debezium consumer
    debeziumChangeFetcher.runFetchLoop();
}

/**
 * Reports input metrics for one non-heartbeat change record.
 *
 * <p>When {@code migrateAll} is set, the record is attributed to its database/collection. The
 * namespace is read from the {@code ns} struct, or from the {@code to} struct when {@code ns}
 * is absent. If the value or both namespace structs are missing, the record is counted only
 * in the aggregate metrics instead of failing the job with a NullPointerException. A missing
 * {@code source} struct is treated as a non-snapshot record, matching how a missing snapshot
 * flag is already parsed.
 *
 * @param record the change record; nothing is reported when it is {@code null} or when
 *     metrics are disabled ({@code sourceMetricData == null})
 */
private void outputRecordMetrics(SourceRecord record) {
    if (sourceMetricData == null || record == null) {
        return;
    }
    if (migrateAll) {
        Struct value = (Struct) record.value();
        Struct ns = value == null ? null : value.getStruct(MongoDBEnvelope.NAMESPACE_FIELD);
        if (ns == null && value != null) {
            ns = value.getStruct(RecordUtils.DOCUMENT_TO_FIELD);
        }
        if (ns != null) {
            String dbName = ns.getString(MongoDBEnvelope.NAMESPACE_DATABASE_FIELD);
            String collectionName = ns.getString(MongoDBEnvelope.NAMESPACE_COLLECTION_FIELD);
            Struct source = value.getStruct(Envelope.FieldName.SOURCE);
            boolean isSnapshotRecord = source != null
                    && Boolean.parseBoolean(source.getString(AbstractSourceInfo.SNAPSHOT_KEY));
            sourceMetricData.outputMetricsWithEstimate(
                    new String[]{dbName, collectionName}, isSnapshotRecord, value);
            return;
        }
        // no namespace available: fall through to the aggregate-only metrics
    }
    sourceMetricData.outputMetricsWithEstimate(record.value());
}