private void submitRequest()

in crossdc-commons/src/main/java/org/apache/solr/crossdc/common/KafkaMirroringSink.java [89:120]


    /**
     * Hands a mirrored request to the Kafka producer for the given topic.
     *
     * <p>The record is passed to {@code producer.send} together with a completion callback.
     * A failure reported through that callback is only logged; it is not propagated to the
     * caller of this method. Only exceptions thrown synchronously inside the {@code try}
     * block (by the {@code send} call itself, the configuration lookup, or
     * {@code slowSubmitAction}) are converted into a {@link MirroringException}.
     *
     * <p>The time spent in the {@code send} call is measured, and when it exceeds the
     * configured {@code SLOW_SUBMIT_THRESHOLD_MS} value, {@code slowSubmitAction} is invoked
     * with the request and the elapsed time in milliseconds.
     *
     * @param request   the request to enqueue
     * @param topicName the name of the topic the record is sent to
     * @throws MirroringException if any exception is thrown while submitting the request;
     *                            the original exception is preserved as the cause
     */
    private void submitRequest(MirroredSolrRequest request, String topicName) throws MirroringException {
        if (log.isDebugEnabled()) {
            log.debug("About to submit a MirroredSolrRequest");
        }

        final long enqueueStartNanos = System.nanoTime();

        try {
            // The callback runs when the send completes; a failure there is logged only.
            producer.send(new ProducerRecord<>(topicName, request), (metadata, exception) -> {
                if (exception != null) {
                    log.error("Failed adding update to CrossDC queue! request=" + request.getSolrRequest(), exception);
                }
            });

            // Time spent inside the send() call itself, not the time until the callback fires.
            final long elapsedTimeMillis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - enqueueStartNanos);

            if (elapsedTimeMillis > conf.getInt(SLOW_SUBMIT_THRESHOLD_MS)) {
                slowSubmitAction(request, elapsedTimeMillis);
            }
        } catch (Exception e) {
            // Intentionally catching all exceptions: the only exception callers should see
            // from this method is MirroringException.
            String message = "Unable to enqueue request " + request + ", configured retries is " + conf.getInt(KafkaCrossDcConf.NUM_RETRIES) +
                    " and configured max delivery timeout in ms is " + conf.getInt(KafkaCrossDcConf.DELIVERY_TIMEOUT_MS);
            log.error(message, e);
            throw new MirroringException(message, e);
        }
    }