func handleResp()

in pulsaradmin/pkg/admin/subscription.go [244:284]


func handleResp(topic utils.TopicName, resp *http.Response) ([]*utils.Message, error) {

	msgID := resp.Header.Get("X-Pulsar-Message-ID")
	ID, err := utils.ParseMessageIDWithPartitionIndex(msgID, topic.GetPartitionIndex())
	if err != nil {
		return nil, err
	}

	// read data
	payload, err := io.ReadAll(resp.Body)
	if err != nil {
		return nil, err
	}

	properties := make(map[string]string)
	for k := range resp.Header {
		switch {
		case k == PublishTimeHeader:
			h := resp.Header.Get(k)
			if h != "" {
				properties["publish-time"] = h
			}
		case k == BatchHeader:
			h := resp.Header.Get(k)
			if h != "" {
				properties[BatchHeader] = h
			}
			return getIndividualMsgsFromBatch(topic, ID, payload, properties)
		case k == PropertyHeader:
			propJSON := resp.Header.Get(k)
			if err := json.Unmarshal([]byte(propJSON), &properties); err != nil {
				return nil, err
			}
		case strings.Contains(k, PropertyPrefix):
			key := strings.TrimPrefix(k, PropertyPrefix)
			properties[key] = resp.Header.Get(k)
		}
	}

	return []*utils.Message{utils.NewMessage(topic.String(), *ID, payload, properties)}, nil
}