in pulsar-io/kinesis/src/main/java/org/apache/pulsar/io/kinesis/Utils.java [172:217]
public static String serializeRecordToJson(Record<byte[]> record) {
checkNotNull(record, "record can't be null");
JsonObject result = new JsonObject();
result.addProperty(PAYLOAD_FIELD, getEncoder().encodeToString(record.getValue()));
if (record.getProperties() != null) {
JsonObject properties = new JsonObject();
record.getProperties().entrySet()
.forEach(e -> properties.addProperty(e.getKey(), e.getValue()));
result.add(PROPERTIES_FIELD, properties);
}
Optional<EncryptionContext> optEncryptionCtx = (record instanceof RecordWithEncryptionContext)
? ((RecordWithEncryptionContext<byte[]>) record).getEncryptionCtx()
: Optional.empty();
if (optEncryptionCtx.isPresent()) {
EncryptionContext encryptionCtx = optEncryptionCtx.get();
JsonObject encryptionCtxJson = new JsonObject();
JsonObject keyBase64Map = new JsonObject();
JsonObject keyMetadataMap = new JsonObject();
encryptionCtx.getKeys().entrySet().forEach(entry -> {
keyBase64Map.addProperty(entry.getKey(), getEncoder().encodeToString(entry.getValue().getKeyValue()));
Map<String, String> keyMetadata = entry.getValue().getMetadata();
if (keyMetadata != null && !keyMetadata.isEmpty()) {
JsonObject metadata = new JsonObject();
entry.getValue().getMetadata().entrySet()
.forEach(m -> metadata.addProperty(m.getKey(), m.getValue()));
keyMetadataMap.add(entry.getKey(), metadata);
}
});
encryptionCtxJson.add(KEY_MAP_FIELD, keyBase64Map);
encryptionCtxJson.add(KEY_METADATA_MAP_FIELD, keyMetadataMap);
encryptionCtxJson.addProperty(ENCRYPTION_PARAM_FIELD,
getEncoder().encodeToString(encryptionCtx.getParam()));
encryptionCtxJson.addProperty(ALGO_FIELD, encryptionCtx.getAlgorithm());
if (encryptionCtx.getCompressionType() != null) {
encryptionCtxJson.addProperty(COMPRESSION_TYPE_FIELD, encryptionCtx.getCompressionType().name());
encryptionCtxJson.addProperty(UNCPRESSED_MSG_SIZE_FIELD, encryptionCtx.getUncompressedMessageSize());
}
if (encryptionCtx.getBatchSize().isPresent()) {
encryptionCtxJson.addProperty(BATCH_SIZE_FIELD, encryptionCtx.getBatchSize().get());
}
result.add(ENCRYPTION_CTX_FIELD, encryptionCtxJson);
}
return result.toString();
}