public boolean commitOffsets()

in rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/connectorwrapper/WorkerSourceTask.java [553:638]


    public boolean commitOffsets() {
        long commitTimeoutMs = workerConfig.getOffsetCommitTimeoutMsConfig();
        log.debug("{} Committing offsets", this);

        long started = System.currentTimeMillis();
        long timeout = started + commitTimeoutMs;

        RecordOffsetManagement.CommittableOffsets offsetsToCommit;
        synchronized (this) {
            offsetsToCommit = this.committableOffsets;
            this.committableOffsets = RecordOffsetManagement.CommittableOffsets.EMPTY;
        }

        if (committableOffsets.isEmpty()) {
            log.debug("{} Either no records were produced by the task since the last offset commit, "
                    + "or every record has been filtered out by a transformation "
                    + "or dropped due to transformation or conversion errors.",
                this
            );
            // We continue with the offset commit process here instead of simply returning immediately
            // in order to invoke SourceTask::commit and record metrics for a successful offset commit
        } else {
            log.info("{} Committing offsets for {} acknowledged messages", this, committableOffsets.numCommittableMessages());
            if (committableOffsets.hasPending()) {
                log.debug("{} There are currently {} pending messages spread across {} source partitions whose offsets will not be committed. "
                        + "The source partition with the most pending messages is {}, with {} pending messages",
                    this,
                    committableOffsets.numUncommittableMessages(),
                    committableOffsets.numDeques(),
                    committableOffsets.largestDequePartition(),
                    committableOffsets.largestDequeSize()
                );
            } else {
                log.debug("{} There are currently no pending messages for this offset commit; "
                        + "all messages dispatched to the task's producer since the last commit have been acknowledged",
                    this
                );
            }
        }

        // write offset
        offsetsToCommit.offsets().forEach(positionStorageWriter::writeOffset);

        // begin flush
        if (!positionStorageWriter.beginFlush()) {
            // There was nothing in the offsets to process, but we still mark a successful offset commit.
            long durationMillis = System.currentTimeMillis() - started;
            recordCommitSuccess(durationMillis);
            log.debug("{} Finished offset commitOffsets successfully in {} ms",
                this, durationMillis);
            commitSourceTask();
            return true;
        }

        Future<Void> flushFuture = positionStorageWriter.doFlush((error, key, result) -> {
            if (error != null) {
                log.error("{} Failed to flush offsets to storage: ", WorkerSourceTask.this, error);
            } else {
                log.trace("{} Finished flushing offsets to storage", WorkerSourceTask.this);
            }
        });
        try {
            flushFuture.get(Math.max(timeout - System.currentTimeMillis(), 0), TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            log.warn("{} Flush of offsets interrupted, cancelling", this);
            positionStorageWriter.cancelFlush();
            recordCommitFailure(System.currentTimeMillis() - started);
            return false;
        } catch (ExecutionException e) {
            log.error("{} Flush of offsets threw an unexpected exception: ", this, e);
            positionStorageWriter.cancelFlush();
            recordCommitFailure(System.currentTimeMillis() - started);
            return false;
        } catch (TimeoutException e) {
            log.error("{} Timed out waiting to flush offsets to storage; will try again on next flush interval with latest offsets", this);
            positionStorageWriter.cancelFlush();
            recordCommitFailure(System.currentTimeMillis() - started);
            return false;
        }
        long durationMillis = System.currentTimeMillis() - started;
        recordCommitSuccess(durationMillis);
        log.debug("{} Finished commitOffsets successfully in {} ms",
            this, durationMillis);
        commitSourceTask();
        return true;
    }