in flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mongodb-cdc/src/main/java/org/apache/flink/cdc/connectors/mongodb/source/reader/fetch/MongoDBStreamFetchTask.java [95:229]
public void execute(Context context) throws Exception {
    // Polls the MongoDB change stream described by the task context and pushes the resulting
    // records into the context's queue. The loop ends when taskRunning is cleared, when an
    // exception escapes, or (bounded read only) once the split's ending offset is reached and
    // the END watermark has been enqueued.
    //
    // context: must be a MongoDBFetchTaskContext; it is cast unconditionally.
    // Throws: any exception raised while polling is logged and rethrown unchanged.
    MongoDBFetchTaskContext taskContext = (MongoDBFetchTaskContext) context;
    this.sourceConfig = taskContext.getSourceConfig();

    ChangeStreamDescriptor descriptor = taskContext.getChangeStreamDescriptor();
    ChangeEventQueue<DataChangeEvent> queue = taskContext.getQueue();

    MongoClient mongoClient = clientFor(sourceConfig);
    MongoChangeStreamCursor<BsonDocument> changeStreamCursor =
            openChangeStreamCursor(descriptor);
    HeartbeatManager heartbeatManager = openHeartbeatManagerIfNeeded(changeStreamCursor);

    final long startPoll = time.milliseconds();
    // Earliest instant at which an empty poll may fall through to the heartbeat / bounded-read
    // checks instead of sleeping.
    long nextUpdate = startPoll + sourceConfig.getPollAwaitTimeMillis();
    this.taskRunning = true;
    try {
        while (taskRunning) {
            Optional<BsonDocument> next;
            try {
                next = Optional.ofNullable(changeStreamCursor.tryNext());
            } catch (MongoCommandException e) {
                if (!MongoUtils.checkIfChangeStreamCursorExpires(e)) {
                    throw e;
                }
                LOG.warn("Change stream cursor has expired, trying to recreate cursor");
                boolean resumeTokenExpires = MongoUtils.checkIfResumeTokenExpires(e);
                if (resumeTokenExpires) {
                    LOG.warn("Resume token has expired, fallback to timestamp restart mode");
                }

                // Open the replacement first: if that fails, the expired cursor is still the
                // one referenced by changeStreamCursor and gets closed by the finally block.
                MongoChangeStreamCursor<BsonDocument> expiredCursor = changeStreamCursor;
                changeStreamCursor = openChangeStreamCursor(descriptor, resumeTokenExpires);

                // Release the expired cursor; previously it was dropped without being closed.
                // Best effort only: a failure to close must not abort the recovery.
                try {
                    expiredCursor.close();
                } catch (RuntimeException closeFailure) {
                    LOG.warn("Failed to close expired change stream cursor", closeFailure);
                }

                // The heartbeat manager was created from the cursor that has just been
                // replaced, so rebuild it against the live cursor.
                heartbeatManager = openHeartbeatManagerIfNeeded(changeStreamCursor);

                next = Optional.ofNullable(changeStreamCursor.tryNext());
            }

            SourceRecord changeRecord = null;
            if (!next.isPresent()) {
                // Nothing available: wait out the remainder of the poll interval first.
                long untilNext = nextUpdate - time.milliseconds();
                if (untilNext > 0) {
                    LOG.debug("Waiting {} ms to poll change records", untilNext);
                    time.sleep(untilNext);
                    continue;
                }

                // Poll interval elapsed without data; emit a heartbeat if one is configured
                // and due (the manager may return an empty Optional).
                if (heartbeatManager != null) {
                    changeRecord =
                            heartbeatManager
                                    .heartbeat()
                                    .map(this::normalizeHeartbeatRecord)
                                    .orElse(null);
                }
                // Schedule the next idle check.
                nextUpdate = time.milliseconds() + sourceConfig.getPollAwaitTimeMillis();
            } else {
                BsonDocument changeStreamDocument = next.get();
                OperationType operationType = getOperationType(changeStreamDocument);

                switch (operationType) {
                    case INSERT:
                    case UPDATE:
                    case REPLACE:
                    case DELETE:
                        MongoNamespace namespace = getMongoNamespace(changeStreamDocument);

                        // Read the resume token before normalizing the document.
                        // NOTE(review): the record key below is deliberately re-read from
                        // the document after normalization rather than reusing resumeToken;
                        // presumably normalization may rewrite the ID field - keep this order.
                        BsonDocument resumeToken = changeStreamDocument.getDocument(ID_FIELD);
                        BsonDocument valueDocument =
                                normalizeChangeStreamDocument(changeStreamDocument);

                        LOG.trace("Adding {} to {}", valueDocument, namespace.getFullName());

                        changeRecord =
                                MongoRecordUtils.createSourceRecord(
                                        MongoRecordUtils.createPartitionMap(
                                                sourceConfig.getScheme(),
                                                sourceConfig.getHosts(),
                                                namespace.getDatabaseName(),
                                                namespace.getCollectionName()),
                                        MongoRecordUtils.createSourceOffsetMap(
                                                resumeToken, false),
                                        namespace.getFullName(),
                                        changeStreamDocument.getDocument(ID_FIELD),
                                        valueDocument);
                        break;
                    default:
                        // Ignore drop, drop_database, rename and other records to prevent
                        // documentKey from being empty.
                        LOG.info("Ignored {} record: {}", operationType, changeStreamDocument);
                }
            }

            if (isBoundedRead()) {
                ChangeStreamOffset currentOffset;
                if (changeRecord != null) {
                    currentOffset =
                            new ChangeStreamOffset(
                                    MongoRecordUtils.getResumeToken(changeRecord));
                    // Records after the high watermark are not emitted.
                    if (currentOffset.isAtOrBefore(streamSplit.getEndingOffset())) {
                        queue.enqueue(new DataChangeEvent(changeRecord));
                    }
                } else {
                    // Heartbeat is not turned on or there is no update event: fall back to
                    // the current cluster time to decide whether the end has been reached.
                    currentOffset = new ChangeStreamOffset(getCurrentClusterTime(mongoClient));
                }

                // High watermark reached: the stream fetcher is finished.
                if (currentOffset.isAtOrAfter(streamSplit.getEndingOffset())) {
                    // Send the watermark END event and leave the polling loop.
                    SourceRecord watermark =
                            WatermarkEvent.create(
                                    MongoRecordUtils.createWatermarkPartitionMap(
                                            descriptor.toString()),
                                    WATERMARK_TOPIC_NAME,
                                    streamSplit.splitId(),
                                    WatermarkKind.END,
                                    currentOffset);
                    queue.enqueue(new DataChangeEvent(watermark));
                    break;
                }
            } else if (changeRecord != null) {
                // Unbounded read: forward every produced record.
                queue.enqueue(new DataChangeEvent(changeRecord));
            }
        }
    } catch (Exception e) {
        LOG.error("Poll change stream records failed ", e);
        throw e;
    } finally {
        taskRunning = false;
        if (changeStreamCursor != null) {
            changeStreamCursor.close();
        }
    }
}