in pulsar-io/kinesis/src/main/java/org/apache/pulsar/io/kinesis/Utils.java [68:110]
public static ByteBuffer serializeRecordToFlatBuffer(FlatBufferBuilder builder, Record<byte[]> record) {
checkNotNull(record, "record-context can't be null");
Optional<EncryptionContext> encryptionCtx = (record instanceof RecordWithEncryptionContext)
? ((RecordWithEncryptionContext<byte[]>) record).getEncryptionCtx()
: Optional.empty();
Map<String, String> properties = record.getProperties();
int encryptionCtxOffset = -1;
int propertiesOffset = -1;
if (properties != null && !properties.isEmpty()) {
int[] propertiesOffsetArray = new int[properties.size()];
int i = 0;
for (Entry<String, String> property : properties.entrySet()) {
propertiesOffsetArray[i++] = KeyValue.createKeyValue(builder, builder.createString(property.getKey()),
builder.createString(property.getValue()));
}
propertiesOffset = Message.createPropertiesVector(builder, propertiesOffsetArray);
}
if (encryptionCtx.isPresent()) {
encryptionCtxOffset = createEncryptionCtxOffset(builder, encryptionCtx);
}
int payloadOffset = Message.createPayloadVector(builder, record.getValue());
Message.startMessage(builder);
Message.addPayload(builder, payloadOffset);
if (encryptionCtxOffset != -1) {
Message.addEncryptionCtx(builder, encryptionCtxOffset);
}
if (propertiesOffset != -1) {
Message.addProperties(builder, propertiesOffset);
}
int endMessage = Message.endMessage(builder);
builder.finish(endMessage);
ByteBuffer bb = builder.dataBuffer();
// to avoid copying of data, use same byte[] wrapped by ByteBuffer. But, ByteBuffer.array() returns entire array
// so, it requires to read from offset:
// builder.sizedByteArray()=>copies buffer: sizedByteArray(space, bb.capacity() - space)
int space = bb.capacity() - builder.offset();
return ByteBuffer.wrap(bb.array(), space, bb.capacity() - space);
}