public static String serializeRecordToJson()

in pulsar-io/kinesis/src/main/java/org/apache/pulsar/io/kinesis/Utils.java [172:217]


    public static String serializeRecordToJson(Record<byte[]> record) {
        checkNotNull(record, "record can't be null");

        JsonObject result = new JsonObject();
        result.addProperty(PAYLOAD_FIELD, getEncoder().encodeToString(record.getValue()));
        if (record.getProperties() != null) {
            JsonObject properties = new JsonObject();
            record.getProperties().entrySet()
                    .forEach(e -> properties.addProperty(e.getKey(), e.getValue()));
            result.add(PROPERTIES_FIELD, properties);
        }

        Optional<EncryptionContext> optEncryptionCtx = (record instanceof RecordWithEncryptionContext)
                ? ((RecordWithEncryptionContext<byte[]>) record).getEncryptionCtx()
                : Optional.empty();
        if (optEncryptionCtx.isPresent()) {
            EncryptionContext encryptionCtx = optEncryptionCtx.get();
            JsonObject encryptionCtxJson = new JsonObject();
            JsonObject keyBase64Map = new JsonObject();
            JsonObject keyMetadataMap = new JsonObject();
            encryptionCtx.getKeys().entrySet().forEach(entry -> {
                keyBase64Map.addProperty(entry.getKey(), getEncoder().encodeToString(entry.getValue().getKeyValue()));
                Map<String, String> keyMetadata = entry.getValue().getMetadata();
                if (keyMetadata != null && !keyMetadata.isEmpty()) {
                    JsonObject metadata = new JsonObject();
                    entry.getValue().getMetadata().entrySet()
                            .forEach(m -> metadata.addProperty(m.getKey(), m.getValue()));
                    keyMetadataMap.add(entry.getKey(), metadata);
                }
            });
            encryptionCtxJson.add(KEY_MAP_FIELD, keyBase64Map);
            encryptionCtxJson.add(KEY_METADATA_MAP_FIELD, keyMetadataMap);
            encryptionCtxJson.addProperty(ENCRYPTION_PARAM_FIELD,
                    getEncoder().encodeToString(encryptionCtx.getParam()));
            encryptionCtxJson.addProperty(ALGO_FIELD, encryptionCtx.getAlgorithm());
            if (encryptionCtx.getCompressionType() != null) {
                encryptionCtxJson.addProperty(COMPRESSION_TYPE_FIELD, encryptionCtx.getCompressionType().name());
                encryptionCtxJson.addProperty(UNCPRESSED_MSG_SIZE_FIELD, encryptionCtx.getUncompressedMessageSize());
            }
            if (encryptionCtx.getBatchSize().isPresent()) {
                encryptionCtxJson.addProperty(BATCH_SIZE_FIELD, encryptionCtx.getBatchSize().get());
            }
            result.add(ENCRYPTION_CTX_FIELD, encryptionCtxJson);
        }
        return result.toString();
    }