in kafka-connector/src/main/java/com/google/pubsublite/kafka/sink/PubSubLiteSinkTask.java [55:92]
public void put(Collection<SinkRecord> collection) {
if (publisher.state() != State.RUNNING) {
if (publisher.state() == State.FAILED) {
throw new IllegalStateException("Publisher has failed.", publisher.failureCause());
} else {
throw new IllegalStateException("Publisher not currently running.");
}
}
for (SinkRecord record : collection) {
Message.Builder message = Message.builder();
if (record.key() != null) {
message.setKey(encodeToBytes(record.keySchema(), record.key()));
}
if (record.value() != null) {
message.setData(encodeToBytes(record.valueSchema(), record.value()));
}
ImmutableListMultimap.Builder<String, ByteString> attributes = ImmutableListMultimap
.builder();
getRecordHeaders(record).forEach(header -> attributes
.put(header.key(), Schemas.encodeToBytes(header.schema(), header.value())));
if (record.topic() != null) {
attributes.put(Constants.KAFKA_TOPIC_HEADER, ByteString.copyFromUtf8(record.topic()));
}
if (record.kafkaPartition() != null) {
attributes.put(Constants.KAFKA_PARTITION_HEADER,
ByteString.copyFromUtf8(record.kafkaPartition().toString()));
attributes.put(Constants.KAFKA_OFFSET_HEADER,
ByteString.copyFromUtf8(Long.toString(record.kafkaOffset())));
}
if (record.timestamp() != null) {
attributes.put(Constants.KAFKA_EVENT_TIME_TYPE_HEADER,
ByteString.copyFromUtf8(record.timestampType().name));
message.setEventTime(Timestamps.fromMillis(record.timestamp()));
}
message.setAttributes(attributes.build());
publisher.publish(message.build());
}
}