public void run()

in inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/mongodb-cdc/src/main/java/org/apache/inlong/sort/cdc/mongodb/DebeziumSourceFunction.java [438:574]


    /**
     * Runs this source: sets up metrics, completes the Debezium engine {@code properties},
     * creates the {@link DebeziumChangeFetcher} and the Debezium engine, submits the engine to
     * {@code executor} and finally calls {@code runFetchLoop()} on the fetcher from the calling
     * thread.
     *
     * @param sourceContext the source context handed to the {@link DebeziumChangeFetcher}
     * @throws Exception if the reflective {@code getMetricGroup} lookup/invocation fails, or if
     *         anything invoked from here (engine setup, fetch loop) throws
     */
    public void run(SourceContext<T> sourceContext) throws Exception {

        // Resolve the metric group reflectively to make RuntimeContext#getMetricGroup
        // compatible between Flink 1.13 and Flink 1.14.
        final Method getMetricGroupMethod =
                getRuntimeContext().getClass().getMethod("getMetricGroup");
        getMetricGroupMethod.setAccessible(true);
        final MetricGroup metricGroup =
                (MetricGroup) getMetricGroupMethod.invoke(getRuntimeContext());

        // Seed the record/byte counters from the restored metric state when one is present.
        MetricOption metricOption = MetricOption.builder()
                .withInlongLabels(inlongMetric)
                .withAuditAddress(inlongAudit)
                .withInitRecords(metricState != null ? metricState.getMetricValue(NUM_RECORDS_IN) : 0L)
                .withInitBytes(metricState != null ? metricState.getMetricValue(NUM_BYTES_IN) : 0L)
                .withRegisterMetric(RegisteredMetric.ALL)
                .build();
        if (metricOption != null) {
            sourceMetricData = new SourceTableMetricData(metricOption, metricGroup,
                    Arrays.asList(Constants.DATABASE_NAME, Constants.COLLECTION_NAME));
            if (migrateAll) {
                // register sub source metric data from metric state
                sourceMetricData.registerSubMetricsGroup(metricState);
            }
        }

        properties.setProperty("name", "engine");
        properties.setProperty("offset.storage", FlinkOffsetBackingStore.class.getCanonicalName());
        if (restoredOffsetState != null) {
            // restored from state
            properties.setProperty(FlinkOffsetBackingStore.OFFSET_STATE_VALUE, restoredOffsetState);
        }
        // DO NOT include schema change, e.g. DDL
        properties.setProperty("include.schema.changes", "false");
        // disable the offset flush totally
        properties.setProperty("offset.flush.interval.ms", String.valueOf(Long.MAX_VALUE));
        // disable tombstones
        properties.setProperty("tombstones.on.delete", "false");
        if (engineInstanceName == null) {
            // not restore from recovery
            engineInstanceName = UUID.randomUUID().toString();
        }
        // history instance name to initialize FlinkDatabaseHistory
        properties.setProperty(
                FlinkDatabaseHistory.DATABASE_HISTORY_INSTANCE_NAME, engineInstanceName);
        // we have to use a persisted DatabaseHistory implementation, otherwise, recovery can't
        // continue to read binlog
        // see
        // https://stackoverflow.com/questions/57147584/debezium-error-schema-isnt-know-to-this-connector
        // and https://debezium.io/blog/2018/03/16/note-on-database-history-topic-configuration/
        properties.setProperty("database.history", determineDatabase().getCanonicalName());

        // we have to filter out the heartbeat events, otherwise the deserializer will fail
        String dbzHeartbeatPrefix =
                properties.getProperty(
                        Heartbeat.HEARTBEAT_TOPICS_PREFIX.name(),
                        Heartbeat.HEARTBEAT_TOPICS_PREFIX.defaultValueAsString());
        this.debeziumChangeFetcher =
                new DebeziumChangeFetcher<>(
                        sourceContext,
                        new DebeziumDeserializationSchema<T>() {

                            @Override
                            public void deserialize(SourceRecord record, Collector<T> out) throws Exception {
                                // do nothing; only the three-argument overload below emits records
                            }

                            @Override
                            public void deserialize(SourceRecord record, Collector<T> out, Boolean isStreamingPhase)
                                    throws Exception {
                                if (record != null && MongoRecordUtils.isHeartbeatEvent(record)) {
                                    // Boolean.TRUE.equals avoids an unboxing NullPointerException
                                    // when isStreamingPhase is null.
                                    if (sourceMetricData != null && Boolean.TRUE.equals(isStreamingPhase)) {
                                        sourceMetricData.outputReadPhaseMetrics(ReadPhase.INCREASE_PHASE);
                                    }
                                    // heartbeat events are never forwarded to the user deserializer
                                    return;
                                }
                                reportRecordMetrics(record);
                                deserializer.deserialize(record, out);
                            }

                            /**
                             * Reports metrics for a non-heartbeat record. With {@code migrateAll}
                             * the record is attributed to its database/collection; otherwise, or
                             * when the namespace or source struct is absent, it is reported
                             * through the single-argument estimate call.
                             */
                            private void reportRecordMetrics(SourceRecord record) {
                                if (sourceMetricData == null || record == null) {
                                    return;
                                }
                                if (migrateAll) {
                                    Struct value = (Struct) record.value();
                                    Struct ns = value.getStruct(MongoDBEnvelope.NAMESPACE_FIELD);
                                    if (null == ns) {
                                        ns = value.getStruct(RecordUtils.DOCUMENT_TO_FIELD);
                                    }
                                    Struct source = value.getStruct(Envelope.FieldName.SOURCE);
                                    // Both structs may still be null here; metrics reporting must
                                    // not fail the record, so fall through to the plain estimate.
                                    if (ns != null && source != null) {
                                        String dbName = ns.getString(MongoDBEnvelope.NAMESPACE_DATABASE_FIELD);
                                        String collectionName =
                                                ns.getString(MongoDBEnvelope.NAMESPACE_COLLECTION_FIELD);
                                        String snapshotRecord = source.getString(AbstractSourceInfo.SNAPSHOT_KEY);
                                        boolean isSnapshotRecord = Boolean.parseBoolean(snapshotRecord);
                                        sourceMetricData
                                                .outputMetricsWithEstimate(new String[]{dbName, collectionName},
                                                        isSnapshotRecord, value);
                                        return;
                                    }
                                }
                                sourceMetricData.outputMetricsWithEstimate(record.value());
                            }

                            @Override
                            public TypeInformation<T> getProducedType() {
                                return deserializer.getProducedType();
                            }
                        },
                        restoredOffsetState == null, // DB snapshot phase if restore state is null
                        dbzHeartbeatPrefix,
                        handover);

        // The gauges below dereference debeziumChangeFetcher, so they are registered only after
        // the fetcher has been assigned; registering them earlier would let a metric poll hit a
        // null fetcher.
        metricGroup.gauge(
                "currentFetchEventTimeLag",
                (Gauge<Long>) () -> debeziumChangeFetcher.getFetchDelay());
        metricGroup.gauge(
                "currentEmitEventTimeLag",
                (Gauge<Long>) () -> debeziumChangeFetcher.getEmitDelay());
        metricGroup.gauge(
                "sourceIdleTime", (Gauge<Long>) () -> debeziumChangeFetcher.getIdleTime());

        // create the engine with this configuration ...
        this.engine =
                DebeziumEngine.create(Connect.class)
                        .using(properties)
                        .notifying(changeConsumer)
                        .using(OffsetCommitPolicy.always())
                        .using(
                                (success, message, error) -> {
                                    if (success) {
                                        // Close the handover and prepare to exit.
                                        handover.close();
                                    } else {
                                        // hand the engine failure over to the handover
                                        handover.reportError(error);
                                    }
                                })
                        .build();

        // run the engine asynchronously
        executor.execute(engine);
        debeziumStarted = true;

        // start the real debezium consumer
        debeziumChangeFetcher.runFetchLoop();
    }