in crossdc-commons/src/main/java/org/apache/solr/crossdc/common/KafkaMirroringSink.java [89:120]
/**
 * Hands a mirrored request to the producer for delivery to the given topic.
 *
 * <p>The record is sent with a completion callback. A failure reported through that callback is
 * only logged; it is not rethrown to the caller of this method. Only exceptions raised while
 * issuing the send (or while handling a slow submit) surface as a {@link MirroringException}.
 *
 * <p>The time spent inside this call is measured, and {@code slowSubmitAction} is invoked when
 * it exceeds the configured {@code SLOW_SUBMIT_THRESHOLD_MS}.
 *
 * @param request the request to mirror
 * @param topicName the topic the record is addressed to
 * @throws MirroringException if anything inside the submit path throws; the original exception
 *     is preserved as the cause
 */
private void submitRequest(MirroredSolrRequest request, String topicName) throws MirroringException {
  log.debug("About to submit a MirroredSolrRequest");

  final long enqueueStartNanos = System.nanoTime();
  try {
    producer.send(new ProducerRecord<>(topicName, request), (metadata, exception) -> {
      // Runs when the producer reports completion; a non-null exception means the send failed.
      if (exception != null) {
        log.error("Failed adding update to CrossDC queue! request=" + request.getSolrRequest(), exception);
      }
    });

    // Measures only the time spent in send() itself, not the time until the callback fires.
    final long elapsedTimeMillis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - enqueueStartNanos);
    if (elapsedTimeMillis > conf.getInt(SLOW_SUBMIT_THRESHOLD_MS)) {
      slowSubmitAction(request, elapsedTimeMillis);
    }
  } catch (Exception e) {
    // Intentionally catching every exception: callers of this method only expect MirroringException.
    String message = "Unable to enqueue request " + request + ", configured retries is "
        + conf.getInt(KafkaCrossDcConf.NUM_RETRIES)
        + " and configured max delivery timeout in ms is "
        + conf.getInt(KafkaCrossDcConf.DELIVERY_TIMEOUT_MS);
    log.error(message, e);
    throw new MirroringException(message, e);
  }
}