public static List getMessagesFromHttpResponse()

in inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/queue/pulsar/PulsarUtils.java [555:747]


    public static List<PulsarMessageInfo> getMessagesFromHttpResponse(ResponseEntity<byte[]> response, String topic)
            throws Exception {
        HttpHeaders headers = response.getHeaders();
        String msgId = headers.getFirst("X-Pulsar-Message-ID");
        String brokerEntryTimestamp = headers.getFirst("X-Pulsar-Broker-Entry-METADATA-timestamp");
        String brokerEntryIndex = headers.getFirst("X-Pulsar-Broker-Entry-METADATA-index");
        PulsarBrokerEntryMetadata brokerEntryMetadata;
        if (brokerEntryTimestamp == null && brokerEntryIndex == null) {
            brokerEntryMetadata = null;
        } else {
            brokerEntryMetadata = new PulsarBrokerEntryMetadata();
            if (brokerEntryTimestamp != null) {
                brokerEntryMetadata.setBrokerTimestamp(parse(brokerEntryTimestamp.toString()));
            }
            if (brokerEntryIndex != null) {
                brokerEntryMetadata.setIndex(Long.parseLong(brokerEntryIndex));
            }
        }

        PulsarMessageMetadata messageMetadata = new PulsarMessageMetadata();
        Map<String, String> properties = Maps.newTreeMap();

        Object tmp = headers.getFirst("X-Pulsar-publish-time");
        if (tmp != null) {
            messageMetadata.setPublishTime(parse(tmp.toString()));
        }

        tmp = headers.getFirst("X-Pulsar-event-time");
        if (tmp != null) {
            messageMetadata.setEventTime(parse(tmp.toString()));
        }
        tmp = headers.getFirst("X-Pulsar-deliver-at-time");
        if (tmp != null) {
            messageMetadata.setDeliverAtTime(parse(tmp.toString()));
        }

        tmp = headers.getFirst("X-Pulsar-null-value");
        if (tmp != null) {
            messageMetadata.setNullValue(Boolean.parseBoolean(tmp.toString()));
        }

        tmp = headers.getFirst("X-Pulsar-producer-name");
        if (tmp != null) {
            messageMetadata.setProducerName(tmp.toString());
        }

        tmp = headers.getFirst("X-Pulsar-sequence-id");
        if (tmp != null) {
            messageMetadata.setSequenceId(Long.parseLong(tmp.toString()));
        }

        tmp = headers.getFirst("X-Pulsar-replicated-from");
        if (tmp != null) {
            messageMetadata.setReplicatedFrom(tmp.toString());
        }

        tmp = headers.getFirst("X-Pulsar-partition-key");
        if (tmp != null) {
            messageMetadata.setPartitionKey(tmp.toString());
        }

        tmp = headers.getFirst("X-Pulsar-compression");
        if (tmp != null) {
            messageMetadata.setCompression(tmp.toString());
        }

        tmp = headers.getFirst("X-Pulsar-uncompressed-size");
        if (tmp != null) {
            messageMetadata.setUncompressedSize(Integer.parseInt(tmp.toString()));
        }

        tmp = headers.getFirst("X-Pulsar-encryption-algo");
        if (tmp != null) {
            messageMetadata.setEncryptionAlgo(tmp.toString());
        }

        tmp = headers.getFirst("X-Pulsar-partition-key-b64-encoded");
        if (tmp != null) {
            messageMetadata.setPartitionKeyB64Encoded(Boolean.parseBoolean(tmp.toString()));
        }

        tmp = headers.getFirst("X-Pulsar-marker-type");
        if (tmp != null) {
            messageMetadata.setMarkerType(Integer.parseInt(tmp.toString()));
        }

        tmp = headers.getFirst("X-Pulsar-txnid-least-bits");
        if (tmp != null) {
            messageMetadata.setTxnidLeastBits(Long.parseLong(tmp.toString()));
        }

        tmp = headers.getFirst("X-Pulsar-txnid-most-bits");
        if (tmp != null) {
            messageMetadata.setTxnidMostBits(Long.parseLong(tmp.toString()));
        }

        tmp = headers.getFirst("X-Pulsar-highest-sequence-id");
        if (tmp != null) {
            messageMetadata.setHighestSequenceId(Long.parseLong(tmp.toString()));
        }

        tmp = headers.getFirst("X-Pulsar-uuid");
        if (tmp != null) {
            messageMetadata.setUuid(tmp.toString());
        }

        tmp = headers.getFirst("X-Pulsar-num-chunks-from-msg");
        if (tmp != null) {
            messageMetadata.setNumChunksFromMsg(Integer.parseInt(tmp.toString()));
        }

        tmp = headers.getFirst("X-Pulsar-total-chunk-msg-size");
        if (tmp != null) {
            messageMetadata.setTotalChunkMsgSize(Integer.parseInt(tmp.toString()));
        }

        tmp = headers.getFirst("X-Pulsar-chunk-id");
        if (tmp != null) {
            messageMetadata.setChunkId(Integer.parseInt(tmp.toString()));
        }

        tmp = headers.getFirst("X-Pulsar-null-partition-key");
        if (tmp != null) {
            messageMetadata.setNullPartitionKey(Boolean.parseBoolean(tmp.toString()));
        }

        tmp = headers.getFirst("X-Pulsar-Base64-encryption-param");
        if (tmp != null) {
            messageMetadata.setEncryptionParam(Base64.getDecoder().decode(tmp.toString()));
        }

        tmp = headers.getFirst("X-Pulsar-Base64-ordering-key");
        if (tmp != null) {
            messageMetadata.setOrderingKey(Base64.getDecoder().decode(tmp.toString()));
        }

        tmp = headers.getFirst("X-Pulsar-Base64-schema-version-b64encoded");
        if (tmp != null) {
            messageMetadata.setSchemaVersion(Base64.getDecoder().decode(tmp.toString()));
        }

        tmp = headers.getFirst("X-Pulsar-Base64-encryption-param");
        if (tmp != null) {
            messageMetadata.setEncryptionParam(Base64.getDecoder().decode(tmp.toString()));
        }

        List<String> tmpList = (List) headers.get("X-Pulsar-replicated-to");
        if (ObjectUtils.isNotEmpty(tmpList)) {
            if (ObjectUtils.isEmpty(messageMetadata.getReplicateTos())) {
                messageMetadata.setReplicateTos(Lists.newArrayList(tmpList));
            } else {
                messageMetadata.getReplicateTos().addAll(tmpList);
            }
        }

        tmp = headers.getFirst("X-Pulsar-batch-size");
        if (tmp != null) {
            properties.put("X-Pulsar-batch-size", (String) tmp);
        }

        for (Entry<String, List<String>> entry : headers.entrySet()) {
            if (entry.getKey().contains("X-Pulsar-PROPERTY-")) {
                String keyName = entry.getKey().substring("X-Pulsar-PROPERTY-".length());
                properties.put(keyName, (String) ((List) entry.getValue()).get(0));
            }
        }

        tmp = headers.getFirst("X-Pulsar-num-batch-message");
        if (tmp != null) {
            properties.put("X-Pulsar-num-batch-message", (String) tmp);
        }
        boolean isEncrypted = false;
        tmp = headers.getFirst("X-Pulsar-Is-Encrypted");
        if (tmp != null) {
            isEncrypted = Boolean.parseBoolean(tmp.toString());
        }

        if (!isEncrypted && headers.get("X-Pulsar-num-batch-message") != null) {
            return getIndividualMsgsFromBatch(topic, msgId, response.getBody(), properties, messageMetadata,
                    brokerEntryMetadata);
        }

        PulsarMessageInfo messageInfo = new PulsarMessageInfo();
        messageInfo.setTopic(topic);
        messageInfo.setMessageId(msgId);
        messageInfo.setProperties(messageMetadata.getProperties());
        messageInfo.setBody(response.getBody());
        messageInfo.setPulsarMessageMetadata(messageMetadata);
        if (brokerEntryMetadata != null) {
            messageInfo.setPulsarBrokerEntryMetadata(brokerEntryMetadata);
        }
        return Collections.singletonList(messageInfo);
    }