in rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/connectorwrapper/WorkerSourceTask.java [553:638]
public boolean commitOffsets() {
long commitTimeoutMs = workerConfig.getOffsetCommitTimeoutMsConfig();
log.debug("{} Committing offsets", this);
long started = System.currentTimeMillis();
long timeout = started + commitTimeoutMs;
RecordOffsetManagement.CommittableOffsets offsetsToCommit;
synchronized (this) {
offsetsToCommit = this.committableOffsets;
this.committableOffsets = RecordOffsetManagement.CommittableOffsets.EMPTY;
}
if (committableOffsets.isEmpty()) {
log.debug("{} Either no records were produced by the task since the last offset commit, "
+ "or every record has been filtered out by a transformation "
+ "or dropped due to transformation or conversion errors.",
this
);
// We continue with the offset commit process here instead of simply returning immediately
// in order to invoke SourceTask::commit and record metrics for a successful offset commit
} else {
log.info("{} Committing offsets for {} acknowledged messages", this, committableOffsets.numCommittableMessages());
if (committableOffsets.hasPending()) {
log.debug("{} There are currently {} pending messages spread across {} source partitions whose offsets will not be committed. "
+ "The source partition with the most pending messages is {}, with {} pending messages",
this,
committableOffsets.numUncommittableMessages(),
committableOffsets.numDeques(),
committableOffsets.largestDequePartition(),
committableOffsets.largestDequeSize()
);
} else {
log.debug("{} There are currently no pending messages for this offset commit; "
+ "all messages dispatched to the task's producer since the last commit have been acknowledged",
this
);
}
}
// write offset
offsetsToCommit.offsets().forEach(positionStorageWriter::writeOffset);
// begin flush
if (!positionStorageWriter.beginFlush()) {
// There was nothing in the offsets to process, but we still mark a successful offset commit.
long durationMillis = System.currentTimeMillis() - started;
recordCommitSuccess(durationMillis);
log.debug("{} Finished offset commitOffsets successfully in {} ms",
this, durationMillis);
commitSourceTask();
return true;
}
Future<Void> flushFuture = positionStorageWriter.doFlush((error, key, result) -> {
if (error != null) {
log.error("{} Failed to flush offsets to storage: ", WorkerSourceTask.this, error);
} else {
log.trace("{} Finished flushing offsets to storage", WorkerSourceTask.this);
}
});
try {
flushFuture.get(Math.max(timeout - System.currentTimeMillis(), 0), TimeUnit.MILLISECONDS);
} catch (InterruptedException e) {
log.warn("{} Flush of offsets interrupted, cancelling", this);
positionStorageWriter.cancelFlush();
recordCommitFailure(System.currentTimeMillis() - started);
return false;
} catch (ExecutionException e) {
log.error("{} Flush of offsets threw an unexpected exception: ", this, e);
positionStorageWriter.cancelFlush();
recordCommitFailure(System.currentTimeMillis() - started);
return false;
} catch (TimeoutException e) {
log.error("{} Timed out waiting to flush offsets to storage; will try again on next flush interval with latest offsets", this);
positionStorageWriter.cancelFlush();
recordCommitFailure(System.currentTimeMillis() - started);
return false;
}
long durationMillis = System.currentTimeMillis() - started;
recordCommitSuccess(durationMillis);
log.debug("{} Finished commitOffsets successfully in {} ms",
this, durationMillis);
commitSourceTask();
return true;
}