in pulsar-io/kinesis/src/main/java/org/apache/pulsar/io/kinesis/Utils.java [112:163]
/**
 * Writes the given encryption context into {@code builder} as an {@code EncryptionCtx} table.
 *
 * <p>Each encryption key is written as an {@code EncryptionKey} table holding its name, raw key
 * value and, when non-empty, its metadata as a vector of {@code KeyValue} entries. The context
 * parameter bytes, algorithm name, compression type, uncompressed message size and batch
 * information are then stored alongside the vector of keys.
 *
 * @param builder the FlatBuffer builder the context is written into
 * @param encryptionCtx the encryption context to serialize, possibly empty
 * @return the offset of the written {@code EncryptionCtx} table, or {@code -1} when
 *         {@code encryptionCtx} is empty
 */
private static int createEncryptionCtxOffset(final FlatBufferBuilder builder, Optional<EncryptionContext> encryptionCtx) {
    if (!encryptionCtx.isPresent()) {
        // Nothing to serialize; -1 is the "no encryption context" marker for the caller.
        return -1;
    }
    final EncryptionContext ctx = encryptionCtx.get();

    // One EncryptionKey table per entry of the context's key map.
    final int[] keyTableOffsets = new int[ctx.getKeys().size()];
    int nextKeySlot = 0;
    for (Entry<String, org.apache.pulsar.common.api.EncryptionContext.EncryptionKey> keyEntry : ctx.getKeys()
            .entrySet()) {
        final org.apache.pulsar.common.api.EncryptionContext.EncryptionKey keyInfo = keyEntry.getValue();

        // Strings and vectors referenced by the table are written before the table is started.
        final int keyNameOffset = builder.createString(keyEntry.getKey());
        final int keyValueOffset = EncryptionKey.createValueVector(builder, keyInfo.getKeyValue());

        final Map<String, String> metadata = keyInfo.getMetadata();
        final int[] metadataEntryOffsets = new int[metadata.size()];
        int nextMetadataSlot = 0;
        for (Entry<String, String> metadataEntry : metadata.entrySet()) {
            final int metadataNameOffset = builder.createString(metadataEntry.getKey());
            final int metadataValueOffset = builder.createString(metadataEntry.getValue());
            metadataEntryOffsets[nextMetadataSlot] =
                    KeyValue.createKeyValue(builder, metadataNameOffset, metadataValueOffset);
            nextMetadataSlot++;
        }

        // The metadata vector is only written when there is at least one entry; -1 means "omitted".
        final int metadataVectorOffset = metadata.isEmpty()
                ? -1
                : EncryptionKey.createMetadataVector(builder, metadataEntryOffsets);

        EncryptionKey.startEncryptionKey(builder);
        EncryptionKey.addKey(builder, keyNameOffset);
        EncryptionKey.addValue(builder, keyValueOffset);
        if (metadataVectorOffset != -1) {
            EncryptionKey.addMetadata(builder, metadataVectorOffset);
        }
        keyTableOffsets[nextKeySlot] = EncryptionKey.endEncryptionKey(builder);
        nextKeySlot++;
    }

    final int keysVectorOffset = EncryptionCtx.createKeysVector(builder, keyTableOffsets);
    final int paramVectorOffset = EncryptionCtx.createParamVector(builder, ctx.getParam());
    final int algorithmOffset = builder.createString(ctx.getAlgorithm());

    // A missing batch size is written as 1; the presence flag passed below records that it was absent.
    final int batchSize;
    if (ctx.getBatchSize().isPresent()) {
        batchSize = ctx.getBatchSize().get();
    } else {
        batchSize = 1;
    }

    // Only LZ4 and ZLIB are translated explicitly; every other compression type is written as NONE.
    byte fbsCompressionType = org.apache.pulsar.io.kinesis.fbs.CompressionType.NONE;
    switch (ctx.getCompressionType()) {
    case LZ4:
        fbsCompressionType = org.apache.pulsar.io.kinesis.fbs.CompressionType.LZ4;
        break;
    case ZLIB:
        fbsCompressionType = org.apache.pulsar.io.kinesis.fbs.CompressionType.ZLIB;
        break;
    default:
        break;
    }

    return EncryptionCtx.createEncryptionCtx(builder, keysVectorOffset, paramVectorOffset, algorithmOffset,
            fbsCompressionType, ctx.getUncompressedMessageSize(), batchSize, ctx.getBatchSize().isPresent());
}