in crossdc-consumer/src/main/java/org/apache/solr/crossdc/consumer/KafkaCrossDcConsumer.java [297:320]
/**
 * Submits a batch of updates to the executor for processing and registers the resulting
 * future on the supplied work unit.
 *
 * <p>The submitted task wraps the batch in a {@code MirroredSolrRequest}, passes it to
 * {@code messageProcessor.handleItem} and forwards the outcome, together with
 * {@code lastRecord}, to {@code processResult}. A {@code MirroringException} raised along the
 * way is logged and rethrown wrapped in a {@code RuntimeException}. Whether or not processing
 * succeeded, a second task is queued on the same executor that calls
 * {@code partitionManager.checkForOffsetUpdates} for the work unit's partition.
 *
 * @param solrReqBatch the update request to wrap and hand to the message processor
 * @param lastRecord   the record passed to {@code processResult} along with the handler result
 * @param workUnit     supplies the partition for the offset check; the future of the submitted
 *                     task is appended to its {@code workItems}
 */
public void sendBatch(UpdateRequest solrReqBatch, ConsumerRecord<String,MirroredSolrRequest> lastRecord, PartitionManager.WorkUnit workUnit) {
  // Follow-up job queued after every batch attempt; any failure in it is deliberately dropped.
  final Runnable offsetCheck = () -> {
    try {
      partitionManager.checkForOffsetUpdates(workUnit.partition);
    } catch (Throwable ignored) {
      // already logging in checkForOffsetUpdates
    }
  };

  // Main job: process the batch, then always schedule the offset check from the finally block.
  final Runnable batchTask = () -> {
    try {
      MirroredSolrRequest mirrored = new MirroredSolrRequest(solrReqBatch);
      IQueueHandler.Result<MirroredSolrRequest> outcome = messageProcessor.handleItem(mirrored);
      processResult(lastRecord, outcome);
    } catch (MirroringException mirroringFailure) {
      // We don't really know what to do here
      log.error("Mirroring exception occurred while resubmitting to Kafka. We are going to stop the consumer thread now.", mirroringFailure);
      throw new RuntimeException(mirroringFailure);
    } finally {
      executor.submit(offsetCheck);
    }
  };

  // The future is recorded only after submission, exactly as before.
  Future<?> pending = executor.submit(batchTask);
  workUnit.workItems.add(pending);
}