public void execute()

in flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mongodb-cdc/src/main/java/org/apache/flink/cdc/connectors/mongodb/source/reader/fetch/MongoDBStreamFetchTask.java [95:229]


    public void execute(Context context) throws Exception {
        MongoDBFetchTaskContext taskContext = (MongoDBFetchTaskContext) context;
        this.sourceConfig = taskContext.getSourceConfig();

        ChangeStreamDescriptor descriptor = taskContext.getChangeStreamDescriptor();
        ChangeEventQueue<DataChangeEvent> queue = taskContext.getQueue();

        MongoClient mongoClient = clientFor(sourceConfig);
        MongoChangeStreamCursor<BsonDocument> changeStreamCursor =
                openChangeStreamCursor(descriptor);
        HeartbeatManager heartbeatManager = openHeartbeatManagerIfNeeded(changeStreamCursor);

        final long startPoll = time.milliseconds();
        long nextUpdate = startPoll + sourceConfig.getPollAwaitTimeMillis();
        this.taskRunning = true;
        try {
            while (taskRunning) {
                Optional<BsonDocument> next;
                try {
                    next = Optional.ofNullable(changeStreamCursor.tryNext());
                } catch (MongoCommandException e) {
                    if (MongoUtils.checkIfChangeStreamCursorExpires(e)) {
                        LOG.warn("Change stream cursor has expired, trying to recreate cursor");
                        boolean resumeTokenExpires = MongoUtils.checkIfResumeTokenExpires(e);
                        if (resumeTokenExpires) {
                            LOG.warn(
                                    "Resume token has expired, fallback to timestamp restart mode");
                        }
                        changeStreamCursor = openChangeStreamCursor(descriptor, resumeTokenExpires);
                        next = Optional.ofNullable(changeStreamCursor.tryNext());
                    } else {
                        throw e;
                    }
                }
                SourceRecord changeRecord = null;
                if (!next.isPresent()) {
                    long untilNext = nextUpdate - time.milliseconds();
                    if (untilNext > 0) {
                        LOG.debug("Waiting {} ms to poll change records", untilNext);
                        time.sleep(untilNext);
                        continue;
                    }

                    if (heartbeatManager != null) {
                        changeRecord =
                                heartbeatManager
                                        .heartbeat()
                                        .map(this::normalizeHeartbeatRecord)
                                        .orElse(null);
                    }
                    // update nextUpdateTime
                    nextUpdate = time.milliseconds() + sourceConfig.getPollAwaitTimeMillis();
                } else {
                    BsonDocument changeStreamDocument = next.get();
                    OperationType operationType = getOperationType(changeStreamDocument);

                    switch (operationType) {
                        case INSERT:
                        case UPDATE:
                        case REPLACE:
                        case DELETE:
                            MongoNamespace namespace = getMongoNamespace(changeStreamDocument);

                            BsonDocument resumeToken = changeStreamDocument.getDocument(ID_FIELD);
                            BsonDocument valueDocument =
                                    normalizeChangeStreamDocument(changeStreamDocument);

                            LOG.trace("Adding {} to {}", valueDocument, namespace.getFullName());

                            changeRecord =
                                    MongoRecordUtils.createSourceRecord(
                                            MongoRecordUtils.createPartitionMap(
                                                    sourceConfig.getScheme(),
                                                    sourceConfig.getHosts(),
                                                    namespace.getDatabaseName(),
                                                    namespace.getCollectionName()),
                                            MongoRecordUtils.createSourceOffsetMap(
                                                    resumeToken, false),
                                            namespace.getFullName(),
                                            changeStreamDocument.getDocument(ID_FIELD),
                                            valueDocument);
                            break;
                        default:
                            // Ignore drop、drop_database、rename and other record to prevent
                            // documentKey from being empty.
                            LOG.info("Ignored {} record: {}", operationType, changeStreamDocument);
                    }
                }
                if (changeRecord != null && !isBoundedRead()) {
                    queue.enqueue(new DataChangeEvent(changeRecord));
                }

                if (isBoundedRead()) {
                    ChangeStreamOffset currentOffset;
                    if (changeRecord != null) {
                        currentOffset =
                                new ChangeStreamOffset(
                                        MongoRecordUtils.getResumeToken(changeRecord));
                        // The log after the high watermark won't emit.
                        if (currentOffset.isAtOrBefore(streamSplit.getEndingOffset())) {
                            queue.enqueue(new DataChangeEvent(changeRecord));
                        }

                    } else {
                        // Heartbeat is not turned on or there is no update event
                        currentOffset = new ChangeStreamOffset(getCurrentClusterTime(mongoClient));
                    }

                    // Reach the high watermark, the binlog fetcher should be finished
                    if (currentOffset.isAtOrAfter(streamSplit.getEndingOffset())) {
                        // send watermark end event
                        SourceRecord watermark =
                                WatermarkEvent.create(
                                        MongoRecordUtils.createWatermarkPartitionMap(
                                                descriptor.toString()),
                                        WATERMARK_TOPIC_NAME,
                                        streamSplit.splitId(),
                                        WatermarkKind.END,
                                        currentOffset);

                        queue.enqueue(new DataChangeEvent(watermark));
                        break;
                    }
                }
            }
        } catch (Exception e) {
            LOG.error("Poll change stream records failed ", e);
            throw e;
        } finally {
            taskRunning = false;
            if (changeStreamCursor != null) {
                changeStreamCursor.close();
            }
        }
    }