in uforwarder/src/main/java/com/uber/data/kafka/consumerproxy/controller/rebalancer/BatchRpcUriRebalancer.java [150:238]
public void postProcess(
Map<String, RebalancingJobGroup> jobGroups, Map<Long, StoredWorker> workers) {
// Dynamic config for turning offset committing off. Although the comments below explain why it
// is safe to turn off, we want a quick mitigation path in case anything goes wrong in production.
if (!dynamicConfiguration.isOffsetCommittingEnabled()) {
return;
}
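// Time the whole post-process pass; the stopwatch is stopped in the finally block below.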
Stopwatch postProcessStopwatch = scope.timer(MetricNames.POST_PROCESS_LATENCY).start();
try {
long currentTimeMs = System.currentTimeMillis();
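// Walk every job group and skip those whose offset-commit window, measured from the group's
// end timestamp, has already passed.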
for (RebalancingJobGroup rebalancingJobGroup : jobGroups.values()) {
Timestamp endTimestamp =
rebalancingJobGroup.getJobGroup().getKafkaConsumerTaskGroup().getEndTimestamp();
long endTimestampMs = Timestamps.toMillis(endTimestamp);
// Try to keep committing offsets for a certain time period so that the commit can eventually
// succeed.
// (1) We keep committing for a certain time period because the committed offset might be
//     overridden by another purge/merge operation, which will eventually be deleted.
// (2) We don't keep committing forever because
//     (a) messages are purged after a certain time period, so committing is no longer needed, and
//     (b) there might be too much committing work, which could take an unacceptably long time.
if (currentTimeMs - endTimestampMs > OFFSET_COMMIT_SKEW_MS) {
continue;
}
// TODO(qichao): https://t3.uberinternal.com/browse/KAFEP-1263
// The following offset-committing code is no longer needed due to recent fixes in the
// KCP DLQ purge; KCP now uses the worker to commit the offset.
String topic = rebalancingJobGroup.getJobGroup().getKafkaConsumerTaskGroup().getTopic();
Map<TopicPartition, OffsetAndMetadata> partitionAndOffsetToCommit = new HashMap<>();
for (StoredJob job : rebalancingJobGroup.getJobs().values()) {
long startOffset = job.getJob().getKafkaConsumerTask().getStartOffset();
long endOffset = job.getJob().getKafkaConsumerTask().getEndOffset();
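// Only jobs with a bounded range (endOffset > 0) whose start offset equals the end offset
// (i.e., the range is fully drained) and that have been canceled are eligible; for those,
// the end offset is committed below.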
if (endOffset > 0
&& startOffset == endOffset
&& job.getState() == JobState.JOB_STATE_CANCELED) {
partitionAndOffsetToCommit.put(
new TopicPartition(topic, job.getJob().getKafkaConsumerTask().getPartition()),
new OffsetAndMetadata(endOffset));
}
}
// Commit the collected offsets to the job group's Kafka cluster.
if (!partitionAndOffsetToCommit.isEmpty()) {
try {
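// Build an admin client for the job group's cluster and submit the consumer-group offset
// commit, timing the call; the timer is recorded only when the call succeeds.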
Stopwatch stopwatch = scope.timer(MetricNames.OFFSET_COMMIT_LATENCY).start();
KafkaConsumerTaskGroup taskGroup =
rebalancingJobGroup.getJobGroup().getKafkaConsumerTaskGroup();
AdminClient client = adminBuilder.build(taskGroup.getCluster());
client.alterConsumerGroupOffsets(
taskGroup.getConsumerGroup(), partitionAndOffsetToCommit);
stopwatch.stop();
} catch (Exception e) {
// Failing to commit the offset might lead to incorrect DLQ lag reports.
// A failure counter is emitted below; alerts on it still need to be added.
logger.warn(
MetricNames.OFFSET_COMMIT_FAILURE,
StructuredLogging.kafkaCluster(
rebalancingJobGroup.getJobGroup().getKafkaConsumerTaskGroup().getCluster()),
StructuredLogging.kafkaGroup(
rebalancingJobGroup
.getJobGroup()
.getKafkaConsumerTaskGroup()
.getConsumerGroup()),
StructuredLogging.kafkaTopic(topic),
e);
scope
.tagged(
StructuredTags.builder()
.setKafkaCluster(
rebalancingJobGroup
.getJobGroup()
.getKafkaConsumerTaskGroup()
.getCluster())
.setKafkaGroup(
rebalancingJobGroup
.getJobGroup()
.getKafkaConsumerTaskGroup()
.getConsumerGroup())
.setKafkaTopic(topic)
.build())
.counter(MetricNames.OFFSET_COMMIT_FAILURE)
.inc(1);
}
}
}
} finally {
postProcessStopwatch.stop();
}
}