in inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/queue/pulsar/PulsarUtils.java [555:747]
/**
 * Converts the HTTP response of a Pulsar message lookup into a list of {@link PulsarMessageInfo}.
 *
 * <p>The message id, broker entry metadata, message metadata and message properties are read from the
 * {@code X-Pulsar-*} response headers; the response body is used as the payload. When the response carries
 * the {@code X-Pulsar-num-batch-message} header and is not flagged as encrypted via
 * {@code X-Pulsar-Is-Encrypted}, the payload is handed to {@code getIndividualMsgsFromBatch} and its
 * result is returned. Otherwise a single-element immutable list is returned.
 *
 * @param response HTTP response whose headers and body describe the message
 * @param topic topic name stored on the returned message(s)
 * @return the message(s) extracted from the response
 * @throws Exception if header parsing or batch splitting fails; malformed numeric header values surface as
 *         {@link NumberFormatException} and malformed Base64 values as {@link IllegalArgumentException}
 */
public static List<PulsarMessageInfo> getMessagesFromHttpResponse(ResponseEntity<byte[]> response, String topic)
        throws Exception {
    HttpHeaders headers = response.getHeaders();
    String msgId = headers.getFirst("X-Pulsar-Message-ID");
    PulsarBrokerEntryMetadata brokerEntryMetadata = readBrokerEntryMetadata(headers);
    PulsarMessageMetadata messageMetadata = readMessageMetadata(headers);
    Map<String, String> properties = readMessageProperties(headers);

    // Boolean.parseBoolean(null) is false, so a missing header means "not encrypted".
    boolean isEncrypted = Boolean.parseBoolean(headers.getFirst("X-Pulsar-Is-Encrypted"));
    // An encrypted payload is never split, even when the batch header is present.
    if (!isEncrypted && headers.getFirst("X-Pulsar-num-batch-message") != null) {
        return getIndividualMsgsFromBatch(topic, msgId, response.getBody(), properties, messageMetadata,
                brokerEntryMetadata);
    }

    PulsarMessageInfo messageInfo = new PulsarMessageInfo();
    messageInfo.setTopic(topic);
    messageInfo.setMessageId(msgId);
    // Use the properties collected from the headers; the metadata object built above never receives them.
    messageInfo.setProperties(properties);
    messageInfo.setBody(response.getBody());
    messageInfo.setPulsarMessageMetadata(messageMetadata);
    if (brokerEntryMetadata != null) {
        messageInfo.setPulsarBrokerEntryMetadata(brokerEntryMetadata);
    }
    return Collections.singletonList(messageInfo);
}

/**
 * Builds the broker entry metadata from the {@code X-Pulsar-Broker-Entry-METADATA-*} headers.
 *
 * @return the metadata, or {@code null} when neither the timestamp nor the index header is present
 */
// NOTE(review): "throws Exception" is kept because the signature of parse(...) is not visible from here.
private static PulsarBrokerEntryMetadata readBrokerEntryMetadata(HttpHeaders headers) throws Exception {
    String timestamp = headers.getFirst("X-Pulsar-Broker-Entry-METADATA-timestamp");
    String index = headers.getFirst("X-Pulsar-Broker-Entry-METADATA-index");
    if (timestamp == null && index == null) {
        return null;
    }
    PulsarBrokerEntryMetadata brokerEntryMetadata = new PulsarBrokerEntryMetadata();
    if (timestamp != null) {
        brokerEntryMetadata.setBrokerTimestamp(parse(timestamp));
    }
    if (index != null) {
        brokerEntryMetadata.setIndex(Long.parseLong(index));
    }
    return brokerEntryMetadata;
}

/**
 * Builds the message metadata from the {@code X-Pulsar-*} headers; absent headers leave the matching field
 * at whatever value a new {@link PulsarMessageMetadata} starts with.
 */
// NOTE(review): "throws Exception" is kept because the signature of parse(...) is not visible from here.
private static PulsarMessageMetadata readMessageMetadata(HttpHeaders headers) throws Exception {
    PulsarMessageMetadata messageMetadata = new PulsarMessageMetadata();

    // Timestamps
    String value = headers.getFirst("X-Pulsar-publish-time");
    if (value != null) {
        messageMetadata.setPublishTime(parse(value));
    }
    value = headers.getFirst("X-Pulsar-event-time");
    if (value != null) {
        messageMetadata.setEventTime(parse(value));
    }
    value = headers.getFirst("X-Pulsar-deliver-at-time");
    if (value != null) {
        messageMetadata.setDeliverAtTime(parse(value));
    }

    // Plain string, numeric and boolean fields
    value = headers.getFirst("X-Pulsar-null-value");
    if (value != null) {
        messageMetadata.setNullValue(Boolean.parseBoolean(value));
    }
    value = headers.getFirst("X-Pulsar-producer-name");
    if (value != null) {
        messageMetadata.setProducerName(value);
    }
    value = headers.getFirst("X-Pulsar-sequence-id");
    if (value != null) {
        messageMetadata.setSequenceId(Long.parseLong(value));
    }
    value = headers.getFirst("X-Pulsar-replicated-from");
    if (value != null) {
        messageMetadata.setReplicatedFrom(value);
    }
    value = headers.getFirst("X-Pulsar-partition-key");
    if (value != null) {
        messageMetadata.setPartitionKey(value);
    }
    value = headers.getFirst("X-Pulsar-compression");
    if (value != null) {
        messageMetadata.setCompression(value);
    }
    value = headers.getFirst("X-Pulsar-uncompressed-size");
    if (value != null) {
        messageMetadata.setUncompressedSize(Integer.parseInt(value));
    }
    value = headers.getFirst("X-Pulsar-encryption-algo");
    if (value != null) {
        messageMetadata.setEncryptionAlgo(value);
    }
    value = headers.getFirst("X-Pulsar-partition-key-b64-encoded");
    if (value != null) {
        messageMetadata.setPartitionKeyB64Encoded(Boolean.parseBoolean(value));
    }
    value = headers.getFirst("X-Pulsar-marker-type");
    if (value != null) {
        messageMetadata.setMarkerType(Integer.parseInt(value));
    }
    value = headers.getFirst("X-Pulsar-txnid-least-bits");
    if (value != null) {
        messageMetadata.setTxnidLeastBits(Long.parseLong(value));
    }
    value = headers.getFirst("X-Pulsar-txnid-most-bits");
    if (value != null) {
        messageMetadata.setTxnidMostBits(Long.parseLong(value));
    }
    value = headers.getFirst("X-Pulsar-highest-sequence-id");
    if (value != null) {
        messageMetadata.setHighestSequenceId(Long.parseLong(value));
    }
    value = headers.getFirst("X-Pulsar-uuid");
    if (value != null) {
        messageMetadata.setUuid(value);
    }
    value = headers.getFirst("X-Pulsar-num-chunks-from-msg");
    if (value != null) {
        messageMetadata.setNumChunksFromMsg(Integer.parseInt(value));
    }
    value = headers.getFirst("X-Pulsar-total-chunk-msg-size");
    if (value != null) {
        messageMetadata.setTotalChunkMsgSize(Integer.parseInt(value));
    }
    value = headers.getFirst("X-Pulsar-chunk-id");
    if (value != null) {
        messageMetadata.setChunkId(Integer.parseInt(value));
    }
    value = headers.getFirst("X-Pulsar-null-partition-key");
    if (value != null) {
        messageMetadata.setNullPartitionKey(Boolean.parseBoolean(value));
    }

    // Base64-encoded binary fields
    value = headers.getFirst("X-Pulsar-Base64-encryption-param");
    if (value != null) {
        messageMetadata.setEncryptionParam(Base64.getDecoder().decode(value));
    }
    value = headers.getFirst("X-Pulsar-Base64-ordering-key");
    if (value != null) {
        messageMetadata.setOrderingKey(Base64.getDecoder().decode(value));
    }
    value = headers.getFirst("X-Pulsar-Base64-schema-version-b64encoded");
    if (value != null) {
        messageMetadata.setSchemaVersion(Base64.getDecoder().decode(value));
    }

    // Multi-valued header: every value of X-Pulsar-replicated-to is kept.
    List<String> replicatedTo = headers.get("X-Pulsar-replicated-to");
    if (ObjectUtils.isNotEmpty(replicatedTo)) {
        if (ObjectUtils.isEmpty(messageMetadata.getReplicateTos())) {
            messageMetadata.setReplicateTos(Lists.newArrayList(replicatedTo));
        } else {
            messageMetadata.getReplicateTos().addAll(replicatedTo);
        }
    }
    return messageMetadata;
}

/**
 * Collects the message properties: every {@code X-Pulsar-PROPERTY-<name>} header (stored under
 * {@code <name>}, first value only) plus the {@code X-Pulsar-batch-size} and
 * {@code X-Pulsar-num-batch-message} headers stored under their full header names.
 */
private static Map<String, String> readMessageProperties(HttpHeaders headers) {
    final String propertyPrefix = "X-Pulsar-PROPERTY-";
    Map<String, String> properties = Maps.newTreeMap();

    String batchSize = headers.getFirst("X-Pulsar-batch-size");
    if (batchSize != null) {
        properties.put("X-Pulsar-batch-size", batchSize);
    }

    for (Entry<String, List<String>> entry : headers.entrySet()) {
        String headerName = entry.getKey();
        // HTTP header names are case-insensitive, and the marker must be a prefix for the substring
        // below to yield the property name.
        if (!headerName.regionMatches(true, 0, propertyPrefix, 0, propertyPrefix.length())) {
            continue;
        }
        List<String> values = entry.getValue();
        if (values == null || values.isEmpty()) {
            continue;
        }
        properties.put(headerName.substring(propertyPrefix.length()), values.get(0));
    }

    // Written last so that a property with the same name cannot override the batch message count.
    String numBatchMessage = headers.getFirst("X-Pulsar-num-batch-message");
    if (numBatchMessage != null) {
        properties.put("X-Pulsar-num-batch-message", numBatchMessage);
    }
    return properties;
}