public static ByteBuffer serializeRecordToFlatBuffer()

in pulsar-io/kinesis/src/main/java/org/apache/pulsar/io/kinesis/Utils.java [68:110]


    /**
     * Serializes a record into a FlatBuffer {@code Message} using the supplied builder.
     *
     * <p>The message always carries the record's value as its payload. A properties vector is
     * added only when the record's property map is non-null and non-empty, and an encryption
     * context is added only when the record is a {@code RecordWithEncryptionContext} whose
     * encryption context is present.
     *
     * <p>The returned buffer wraps the array behind {@code builder.dataBuffer()} rather than
     * copying it, so it shares storage with the builder.
     *
     * @param builder the FlatBuffer builder that all offsets are created in and that is finished
     *                by this call
     * @param record  the record to serialize; passed through {@code checkNotNull}
     * @return a {@code ByteBuffer} over the builder's backing array, starting at
     *         {@code capacity - builder.offset()} and spanning {@code builder.offset()} bytes
     */
    public static ByteBuffer serializeRecordToFlatBuffer(FlatBufferBuilder builder, Record<byte[]> record) {
        checkNotNull(record, "record-context can't be null");

        // Sentinel meaning "this optional field was not built".
        final int notSet = -1;

        final Optional<EncryptionContext> encryptionCtx;
        if (record instanceof RecordWithEncryptionContext) {
            encryptionCtx = ((RecordWithEncryptionContext<byte[]>) record).getEncryptionCtx();
        } else {
            encryptionCtx = Optional.empty();
        }
        final Map<String, String> props = record.getProperties();

        // All child offsets are created before Message.startMessage is called.
        int propsVectorOffset = notSet;
        final boolean hasProps = props != null && !props.isEmpty();
        if (hasProps) {
            final int[] keyValueOffsets = new int[props.size()];
            int next = 0;
            for (Entry<String, String> entry : props.entrySet()) {
                // Key string is created before the value string.
                final int keyOffset = builder.createString(entry.getKey());
                final int valueOffset = builder.createString(entry.getValue());
                keyValueOffsets[next] = KeyValue.createKeyValue(builder, keyOffset, valueOffset);
                next++;
            }
            propsVectorOffset = Message.createPropertiesVector(builder, keyValueOffsets);
        }

        final int encryptionOffset = encryptionCtx.isPresent()
                ? createEncryptionCtxOffset(builder, encryptionCtx)
                : notSet;

        final int payloadVectorOffset = Message.createPayloadVector(builder, record.getValue());

        // Assemble the Message table: payload first, then the optional fields that were built.
        Message.startMessage(builder);
        Message.addPayload(builder, payloadVectorOffset);
        if (encryptionOffset != notSet) {
            Message.addEncryptionCtx(builder, encryptionOffset);
        }
        if (propsVectorOffset != notSet) {
            Message.addProperties(builder, propsVectorOffset);
        }
        builder.finish(Message.endMessage(builder));

        // To avoid copying the data, wrap the same byte[] that backs the builder's buffer.
        // ByteBuffer.array() exposes the entire array, so the view has to begin at the offset
        // where the written data starts. (builder.sizedByteArray() would copy instead.)
        final ByteBuffer backing = builder.dataBuffer();
        final int length = builder.offset();
        final int start = backing.capacity() - length;
        return ByteBuffer.wrap(backing.array(), start, length);
    }