in core/src/main/java/org/apache/camel/kafkaconnector/CamelSourceTask.java [196:261]
public synchronized List<SourceRecord> poll() {
LOG.debug("Number of records waiting an ack: {}", freeSlots.capacity() - freeSlots.size());
final long startPollEpochMilli = Instant.now().toEpochMilli();
long remaining = remaining(startPollEpochMilli, maxPollDuration);
long collectedRecords = 0L;
List<SourceRecord> records = new ArrayList<>();
while (collectedRecords < maxBatchPollSize && freeSlots.size() >= topics.length && remaining > 0) {
Exchange exchange = consumer.receive(remaining);
if (exchange == null) {
// Nothing received, abort and return what we received so far
break;
}
LOG.debug("Received Exchange {} with Message {} from Endpoint {}", exchange.getExchangeId(),
exchange.getMessage().getMessageId(), exchange.getFromEndpoint());
// TODO: see if there is a better way to use sourcePartition
// an sourceOffset
Map<String, String> sourcePartition = Collections.singletonMap("filename", exchange.getFromEndpoint().toString());
Map<String, String> sourceOffset = Collections.singletonMap("position", exchange.getExchangeId());
final Object messageHeaderKey = camelMessageHeaderKey != null ? exchange.getMessage().getHeader(camelMessageHeaderKey) : null;
Object messageBodyValue = exchange.getMessage().getBody();
final Schema messageKeySchema = messageHeaderKey != null ? SchemaHelper.buildSchemaBuilderForType(messageHeaderKey) : null;
final Schema messageBodySchema = messageBodyValue != null ? SchemaHelper.buildSchemaBuilderForType(messageBodyValue) : null;
final long timestamp = calculateTimestamp(exchange);
// take in account Cached camel streams
if (messageBodyValue instanceof StreamCache) {
StreamCache sc = (StreamCache) messageBodyValue;
// reset to be sure that the cache is ready to be used before sending it in the record (could be useful for SMTs)
sc.reset();
}
for (String singleTopic : topics) {
CamelSourceRecord camelRecord = new CamelSourceRecord(sourcePartition, sourceOffset, singleTopic, null, messageKeySchema,
messageHeaderKey, messageBodySchema, messageBodyValue, timestamp);
if (mapHeaders) {
if (exchange.getMessage().hasHeaders()) {
setAdditionalHeaders(camelRecord, exchange.getMessage().getHeaders(), HEADER_CAMEL_PREFIX);
}
}
if (mapProperties) {
if (exchange.hasProperties()) {
setAdditionalHeaders(camelRecord, exchange.getProperties(), PROPERTY_CAMEL_PREFIX);
}
}
TaskHelper.logRecordContent(LOG, loggingLevel, camelRecord);
Integer claimCheck = freeSlots.remove();
camelRecord.setClaimCheck(claimCheck);
exchangesWaitingForAck[claimCheck] = exchange;
LOG.debug("Record: {}, containing data from exchange: {}, is associated with claim check number: {}", camelRecord, exchange, claimCheck);
records.add(camelRecord);
}
collectedRecords++;
remaining = remaining(startPollEpochMilli, maxPollDuration);
}
return records.isEmpty() ? null : records;
}