in crossdc-consumer/src/main/java/org/apache/solr/crossdc/consumer/PartitionManager.java [57:95]
/**
 * Advances the offset for {@code partition} if the oldest queued work unit has finished.
 *
 * <p>Looks up the partition's entry in {@code partitionWorkMap} and peeks at the head of its
 * {@code partitionQueue}. If there is no entry, or the queue is empty, nothing happens. Otherwise
 * the head unit's futures are inspected in iteration order:
 * <ul>
 *   <li>as soon as a future that is not done is found, the method returns and leaves the queue
 *       untouched;</li>
 *   <li>each completed future is resolved with {@code get()} so that a failure is raised here
 *       instead of being lost.</li>
 * </ul>
 * Only when every future is done (this includes a unit with no futures at all) is the unit
 * removed from the queue and {@code updateOffset} called with the unit's {@code nextOffset}.
 * At most one work unit is handled per call.
 *
 * <p>The whole check runs while holding the monitor of the {@code partition} argument.
 * NOTE(review): that lock is on the passed instance itself - confirm callers share the same
 * {@code TopicPartition} instances if mutual exclusion between calls is expected.
 *
 * @param partition the partition whose pending work is checked
 * @throws InterruptedException if resolving a completed future reports an interruption
 * @throws Throwable the cause of a failed future; if the {@link ExecutionException} carries no
 *         cause, the {@code ExecutionException} itself is thrown. When an exception is thrown the
 *         work unit stays at the head of the queue and the offset is not updated.
 */
void checkForOffsetUpdates(TopicPartition partition) throws Throwable {
    synchronized (partition) {
        PartitionWork work = partitionWorkMap.get(partition);
        if (work == null) {
            // No work has been registered for this partition.
            return;
        }

        // Peek rather than poll: the unit must stay queued unless all of its futures are done.
        WorkUnit workUnit = work.partitionQueue.peek();
        if (workUnit == null) {
            return;
        }

        for (Future<?> future : workUnit.workItems) {
            if (!future.isDone()) {
                if (log.isTraceEnabled()) {
                    log.trace("Future for update is not done topic={}", partition.topic());
                }
                // Still in flight; try again on a later call.
                return;
            }
            try {
                // The future is already done, so this only surfaces its outcome.
                future.get();
            } catch (InterruptedException e) {
                log.error("Error updating offset for partition: {}", partition, e);
                throw e;
            } catch (ExecutionException e) {
                log.error("Error updating offset for partition: {}", partition, e);
                // "throw null" would raise a NullPointerException and hide the real failure,
                // so fall back to the wrapper when no cause is attached.
                Throwable cause = e.getCause();
                throw cause != null ? cause : e;
            }
            if (log.isTraceEnabled()) {
                log.trace("Future for update is done topic={}", partition.topic());
            }
        }

        // Every future completed without error: retire the unit and move the offset forward.
        work.partitionQueue.poll();
        updateOffset(partition, workUnit.nextOffset);
    }
}