in pulsar-io/kinesis/src/main/java/org/apache/pulsar/io/kinesis/KinesisRecord.java [44:72]
public KinesisRecord(KinesisClientRecord record) {
this.key = Optional.of(record.partitionKey());
// encryption type can (annoyingly) be null, so we default to NONE
EncryptionType encType = EncryptionType.NONE;
if (record.encryptionType() != null) {
encType = record.encryptionType();
}
setProperty(ARRIVAL_TIMESTAMP, record.approximateArrivalTimestamp().toString());
setProperty(ENCRYPTION_TYPE, encType.toString());
setProperty(PARTITION_KEY, record.partitionKey());
setProperty(SEQUENCE_NUMBER, record.sequenceNumber());
if (encType == EncryptionType.NONE) {
String s = null;
try {
s = decoder.decode(record.data()).toString();
} catch (CharacterCodingException e) {
// Ignore
}
this.value = (s != null) ? s.getBytes() : null;
} else if (encType == EncryptionType.KMS) {
// use the raw encrypted value, let them handle it downstream
// TODO: support decoding KMS data here... should be fairly simple
this.value = record.data().array();
} else {
// Who knows?
this.value = null;
}
}