in pulsar-io/kinesis/src/main/java/org/apache/pulsar/io/kinesis/KinesisSink.java [112:135]
/**
 * Hands one record to the Kinesis producer for asynchronous publishing to {@code streamName}.
 *
 * <p>The partition key is the record key if present, otherwise the record's topic name,
 * otherwise {@code defaultPartitionedKey}. The send result is observed through a
 * {@code ProducerSendCallback} attached to the returned future; this method does not wait for it.
 *
 * @param record the record to publish
 * @throws IllegalStateException if ordering must be retained and an earlier publish has failed
 * @throws Exception if the Kinesis message cannot be built or handed to the producer
 */
public void write(Record<byte[]> record) throws Exception {
    // kpl-thread captures publish-failure. fail the publish on main pulsar-io-thread to maintain the ordering
    if (kinesisSinkConfig.isRetainOrdering() && previousPublishFailed == TRUE) {
        LOG.warn("Skip acking message to retain ordering with previous failed message {}-{}", this.streamName,
                record.getRecordSequence());
        throw new IllegalStateException("kinesis queue has publish failure");
    }

    // Key precedence: record key -> topic name -> configured default.
    String partitionedKey = record.getKey()
            .orElseGet(() -> record.getTopicName().orElse(defaultPartitionedKey));
    // partitionedKey length must be at least one, and at most 256.
    // NOTE(review): over-long keys are cut to (maxPartitionedKeyLength - 1) characters, one fewer than
    // the limit; kept as-is because changing it would change the partition key of existing long keys.
    if (partitionedKey.length() > maxPartitionedKeyLength) {
        partitionedKey = partitionedKey.substring(0, maxPartitionedKeyLength - 1);
    }

    ByteBuffer data = createKinesisMessage(kinesisSinkConfig.getMessageFormat(), record);
    // Measure the payload before the buffer is handed to the producer. remaining() is the number of
    // readable bytes and works for every buffer kind, whereas array() throws for direct/read-only
    // buffers and array().length is the backing-array capacity rather than the payload size.
    final int payloadSize = data.remaining();

    ListenableFuture<UserRecordResult> addRecordResult = kinesisProducer.addUserRecord(this.streamName,
            partitionedKey, data);
    // directExecutor(): the callback runs on whichever thread completes the future.
    addCallback(addRecordResult,
            ProducerSendCallback.create(this, record, System.nanoTime()), directExecutor());

    if (sinkContext != null) {
        sinkContext.recordMetric(METRICS_TOTAL_INCOMING, 1);
        sinkContext.recordMetric(METRICS_TOTAL_INCOMING_BYTES, payloadSize);
    }
    if (LOG.isDebugEnabled()) {
        LOG.debug("Published message to kinesis stream {} with size {}", streamName, record.getValue().length);
    }
}