public synchronized List poll()

in core/src/main/java/org/apache/camel/kafkaconnector/CamelSourceTask.java [197:262]


    /**
     * Pulls exchanges from the Camel consumer and turns them into source records.
     *
     * <p>The loop keeps receiving while all of the following hold: fewer than
     * {@code maxBatchPollSize} exchanges have been consumed in this call, there are at
     * least {@code topics.length} free claim-check slots, and the time budget derived
     * from {@code maxPollDuration} is not exhausted. It also stops as soon as the
     * consumer returns no exchange.
     *
     * <p>Every received exchange produces one record per configured topic. Each record
     * takes a claim check from {@code freeSlots}, and the originating exchange is stored
     * in {@code exchangesWaitingForAck} at that index.
     *
     * @return the collected records, or {@code null} when nothing was collected
     */
    public synchronized List<SourceRecord> poll() {
        LOG.debug("Number of records waiting an ack: {}", freeSlots.capacity() - freeSlots.size());
        final long pollStartedAtMillis = Instant.now().toEpochMilli();

        final List<SourceRecord> batch = new ArrayList<>();
        long timeLeft = remaining(pollStartedAtMillis, maxPollDuration);
        long exchangesConsumed = 0L;

        // Each exchange needs one free slot per topic, hence the comparison against topics.length.
        while (exchangesConsumed < maxBatchPollSize && freeSlots.size() >= topics.length && timeLeft > 0) {
            final Exchange received = consumer.receive(timeLeft);
            if (received == null) {
                // The consumer had nothing for us: stop here and hand back whatever was gathered so far.
                break;
            }

            LOG.debug("Received Exchange {} with Message {} from Endpoint {}", received.getExchangeId(),
                    received.getMessage().getMessageId(), received.getFromEndpoint());

            // TODO: see if there is a better way to use sourcePartition
            // and sourceOffset
            final Map<String, String> partition = Collections.singletonMap("filename", received.getFromEndpoint().toString());
            final Map<String, String> offset = Collections.singletonMap("position", received.getExchangeId());

            // The record key is only looked up when a header name has been configured.
            Object recordKey = null;
            if (camelMessageHeaderKey != null) {
                recordKey = received.getMessage().getHeader(camelMessageHeaderKey);
            }
            final Object recordValue = received.getMessage().getBody();

            // Schemas are derived from the runtime values; a missing value means no schema.
            final Schema keySchema = recordKey == null ? null : SchemaHelper.buildSchemaBuilderForType(recordKey);
            final Schema valueSchema = recordValue == null ? null : SchemaHelper.buildSchemaBuilderForType(recordValue);

            final long recordTimestamp = calculateTimestamp(received);

            // Cached Camel streams are reset so the cache is ready to be used
            // before it is sent in the record (could be useful for SMTs).
            if (recordValue instanceof StreamCache) {
                ((StreamCache) recordValue).reset();
            }

            // Fan the same exchange out to every configured topic.
            for (String targetTopic : topics) {
                final CamelSourceRecord outgoing = new CamelSourceRecord(partition, offset, targetTopic, null, keySchema,
                        recordKey, valueSchema, recordValue, recordTimestamp);

                if (mapHeaders && received.getMessage().hasHeaders()) {
                    setAdditionalHeaders(outgoing, received.getMessage().getHeaders(), HEADER_CAMEL_PREFIX);
                }

                if (mapProperties && received.hasProperties()) {
                    setAdditionalHeaders(outgoing, received.getProperties(), PROPERTY_CAMEL_PREFIX);
                }

                TaskHelper.logRecordContent(LOG, loggingLevel, outgoing);

                // Reserve a slot and remember which exchange backs this record.
                final Integer claimCheck = freeSlots.remove();
                outgoing.setClaimCheck(claimCheck);
                exchangesWaitingForAck[claimCheck] = received;
                LOG.debug("Record: {}, containing data from exchange: {}, is associated with claim check number: {}", outgoing, received, claimCheck);
                batch.add(outgoing);
            }

            // The counter tracks exchanges, not records: one increment per received exchange.
            exchangesConsumed++;
            timeLeft = remaining(pollStartedAtMillis, maxPollDuration);
        }

        if (batch.isEmpty()) {
            return null;
        }
        return batch;
    }