in pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java [3032:3178]
private Response generateResponseWithEntry(Entry entry, PersistentTopic persistentTopic) throws IOException {
checkNotNull(entry);
Position pos = entry.getPosition();
ByteBuf metadataAndPayload = entry.getDataBuffer();
boolean isEncrypted = false;
long totalSize = metadataAndPayload.readableBytes();
BrokerEntryMetadata brokerEntryMetadata = Commands.peekBrokerEntryMetadataIfExist(metadataAndPayload);
MessageMetadata metadata = Commands.parseMessageMetadata(metadataAndPayload);
ResponseBuilder responseBuilder = Response.ok();
responseBuilder.header("X-Pulsar-Message-ID", pos.toString());
Map<String, String> properties = metadata.getPropertiesList().stream()
.collect(Collectors.toMap(KeyValue::getKey, KeyValue::getValue, (v1, v2) -> v2));
responseBuilder.header("X-Pulsar-PROPERTY", new Gson().toJson(properties));
if (brokerEntryMetadata != null) {
if (brokerEntryMetadata.hasBrokerTimestamp()) {
responseBuilder.header("X-Pulsar-Broker-Entry-METADATA-timestamp",
DateFormatter.format(brokerEntryMetadata.getBrokerTimestamp()));
}
if (brokerEntryMetadata.hasIndex()) {
responseBuilder.header("X-Pulsar-Broker-Entry-METADATA-index", brokerEntryMetadata.getIndex());
}
}
if (metadata.hasPublishTime()) {
responseBuilder.header("X-Pulsar-publish-time", DateFormatter.format(metadata.getPublishTime()));
}
if (metadata.hasEventTime()) {
responseBuilder.header("X-Pulsar-event-time", DateFormatter.format(metadata.getEventTime()));
}
if (metadata.hasDeliverAtTime()) {
responseBuilder.header("X-Pulsar-deliver-at-time", DateFormatter.format(metadata.getDeliverAtTime()));
}
if (metadata.hasNumMessagesInBatch()) {
responseBuilder.header("X-Pulsar-num-batch-message", metadata.getNumMessagesInBatch());
responseBuilder.header("X-Pulsar-batch-size", totalSize
- metadata.getSerializedSize());
}
if (metadata.hasNullValue()) {
responseBuilder.header("X-Pulsar-null-value", metadata.isNullValue());
}
if (metadata.hasNumChunksFromMsg()) {
responseBuilder.header("X-Pulsar-PROPERTY-TOTAL-CHUNKS", Integer.toString(metadata.getNumChunksFromMsg()));
responseBuilder.header("X-Pulsar-PROPERTY-CHUNK-ID", Integer.toString(metadata.getChunkId()));
}
responseBuilder.header("X-Pulsar-Is-Encrypted", metadata.getEncryptionKeysCount() > 0);
if (metadata.hasProducerName()) {
responseBuilder.header("X-Pulsar-producer-name", metadata.getProducerName());
}
if (metadata.hasSequenceId()) {
responseBuilder.header("X-Pulsar-sequence-id", metadata.getSequenceId());
}
if (metadata.hasReplicatedFrom()) {
responseBuilder.header("X-Pulsar-replicated-from", metadata.getReplicatedFrom());
}
for (String replicatedTo : metadata.getReplicateTosList()) {
responseBuilder.header("X-Pulsar-replicated-to", replicatedTo);
}
if (metadata.hasPartitionKey()) {
responseBuilder.header("X-Pulsar-partition-key", metadata.getPartitionKey());
}
if (metadata.hasCompression()) {
responseBuilder.header("X-Pulsar-compression", metadata.getCompression());
}
if (metadata.hasUncompressedSize()) {
responseBuilder.header("X-Pulsar-uncompressed-size", metadata.getUncompressedSize());
}
if (metadata.hasEncryptionAlgo()) {
responseBuilder.header("X-Pulsar-encryption-algo", metadata.getEncryptionAlgo());
}
for (EncryptionKeys encryptionKeys : metadata.getEncryptionKeysList()) {
responseBuilder.header("X-Pulsar-Base64-encryption-keys",
Base64.getEncoder().encodeToString(encryptionKeys.toByteArray()));
isEncrypted = true;
}
if (metadata.hasEncryptionParam()) {
responseBuilder.header("X-Pulsar-Base64-encryption-param",
Base64.getEncoder().encodeToString(metadata.getEncryptionParam()));
}
if (metadata.hasSchemaVersion()) {
responseBuilder.header("X-Pulsar-Base64-schema-version",
Base64.getEncoder().encodeToString(metadata.getSchemaVersion()));
}
if (metadata.hasPartitionKeyB64Encoded()) {
responseBuilder.header("X-Pulsar-partition-key-b64-encoded", metadata.isPartitionKeyB64Encoded());
}
if (metadata.hasOrderingKey()) {
responseBuilder.header("X-Pulsar-Base64-ordering-key",
Base64.getEncoder().encodeToString(metadata.getOrderingKey()));
}
if (metadata.hasMarkerType()) {
responseBuilder.header("X-Pulsar-marker-type", metadata.getMarkerType());
}
if (metadata.hasTxnidLeastBits()) {
responseBuilder.header("X-Pulsar-txnid-least-bits", metadata.getTxnidLeastBits());
}
if (metadata.hasTxnidMostBits()) {
responseBuilder.header("X-Pulsar-txnid-most-bits", metadata.getTxnidMostBits());
}
if (metadata.hasHighestSequenceId()) {
responseBuilder.header("X-Pulsar-highest-sequence-id", metadata.getHighestSequenceId());
}
if (metadata.hasUuid()) {
responseBuilder.header("X-Pulsar-uuid", metadata.getUuid());
}
if (metadata.hasNumChunksFromMsg()) {
responseBuilder.header("X-Pulsar-num-chunks-from-msg", metadata.getNumChunksFromMsg());
}
if (metadata.hasTotalChunkMsgSize()) {
responseBuilder.header("X-Pulsar-total-chunk-msg-size", metadata.getTotalChunkMsgSize());
}
if (metadata.hasChunkId()) {
responseBuilder.header("X-Pulsar-chunk-id", metadata.getChunkId());
}
if (metadata.hasNullPartitionKey()) {
responseBuilder.header("X-Pulsar-null-partition-key", metadata.isNullPartitionKey());
}
if (metadata.hasTxnidMostBits() && metadata.hasTxnidLeastBits()) {
TxnID txnID = new TxnID(metadata.getTxnidMostBits(), metadata.getTxnidLeastBits());
boolean isTxnAborted = persistentTopic.isTxnAborted(txnID, entry.getPosition());
responseBuilder.header("X-Pulsar-txn-aborted", isTxnAborted);
}
boolean isTxnUncommitted = (entry.getPosition())
.compareTo(persistentTopic.getMaxReadPosition()) > 0;
responseBuilder.header("X-Pulsar-txn-uncommitted", isTxnUncommitted);
// Decode if needed
CompressionCodec codec = CompressionCodecProvider
.getCompressionCodec(isEncrypted ? NONE : metadata.getCompression());
ByteBuf uncompressedPayload = codec.decode(metadataAndPayload, metadata.getUncompressedSize());
// Copy into a heap buffer for output stream compatibility
ByteBuf data = PulsarByteBufAllocator.DEFAULT.heapBuffer(uncompressedPayload.readableBytes(),
uncompressedPayload.readableBytes());
data.writeBytes(uncompressedPayload);
uncompressedPayload.release();
StreamingOutput stream = output -> {
output.write(data.array(), data.arrayOffset(), data.readableBytes());
data.release();
};
return responseBuilder.entity(stream).build();
}