in kafka-connector/src/main/java/com/google/pubsub/kafka/sink/CloudPubSubSinkTask.java [168:215]
public void put(Collection<SinkRecord> sinkRecords) {
log.debug("Received " + sinkRecords.size() + " messages to send to CPS.");
for (SinkRecord record : sinkRecords) {
log.trace("Received record: " + record.toString());
Map<String, String> attributes = new HashMap<>();
ByteString value = handleValue(record.valueSchema(), record.value(), attributes);
String key = null;
String partition = record.kafkaPartition().toString();
if (record.key() != null) {
key = record.key().toString();
attributes.put(ConnectorUtils.CPS_MESSAGE_KEY_ATTRIBUTE, key);
}
if (includeMetadata) {
attributes.put(ConnectorUtils.KAFKA_TOPIC_ATTRIBUTE, record.topic());
attributes.put(
ConnectorUtils.KAFKA_PARTITION_ATTRIBUTE, partition);
attributes.put(ConnectorUtils.KAFKA_OFFSET_ATTRIBUTE, Long.toString(record.kafkaOffset()));
if (record.timestamp() != null) {
attributes.put(ConnectorUtils.KAFKA_TIMESTAMP_ATTRIBUTE, record.timestamp().toString());
}
}
if (includeHeaders) {
for (Header header : getRecordHeaders(record)) {
attributes.put(header.key(), header.value().toString());
}
}
if (attributes.size() == 0 && value == null) {
log.warn("Message received with no value and no attributes. Not publishing message");
SettableApiFuture<String> nullMessageFuture = SettableApiFuture.<String>create();
nullMessageFuture.set("No message");
addPendingMessageFuture(record.topic(), record.kafkaPartition(), nullMessageFuture);
continue;
}
PubsubMessage.Builder builder = PubsubMessage.newBuilder();
builder.putAllAttributes(attributes);
if (value != null) {
builder.setData(value);
}
if (orderingKeySource == OrderingKeySource.KEY && key != null && !key.isEmpty()) {
builder.setOrderingKey(key);
} else if (orderingKeySource == OrderingKeySource.PARTITION) {
builder.setOrderingKey(partition);
}
PubsubMessage message = builder.build();
publishMessage(record.topic(), record.kafkaPartition(), message);
}
}