private Response generateResponseWithEntry()

in pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java [3032:3178]


    /**
     * Builds an HTTP 200 response describing a single entry: the message metadata is exposed as
     * {@code X-Pulsar-*} response headers and the payload is streamed as the response entity.
     *
     * <p>The payload is passed through the compression codec named in the metadata, except when the
     * message carries encryption keys, in which case the {@code NONE} codec is used and the bytes are
     * returned as stored.
     *
     * @param entry the entry to render; checked with {@code checkNotNull}
     * @param persistentTopic the topic the entry was read from; consulted for the transaction-aborted
     *                        state and for the max read position
     * @return a response whose headers describe the message and whose entity streams the payload
     * @throws IOException declared by the signature; presumably raised while decoding the payload
     */
    private Response generateResponseWithEntry(Entry entry, PersistentTopic persistentTopic) throws IOException {
        checkNotNull(entry);
        Position pos = entry.getPosition();
        ByteBuf metadataAndPayload = entry.getDataBuffer();

        // Size is captured before the metadata is parsed; it is only used for the batch-size header.
        long totalSize = metadataAndPayload.readableBytes();
        BrokerEntryMetadata brokerEntryMetadata = Commands.peekBrokerEntryMetadataIfExist(metadataAndPayload);
        MessageMetadata metadata = Commands.parseMessageMetadata(metadataAndPayload);

        // Single source of truth for the encryption state: drives both the header and the codec choice.
        final boolean isEncrypted = metadata.getEncryptionKeysCount() > 0;

        ResponseBuilder responseBuilder = Response.ok();
        responseBuilder.header("X-Pulsar-Message-ID", pos.toString());

        // On duplicate property keys the last value wins.
        Map<String, String> properties = metadata.getPropertiesList().stream()
                .collect(Collectors.toMap(KeyValue::getKey, KeyValue::getValue, (v1, v2) -> v2));
        responseBuilder.header("X-Pulsar-PROPERTY", new Gson().toJson(properties));

        if (brokerEntryMetadata != null) {
            if (brokerEntryMetadata.hasBrokerTimestamp()) {
                responseBuilder.header("X-Pulsar-Broker-Entry-METADATA-timestamp",
                        DateFormatter.format(brokerEntryMetadata.getBrokerTimestamp()));
            }
            if (brokerEntryMetadata.hasIndex()) {
                responseBuilder.header("X-Pulsar-Broker-Entry-METADATA-index", brokerEntryMetadata.getIndex());
            }
        }
        if (metadata.hasPublishTime()) {
            responseBuilder.header("X-Pulsar-publish-time", DateFormatter.format(metadata.getPublishTime()));
        }
        if (metadata.hasEventTime()) {
            responseBuilder.header("X-Pulsar-event-time", DateFormatter.format(metadata.getEventTime()));
        }
        if (metadata.hasDeliverAtTime()) {
            responseBuilder.header("X-Pulsar-deliver-at-time", DateFormatter.format(metadata.getDeliverAtTime()));
        }
        if (metadata.hasNumMessagesInBatch()) {
            responseBuilder.header("X-Pulsar-num-batch-message", metadata.getNumMessagesInBatch());
            responseBuilder.header("X-Pulsar-batch-size", totalSize
                    - metadata.getSerializedSize());
        }
        if (metadata.hasNullValue()) {
            responseBuilder.header("X-Pulsar-null-value", metadata.isNullValue());
        }
        if (metadata.hasNumChunksFromMsg()) {
            responseBuilder.header("X-Pulsar-PROPERTY-TOTAL-CHUNKS", Integer.toString(metadata.getNumChunksFromMsg()));
            responseBuilder.header("X-Pulsar-PROPERTY-CHUNK-ID", Integer.toString(metadata.getChunkId()));
        }
        responseBuilder.header("X-Pulsar-Is-Encrypted", isEncrypted);

        if (metadata.hasProducerName()) {
            responseBuilder.header("X-Pulsar-producer-name", metadata.getProducerName());
        }
        if (metadata.hasSequenceId()) {
            responseBuilder.header("X-Pulsar-sequence-id", metadata.getSequenceId());
        }
        if (metadata.hasReplicatedFrom()) {
            responseBuilder.header("X-Pulsar-replicated-from", metadata.getReplicatedFrom());
        }
        for (String replicatedTo : metadata.getReplicateTosList()) {
            responseBuilder.header("X-Pulsar-replicated-to", replicatedTo);
        }
        if (metadata.hasPartitionKey()) {
            responseBuilder.header("X-Pulsar-partition-key", metadata.getPartitionKey());
        }
        if (metadata.hasCompression()) {
            responseBuilder.header("X-Pulsar-compression", metadata.getCompression());
        }
        if (metadata.hasUncompressedSize()) {
            responseBuilder.header("X-Pulsar-uncompressed-size", metadata.getUncompressedSize());
        }
        if (metadata.hasEncryptionAlgo()) {
            responseBuilder.header("X-Pulsar-encryption-algo", metadata.getEncryptionAlgo());
        }
        // One header value per encryption key, each the Base64 form of the serialized key record.
        for (EncryptionKeys encryptionKeys : metadata.getEncryptionKeysList()) {
            responseBuilder.header("X-Pulsar-Base64-encryption-keys",
                    Base64.getEncoder().encodeToString(encryptionKeys.toByteArray()));
        }
        if (metadata.hasEncryptionParam()) {
            responseBuilder.header("X-Pulsar-Base64-encryption-param",
                    Base64.getEncoder().encodeToString(metadata.getEncryptionParam()));
        }
        if (metadata.hasSchemaVersion()) {
            responseBuilder.header("X-Pulsar-Base64-schema-version",
                    Base64.getEncoder().encodeToString(metadata.getSchemaVersion()));
        }
        if (metadata.hasPartitionKeyB64Encoded()) {
            responseBuilder.header("X-Pulsar-partition-key-b64-encoded", metadata.isPartitionKeyB64Encoded());
        }
        if (metadata.hasOrderingKey()) {
            responseBuilder.header("X-Pulsar-Base64-ordering-key",
                    Base64.getEncoder().encodeToString(metadata.getOrderingKey()));
        }
        if (metadata.hasMarkerType()) {
            responseBuilder.header("X-Pulsar-marker-type", metadata.getMarkerType());
        }
        if (metadata.hasTxnidLeastBits()) {
            responseBuilder.header("X-Pulsar-txnid-least-bits", metadata.getTxnidLeastBits());
        }
        if (metadata.hasTxnidMostBits()) {
            responseBuilder.header("X-Pulsar-txnid-most-bits", metadata.getTxnidMostBits());
        }
        if (metadata.hasHighestSequenceId()) {
            responseBuilder.header("X-Pulsar-highest-sequence-id", metadata.getHighestSequenceId());
        }
        if (metadata.hasUuid()) {
            responseBuilder.header("X-Pulsar-uuid", metadata.getUuid());
        }
        if (metadata.hasNumChunksFromMsg()) {
            responseBuilder.header("X-Pulsar-num-chunks-from-msg", metadata.getNumChunksFromMsg());
        }
        if (metadata.hasTotalChunkMsgSize()) {
            responseBuilder.header("X-Pulsar-total-chunk-msg-size", metadata.getTotalChunkMsgSize());
        }
        if (metadata.hasChunkId()) {
            responseBuilder.header("X-Pulsar-chunk-id", metadata.getChunkId());
        }
        if (metadata.hasNullPartitionKey()) {
            responseBuilder.header("X-Pulsar-null-partition-key", metadata.isNullPartitionKey());
        }
        // The aborted flag is only reported when both halves of the transaction id are present.
        if (metadata.hasTxnidMostBits() && metadata.hasTxnidLeastBits()) {
            TxnID txnID = new TxnID(metadata.getTxnidMostBits(), metadata.getTxnidLeastBits());
            boolean isTxnAborted = persistentTopic.isTxnAborted(txnID, pos);
            responseBuilder.header("X-Pulsar-txn-aborted", isTxnAborted);
        }
        // Reported as uncommitted when the entry lies beyond the topic's max read position.
        boolean isTxnUncommitted = pos.compareTo(persistentTopic.getMaxReadPosition()) > 0;
        responseBuilder.header("X-Pulsar-txn-uncommitted", isTxnUncommitted);

        // Decode if needed; encrypted payloads are passed through the NONE codec untouched.
        CompressionCodec codec = CompressionCodecProvider
                .getCompressionCodec(isEncrypted ? NONE : metadata.getCompression());
        ByteBuf uncompressedPayload = codec.decode(metadataAndPayload, metadata.getUncompressedSize());

        // Copy into a heap buffer for output stream compatibility. The decoded buffer is released
        // even if the allocation or the copy fails, so it cannot leak on that path.
        final ByteBuf data;
        try {
            int payloadSize = uncompressedPayload.readableBytes();
            data = PulsarByteBufAllocator.DEFAULT.heapBuffer(payloadSize, payloadSize);
            data.writeBytes(uncompressedPayload);
        } finally {
            uncompressedPayload.release();
        }

        // Release the heap copy whether or not the write succeeds (e.g. the client went away).
        // NOTE(review): if the entity is never written at all, this callback does not run and the
        // buffer is not released here - confirm whether that path needs separate handling.
        StreamingOutput stream = output -> {
            try {
                output.write(data.array(), data.arrayOffset() + data.readerIndex(), data.readableBytes());
            } finally {
                data.release();
            }
        };

        return responseBuilder.entity(stream).build();
    }