in pulsaradmin/pkg/admin/subscription.go [244:284]
// handleResp converts an HTTP response into the message(s) it carries for topic.
//
// The message ID is parsed from the "X-Pulsar-Message-ID" header together with
// the topic's partition index, and the entire response body is read as the
// payload. Message properties are collected from the response headers:
//   - PublishTimeHeader is stored under "publish-time" when non-empty;
//   - BatchHeader is stored under its own name when non-empty, and its
//     presence marks the response as a batch;
//   - PropertyHeader is decoded as a JSON object into the property map;
//   - any header whose name starts with PropertyPrefix is stored under the
//     remainder of its name.
//
// When BatchHeader is present, the payload and the collected properties are
// handed to getIndividualMsgsFromBatch; otherwise a single message is
// returned. An error is returned if the message ID cannot be parsed, the body
// cannot be read, or the PropertyHeader value is not valid JSON.
//
// The body is read here but not closed.
func handleResp(topic utils.TopicName, resp *http.Response) ([]*utils.Message, error) {
	rawID := resp.Header.Get("X-Pulsar-Message-ID")
	msgID, err := utils.ParseMessageIDWithPartitionIndex(rawID, topic.GetPartitionIndex())
	if err != nil {
		return nil, err
	}

	payload, err := io.ReadAll(resp.Body)
	if err != nil {
		return nil, err
	}

	properties := make(map[string]string)
	isBatch := false
	for k := range resp.Header {
		switch {
		case k == PublishTimeHeader:
			if v := resp.Header.Get(k); v != "" {
				properties["publish-time"] = v
			}
		case k == BatchHeader:
			if v := resp.Header.Get(k); v != "" {
				properties[BatchHeader] = v
			}
			// Do not dispatch from inside the loop: map iteration order is
			// unspecified, so returning here would drop whichever headers
			// happen to be visited after this one.
			isBatch = true
		case k == PropertyHeader:
			if err := json.Unmarshal([]byte(resp.Header.Get(k)), &properties); err != nil {
				return nil, err
			}
		// HasPrefix (not Contains) so that only names that really begin with
		// the prefix are treated as properties and trimmed correctly.
		case strings.HasPrefix(k, PropertyPrefix):
			properties[strings.TrimPrefix(k, PropertyPrefix)] = resp.Header.Get(k)
		}
	}

	if isBatch {
		return getIndividualMsgsFromBatch(topic, msgID, payload, properties)
	}
	return []*utils.Message{utils.NewMessage(topic.String(), *msgID, payload, properties)}, nil
}