in kafka-connector/src/main/java/com/google/pubsub/kafka/source/CloudPubSubSourceTask.java [246:275]
/**
 * Builds a {@link SourceRecord} for {@code kafkaTopic} whose headers carry the message
 * attributes.
 *
 * <p>Every entry of {@code messageAttributes} is added as a string header, except the entry
 * whose name equals {@code kafkaMessageKeyAttribute}. When {@code makeOrderingKeyAttribute} is
 * set and {@code orderingKey} is neither {@code null} nor empty, the ordering key is added as
 * one more string header named {@link ConnectorUtils#CPS_ORDERING_KEY_ATTRIBUTE}.
 *
 * @param messageAttributes attributes to copy into the record headers; must not be null
 * @param ack map handed to the record as its source offset (the source partition is null)
 * @param key record key, written with {@link Schema#OPTIONAL_STRING_SCHEMA}
 * @param orderingKey ordering key of the message; may be null or empty
 * @param messageBytes record value, written with {@link Schema#BYTES_SCHEMA}
 * @param timestamp timestamp handed to the record
 * @return the record, with its Kafka partition chosen by {@code selectPartition}
 */
private SourceRecord createRecordWithHeaders(
    Map<String, String> messageAttributes,
    Map<String, String> ack,
    String key,
    String orderingKey,
    byte[] messageBytes,
    Long timestamp) {
  ConnectHeaders recordHeaders = new ConnectHeaders();
  for (Entry<String, String> entry : messageAttributes.entrySet()) {
    String attributeName = entry.getKey();
    // The attribute that supplies the Kafka message key is not repeated as a header.
    if (attributeName.equals(kafkaMessageKeyAttribute)) {
      continue;
    }
    recordHeaders.addString(attributeName, entry.getValue());
  }

  boolean orderingKeyPresent = orderingKey != null && !orderingKey.isEmpty();
  if (makeOrderingKeyAttribute && orderingKeyPresent) {
    recordHeaders.addString(ConnectorUtils.CPS_ORDERING_KEY_ATTRIBUTE, orderingKey);
  }

  return new SourceRecord(
      null,
      ack,
      kafkaTopic,
      selectPartition(key, messageBytes, orderingKey),
      Schema.OPTIONAL_STRING_SCHEMA,
      key,
      Schema.BYTES_SCHEMA,
      messageBytes,
      timestamp,
      recordHeaders);
}