in kafka-connector/src/main/java/com/google/pubsublite/kafka/source/PollerImpl.java [32:64]
List<SourceRecord> poll() {
try {
ConsumerRecords<byte[], byte[]> records = consumer.poll(POLL_DURATION);
ImmutableList.Builder<SourceRecord> output = ImmutableList.builder();
records.forEach(consumerRecord -> {
final ConnectHeaders headers = new ConnectHeaders();
for (Header header : consumerRecord.headers()) {
headers.addBytes(header.key(), header.value());
}
boolean keyNullOrEmpty = consumerRecord.key() == null || consumerRecord.key().length == 0;
output.add(new SourceRecord(
ImmutableMap
.of("topic", consumerRecord.topic(), "partition", consumerRecord.partition()),
ImmutableMap.of("offset", consumerRecord.offset()),
kafkaTopic,
// Null partition uses the default kafka partitioner which has key affinity if the key
// is null. Pub/Sub Lite messages with an empty key are treated as if they have no key,
// and are given a null key instead to get this routing behavior.
// https://docs.confluent.io/3.3.0/connect/connect-storage-cloud/kafka-connect-s3/docs/configuration_options.html#partitioner
null,
Schema.OPTIONAL_BYTES_SCHEMA,
keyNullOrEmpty ? null : consumerRecord.key(),
Schema.BYTES_SCHEMA,
consumerRecord.value(),
consumerRecord.timestamp(),
headers
));
});
return output.build();
} catch (TimeoutException | WakeupException e) {
return null;
}
}