in core/src/main/java/org/apache/camel/kafkaconnector/CamelSinkTask.java [184:217]
public void put(Collection<SinkRecord> sinkRecords) {
    // Wraps every incoming sink record in a Camel exchange, sends it to the
    // local endpoint and deals with a failed delivery before moving on.
    // Throws ConnectException when a delivery fails and no reporter is set.
    for (SinkRecord sinkRecord : sinkRecords) {
        TaskHelper.logRecordContent(LOG, loggingLevel, sinkRecord);

        // The record value becomes the message body; the record key travels as a header.
        Exchange camelExchange = new DefaultExchange(producer.getCamelContext());
        camelExchange.getMessage().setBody(sinkRecord.value());
        camelExchange.getMessage().setHeader(KAFKA_RECORD_KEY_HEADER, sinkRecord.key());

        for (Header recordHeader : sinkRecord.headers()) {
            final String headerKey = recordHeader.key();

            if (headerKey.startsWith(HEADER_CAMEL_PREFIX)) {
                // The header prefix takes precedence: such a key is never checked
                // against the property prefix, even when header mapping is off.
                if (mapHeaders) {
                    mapHeader(recordHeader, HEADER_CAMEL_PREFIX, camelExchange.getMessage().getHeaders());
                }
                continue;
            }

            if (headerKey.startsWith(PROPERTY_CAMEL_PREFIX) && mapProperties) {
                mapHeader(recordHeader, PROPERTY_CAMEL_PREFIX, camelExchange.getProperties());
            }
        }

        LOG.debug("Sending exchange {} to {}", camelExchange.getExchangeId(), LOCAL_URL);
        producer.send(localEndpoint, camelExchange);

        // Successful delivery: nothing more to do for this record.
        if (!camelExchange.isFailed()) {
            continue;
        }

        // With a reporter available the failed record is handed over to it and
        // processing carries on with the next record.
        if (reporter != null) {
            LOG.warn("A delivery has failed and the error reporting is enabled. Sending record to the DLQ");
            reporter.report(sinkRecord, camelExchange.getException());
            continue;
        }

        // No reporter: abort the whole batch, keeping the exchange's exception as the cause.
        LOG.warn("A delivery has failed and the error reporting is NOT enabled. Records may be lost or ignored");
        throw new ConnectException("Exchange delivery has failed!", camelExchange.getException());
    }
}