public void put()

in core/src/main/java/org/apache/camel/kafkaconnector/CamelSinkTask.java [184:217]


    public void put(Collection<SinkRecord> sinkRecords) {
        for (SinkRecord record : sinkRecords) {
            TaskHelper.logRecordContent(LOG, loggingLevel, record);

            Exchange exchange = new DefaultExchange(producer.getCamelContext());
            exchange.getMessage().setBody(record.value());
            exchange.getMessage().setHeader(KAFKA_RECORD_KEY_HEADER, record.key());

            for (Header header : record.headers()) {
                if (header.key().startsWith(HEADER_CAMEL_PREFIX)) {
                    if (mapHeaders) {
                        mapHeader(header, HEADER_CAMEL_PREFIX, exchange.getMessage().getHeaders());
                    }
                } else if (header.key().startsWith(PROPERTY_CAMEL_PREFIX)) {
                    if (mapProperties) {
                        mapHeader(header, PROPERTY_CAMEL_PREFIX, exchange.getProperties());
                    }
                }
            }

            LOG.debug("Sending exchange {} to {}", exchange.getExchangeId(), LOCAL_URL);
            producer.send(localEndpoint, exchange);

            if (exchange.isFailed()) {
                if (reporter == null) {
                    LOG.warn("A delivery has failed and the error reporting is NOT enabled. Records may be lost or ignored");
                    throw new ConnectException("Exchange delivery has failed!", exchange.getException());
                }

                LOG.warn("A delivery has failed and the error reporting is enabled. Sending record to the DLQ");
                reporter.report(record, exchange.getException());
            }
        }
    }