private static int createEncryptionCtxOffset()

in pulsar-io/kinesis/src/main/java/org/apache/pulsar/io/kinesis/Utils.java [112:163]


    /**
     * Writes the encryption context of a message into the given FlatBuffer builder.
     *
     * <p>Every encryption key is written as an {@code EncryptionKey} table (name, value and, when
     * non-empty, its metadata key/value pairs), followed by the keys vector, the param vector and the
     * algorithm string. Compression types other than LZ4 and ZLIB are written as NONE, and a missing
     * batch size is written as 1 together with a flag recording that it was absent.
     *
     * @param builder
     *            FlatBuffer builder the encryption context is serialized into
     * @param encryptionCtx
     *            encryption context to serialize, possibly empty
     * @return offset of the serialized {@code EncryptionCtx} table, or {@code -1} when
     *         {@code encryptionCtx} is empty
     */
    private static int createEncryptionCtxOffset(final FlatBufferBuilder builder, Optional<EncryptionContext> encryptionCtx) {
        if (!encryptionCtx.isPresent()) {
            return -1;
        }

        final EncryptionContext context = encryptionCtx.get();
        final Map<String, org.apache.pulsar.common.api.EncryptionContext.EncryptionKey> keys = context.getKeys();
        final int[] encryptionKeyOffsets = new int[keys.size()];
        int nextKey = 0;

        for (Entry<String, org.apache.pulsar.common.api.EncryptionContext.EncryptionKey> keyEntry : keys
                .entrySet()) {
            final org.apache.pulsar.common.api.EncryptionContext.EncryptionKey encryptionKey = keyEntry.getValue();

            // Strings and vectors referenced by the EncryptionKey table are written before the table
            // itself is started.
            final int nameOffset = builder.createString(keyEntry.getKey());
            final int valueOffset = EncryptionKey.createValueVector(builder, encryptionKey.getKeyValue());

            final Map<String, String> metadata = encryptionKey.getMetadata();
            final int[] metadataEntryOffsets = new int[metadata.size()];
            int nextEntry = 0;
            for (Entry<String, String> metadataEntry : metadata.entrySet()) {
                final int metadataKeyOffset = builder.createString(metadataEntry.getKey());
                final int metadataValueOffset = builder.createString(metadataEntry.getValue());
                metadataEntryOffsets[nextEntry] = KeyValue.createKeyValue(builder, metadataKeyOffset,
                        metadataValueOffset);
                nextEntry++;
            }

            // -1 marks "no metadata vector"; the field is then left out of the table.
            final int metadataVectorOffset = metadata.isEmpty()
                    ? -1
                    : EncryptionKey.createMetadataVector(builder, metadataEntryOffsets);

            EncryptionKey.startEncryptionKey(builder);
            EncryptionKey.addKey(builder, nameOffset);
            EncryptionKey.addValue(builder, valueOffset);
            if (metadataVectorOffset != -1) {
                EncryptionKey.addMetadata(builder, metadataVectorOffset);
            }
            encryptionKeyOffsets[nextKey] = EncryptionKey.endEncryptionKey(builder);
            nextKey++;
        }

        final int keysVectorOffset = EncryptionCtx.createKeysVector(builder, encryptionKeyOffsets);
        final int paramVectorOffset = EncryptionCtx.createParamVector(builder, context.getParam());
        final int algorithmOffset = builder.createString(context.getAlgorithm());

        final Optional<Integer> batchSize = context.getBatchSize();
        final int effectiveBatchSize = batchSize.orElse(1);

        // Anything that is not LZ4 or ZLIB is recorded as NONE.
        byte serializedCompressionType = org.apache.pulsar.io.kinesis.fbs.CompressionType.NONE;
        switch (context.getCompressionType()) {
        case LZ4:
            serializedCompressionType = org.apache.pulsar.io.kinesis.fbs.CompressionType.LZ4;
            break;
        case ZLIB:
            serializedCompressionType = org.apache.pulsar.io.kinesis.fbs.CompressionType.ZLIB;
            break;
        default:
            break;
        }

        return EncryptionCtx.createEncryptionCtx(builder, keysVectorOffset, paramVectorOffset, algorithmOffset,
                serializedCompressionType, context.getUncompressedMessageSize(), effectiveBatchSize,
                batchSize.isPresent());
    }