public KinesisRecord()

in pulsar-io/kinesis/src/main/java/org/apache/pulsar/io/kinesis/KinesisRecord.java [44:72]


    /**
     * Builds a record from a Kinesis client record.
     *
     * <p>The partition key becomes this record's key, and the arrival timestamp,
     * encryption type, partition key and sequence number are stored as properties.
     * The value is derived from the payload according to the encryption type:
     * <ul>
     *   <li>{@code NONE} (also used when the record reports no encryption type):
     *       the payload is decoded as text with {@code decoder} and re-encoded with
     *       the same charset; if the payload is not valid text the value is
     *       {@code null}.</li>
     *   <li>{@code KMS}: the readable bytes of the payload are copied verbatim, still
     *       encrypted, for downstream consumers to handle.</li>
     *   <li>anything else: the value is {@code null}.</li>
     * </ul>
     *
     * @param record the Kinesis client record to wrap
     * @throws NullPointerException if the record's partition key is {@code null}
     */
    public KinesisRecord(KinesisClientRecord record) {
        this.key = Optional.of(record.partitionKey());
        // encryption type can (annoyingly) be null, so we default to NONE
        EncryptionType encType = EncryptionType.NONE;
        if (record.encryptionType() != null) {
            encType = record.encryptionType();
        }
        setProperty(ARRIVAL_TIMESTAMP, record.approximateArrivalTimestamp().toString());
        setProperty(ENCRYPTION_TYPE, encType.toString());
        setProperty(PARTITION_KEY, record.partitionKey());
        setProperty(SEQUENCE_NUMBER, record.sequenceNumber());

        if (encType == EncryptionType.NONE) {
            byte[] decoded = null;
            try {
                // Re-encode with the decoder's own charset; a bare getBytes() would use
                // the platform default charset and could corrupt non-ASCII payloads.
                decoded = decoder.decode(record.data()).toString().getBytes(decoder.charset());
            } catch (CharacterCodingException ignored) {
                // Payload is not valid text in the decoder's charset; the value is
                // intentionally left null rather than failing record construction.
            }
            this.value = decoded;
        } else if (encType == EncryptionType.KMS) {
            // use the raw encrypted value, let them handle it downstream
            // TODO: support decoding KMS data here... should be fairly simple
            // Copy the remaining bytes instead of calling array(): array() throws on
            // read-only or direct buffers and ignores the buffer's position/limit/offset.
            // duplicate() keeps the record's own buffer position untouched.
            java.nio.ByteBuffer payload = record.data().duplicate();
            byte[] raw = new byte[payload.remaining()];
            payload.get(raw);
            this.value = raw;
        } else {
            // Unrecognized encryption type: no value can be produced
            this.value = null;
        }
    }