void checkForOffsetUpdates()

in crossdc-consumer/src/main/java/org/apache/solr/crossdc/consumer/PartitionManager.java [57:95]


    void checkForOffsetUpdates(TopicPartition partition) throws Throwable {
        synchronized (partition) {
            PartitionWork work;
            if ((work = partitionWorkMap.get(partition)) != null) {
                WorkUnit workUnit = work.partitionQueue.peek();
                if (workUnit != null) {
                    boolean allFuturesDone = true;
                    for (Future<?> future : workUnit.workItems) {
                        if (!future.isDone()) {
                            if (log.isTraceEnabled()) {
                                log.trace("Future for update is not done topic={}", partition.topic());
                            }
                            allFuturesDone = false;
                            break;
                        }

                        try {
                            future.get();
                        } catch (InterruptedException e) {
                            log.error("Error updating offset for partition: {}", partition, e);
                            throw e;
                        } catch (ExecutionException e) {
                            log.error("Error updating offset for partition: {}", partition, e);
                            throw e.getCause();
                        }

                        if (log.isTraceEnabled()) {
                            log.trace("Future for update is done topic={}", partition.topic());
                        }
                    }

                    if (allFuturesDone) {
                        work.partitionQueue.poll();
                        updateOffset(partition, workUnit.nextOffset);
                    }
                }
            }
        }
    }