in kafka-connector/src/main/java/com/google/pubsub/kafka/source/CloudPubSubSourceTask.java [185:244]
// Pulls one batch of messages from the subscriber and converts each of them into a SourceRecord
// destined for kafkaTopic.
//
// For every received message:
//   - the record key is the message's ordering key when kafkaMessageKeyAttribute equals
//     ConnectorUtils.CPS_ORDERING_KEY_ATTRIBUTE, otherwise the value of the attribute named by
//     kafkaMessageKeyAttribute (null when the message does not carry that attribute);
//   - the record timestamp is read from the attribute named by kafkaMessageTimestampAttribute and
//     falls back to the message publish time when getLongValue yields null;
//   - the ack id is attached to the record, keyed by the subscription name;
//   - a message carrying attributes outside standardAttributes (or a non-empty ordering key while
//     makeOrderingKeyAttribute is set) is converted with createRecordWithHeaders or
//     createRecordWithStruct, depending on useKafkaHeaders; any other message becomes a plain
//     bytes-valued record.
//
// Returns the converted records, or an empty list when the pull or the conversion fails.
// Throws InterruptedException when the thread is interrupted while waiting for the pull; the
// interruption is propagated instead of being reported as an empty poll.
public List<SourceRecord> poll() throws InterruptedException {
  log.debug("Polling...");
  try {
    List<ReceivedMessage> response = subscriber.pull().get();
    List<SourceRecord> sourceRecords = new ArrayList<>(response.size());
    log.trace("Received " + response.size() + " messages");
    for (ReceivedMessage rm : response) {
      PubsubMessage message = rm.getMessage();
      String ackId = rm.getAckId();
      Map<String, String> messageAttributes = message.getAttributesMap();
      String orderingKey = message.getOrderingKey();

      String key;
      if (ConnectorUtils.CPS_ORDERING_KEY_ATTRIBUTE.equals(kafkaMessageKeyAttribute)) {
        key = orderingKey;
      } else {
        key = messageAttributes.get(kafkaMessageKeyAttribute);
      }

      // Prefer the timestamp carried in the configured attribute; fall back to publish time.
      Long timestamp = getLongValue(messageAttributes.get(kafkaMessageTimestampAttribute));
      if (timestamp == null) {
        timestamp = Timestamps.toMillis(message.getPublishTime());
      }

      ByteString messageData = message.getData();
      byte[] messageBytes = messageData.toByteArray();

      // A message needs the richer record form when it has any attribute beyond the standard
      // ones, or when its ordering key has to be surfaced as an attribute.
      boolean hasCustomAttributes =
          !standardAttributes.containsAll(messageAttributes.keySet())
              || (makeOrderingKeyAttribute && orderingKey != null && !orderingKey.isEmpty());

      Map<String, String> ack = Collections.singletonMap(cpsSubscription.toString(), ackId);

      final SourceRecord record;
      if (hasCustomAttributes) {
        if (useKafkaHeaders) {
          record =
              createRecordWithHeaders(
                  messageAttributes, ack, key, orderingKey, messageBytes, timestamp);
        } else {
          record =
              createRecordWithStruct(
                  messageAttributes, ack, key, orderingKey, messageBytes, timestamp);
        }
      } else {
        record =
            new SourceRecord(
                null,
                ack,
                kafkaTopic,
                selectPartition(key, messageBytes, orderingKey),
                Schema.OPTIONAL_STRING_SCHEMA,
                key,
                Schema.BYTES_SCHEMA,
                messageBytes,
                timestamp);
      }
      sourceRecords.add(record);
    }
    return sourceRecords;
  } catch (InterruptedException e) {
    // Must not be folded into the generic handler below: swallowing it would hide the
    // interruption from the caller even though this method declares it.
    throw e;
  } catch (Exception e) {
    // Best effort: any other failure is reported as an empty poll. The exception is passed as
    // the throwable argument so that its stack trace is logged.
    log.info("Error while retrieving records, treating as an empty poll.", e);
    return new ArrayList<>();
  }
}