pulsaradmin/pkg/admin/subscription.go (233 lines of code) (raw):
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package admin
import (
"bytes"
"encoding/binary"
"encoding/json"
"io"
"net/http"
"net/url"
"strconv"
"strings"
"github.com/golang/protobuf/proto" //nolint:staticcheck
"github.com/apache/pulsar-client-go/pulsaradmin/pkg/utils"
)
// Subscriptions is admin interface for subscriptions management
type Subscriptions interface {
// Create a new subscription on a topic
Create(utils.TopicName, string, utils.MessageID) error
// Delete a subscription.
// Delete a persistent subscription from a topic. There should not be any active consumers on the subscription
Delete(utils.TopicName, string) error
// ForceDelete deletes a subscription forcefully
ForceDelete(utils.TopicName, string) error
// List returns the list of subscriptions
List(utils.TopicName) ([]string, error)
// ResetCursorToMessageID resets cursor position on a topic subscription
// @param
// messageID reset subscription to messageId (or previous nearest messageId if given messageId is not valid)
ResetCursorToMessageID(utils.TopicName, string, utils.MessageID) error
// ResetCursorToTimestamp resets cursor position on a topic subscription
// @param
// time reset subscription to position closest to time in ms since epoch
ResetCursorToTimestamp(utils.TopicName, string, int64) error
// ClearBacklog skips all messages on a topic subscription
ClearBacklog(utils.TopicName, string) error
// SkipMessages skips messages on a topic subscription
SkipMessages(utils.TopicName, string, int64) error
// ExpireMessages expires all messages older than given N (expireTimeInSeconds) seconds for a given subscription
ExpireMessages(utils.TopicName, string, int64) error
// ExpireAllMessages expires all messages older than given N (expireTimeInSeconds) seconds for all
// subscriptions of the persistent-topic
ExpireAllMessages(utils.TopicName, int64) error
// PeekMessages peeks messages from a topic subscription
PeekMessages(utils.TopicName, string, int) ([]*utils.Message, error)
// Deprecated: Use GetMessagesByID() instead
GetMessageByID(topic utils.TopicName, ledgerID, entryID int64) (*utils.Message, error)
// GetMessagesByID gets messages by its ledgerID and entryID
GetMessagesByID(topic utils.TopicName, ledgerID, entryID int64) ([]*utils.Message, error)
}
type subscriptions struct {
pulsar *pulsarClient
basePath string
SubPath string
}
// Subscriptions is used to access the subscriptions endpoints
func (c *pulsarClient) Subscriptions() Subscriptions {
return &subscriptions{
pulsar: c,
basePath: "",
SubPath: "subscription",
}
}
func (s *subscriptions) Create(topic utils.TopicName, sName string, messageID utils.MessageID) error {
endpoint := s.pulsar.endpoint(s.basePath, topic.GetRestPath(), s.SubPath, url.PathEscape(sName))
return s.pulsar.Client.Put(endpoint, messageID)
}
func (s *subscriptions) delete(topic utils.TopicName, subName string, force bool) error {
endpoint := s.pulsar.endpoint(s.basePath, topic.GetRestPath(), s.SubPath, url.PathEscape(subName))
queryParams := make(map[string]string)
queryParams["force"] = strconv.FormatBool(force)
return s.pulsar.Client.DeleteWithQueryParams(endpoint, queryParams)
}
func (s *subscriptions) Delete(topic utils.TopicName, sName string) error {
return s.delete(topic, sName, false)
}
func (s *subscriptions) ForceDelete(topic utils.TopicName, sName string) error {
return s.delete(topic, sName, true)
}
func (s *subscriptions) List(topic utils.TopicName) ([]string, error) {
endpoint := s.pulsar.endpoint(s.basePath, topic.GetRestPath(), "subscriptions")
var list []string
return list, s.pulsar.Client.Get(endpoint, &list)
}
func (s *subscriptions) ResetCursorToMessageID(topic utils.TopicName, sName string, id utils.MessageID) error {
endpoint := s.pulsar.endpoint(s.basePath, topic.GetRestPath(), s.SubPath, url.PathEscape(sName), "resetcursor")
return s.pulsar.Client.Post(endpoint, id)
}
func (s *subscriptions) ResetCursorToTimestamp(topic utils.TopicName, sName string, timestamp int64) error {
endpoint := s.pulsar.endpoint(
s.basePath, topic.GetRestPath(), s.SubPath, url.PathEscape(sName),
"resetcursor", strconv.FormatInt(timestamp, 10))
return s.pulsar.Client.Post(endpoint, nil)
}
func (s *subscriptions) ClearBacklog(topic utils.TopicName, sName string) error {
endpoint := s.pulsar.endpoint(
s.basePath, topic.GetRestPath(), s.SubPath, url.PathEscape(sName), "skip_all")
return s.pulsar.Client.Post(endpoint, nil)
}
func (s *subscriptions) SkipMessages(topic utils.TopicName, sName string, n int64) error {
endpoint := s.pulsar.endpoint(
s.basePath, topic.GetRestPath(), s.SubPath, url.PathEscape(sName),
"skip", strconv.FormatInt(n, 10))
return s.pulsar.Client.Post(endpoint, nil)
}
func (s *subscriptions) ExpireMessages(topic utils.TopicName, sName string, expire int64) error {
endpoint := s.pulsar.endpoint(
s.basePath, topic.GetRestPath(), s.SubPath, url.PathEscape(sName),
"expireMessages", strconv.FormatInt(expire, 10))
return s.pulsar.Client.Post(endpoint, nil)
}
func (s *subscriptions) ExpireAllMessages(topic utils.TopicName, expire int64) error {
endpoint := s.pulsar.endpoint(
s.basePath, topic.GetRestPath(), "all_subscription",
"expireMessages", strconv.FormatInt(expire, 10))
return s.pulsar.Client.Post(endpoint, nil)
}
func (s *subscriptions) PeekMessages(topic utils.TopicName, sName string, n int) ([]*utils.Message, error) {
var msgs []*utils.Message
count := 1
for n > 0 {
m, err := s.peekNthMessage(topic, sName, count)
if err != nil {
return nil, err
}
msgs = append(msgs, m...)
n -= len(m)
count++
}
return msgs, nil
}
func (s *subscriptions) peekNthMessage(topic utils.TopicName, sName string, pos int) ([]*utils.Message, error) {
endpoint := s.pulsar.endpoint(s.basePath, topic.GetRestPath(), "subscription", url.PathEscape(sName),
"position", strconv.Itoa(pos))
resp, err := s.pulsar.Client.MakeRequest(http.MethodGet, endpoint)
if err != nil {
return nil, err
}
defer safeRespClose(resp)
return handleResp(topic, resp)
}
func (s *subscriptions) GetMessageByID(topic utils.TopicName, ledgerID, entryID int64) (*utils.Message, error) {
messages, err := s.GetMessagesByID(topic, ledgerID, entryID)
if err != nil {
return nil, err
}
if len(messages) == 0 {
return nil, nil
}
return messages[0], nil
}
func (s *subscriptions) GetMessagesByID(topic utils.TopicName, ledgerID, entryID int64) ([]*utils.Message, error) {
ledgerIDStr := strconv.FormatInt(ledgerID, 10)
entryIDStr := strconv.FormatInt(entryID, 10)
endpoint := s.pulsar.endpoint(s.basePath, topic.GetRestPath(), "ledger", ledgerIDStr, "entry", entryIDStr)
resp, err := s.pulsar.Client.MakeRequest(http.MethodGet, endpoint)
if err != nil {
return nil, err
}
defer safeRespClose(resp)
messages, err := handleResp(topic, resp)
if err != nil {
return nil, err
}
return messages, nil
}
// safeRespClose is used to close a response body
func safeRespClose(resp *http.Response) {
if resp != nil {
// ignore error since it is closing a response body
_ = resp.Body.Close()
}
}
const (
PublishTimeHeader = "X-Pulsar-Publish-Time"
BatchHeader = "X-Pulsar-Num-Batch-Message"
// PropertyPrefix is part of the old protocol for message properties.
PropertyPrefix = "X-Pulsar-Property-"
// PropertyHeader is part of the new protocol introduced in SNIP-279
// https://github.com/apache/pulsar/pull/20627
// The value is a JSON string representing the properties.
PropertyHeader = "X-Pulsar-Property"
)
func handleResp(topic utils.TopicName, resp *http.Response) ([]*utils.Message, error) {
msgID := resp.Header.Get("X-Pulsar-Message-ID")
ID, err := utils.ParseMessageIDWithPartitionIndex(msgID, topic.GetPartitionIndex())
if err != nil {
return nil, err
}
// read data
payload, err := io.ReadAll(resp.Body)
if err != nil {
return nil, err
}
properties := make(map[string]string)
for k := range resp.Header {
switch {
case k == PublishTimeHeader:
h := resp.Header.Get(k)
if h != "" {
properties["publish-time"] = h
}
case k == BatchHeader:
h := resp.Header.Get(k)
if h != "" {
properties[BatchHeader] = h
}
return getIndividualMsgsFromBatch(topic, ID, payload, properties)
case k == PropertyHeader:
propJSON := resp.Header.Get(k)
if err := json.Unmarshal([]byte(propJSON), &properties); err != nil {
return nil, err
}
case strings.Contains(k, PropertyPrefix):
key := strings.TrimPrefix(k, PropertyPrefix)
properties[key] = resp.Header.Get(k)
}
}
return []*utils.Message{utils.NewMessage(topic.String(), *ID, payload, properties)}, nil
}
func getIndividualMsgsFromBatch(topic utils.TopicName, msgID *utils.MessageID, data []byte,
properties map[string]string) ([]*utils.Message, error) {
batchSize, err := strconv.Atoi(properties[BatchHeader])
if err != nil {
return nil, nil
}
msgs := make([]*utils.Message, 0, batchSize)
// read all messages in batch
buf32 := make([]byte, 4)
rdBuf := bytes.NewReader(data)
for i := 0; i < batchSize; i++ {
msgID.BatchIndex = i
// singleMetaSize
if _, err := io.ReadFull(rdBuf, buf32); err != nil {
return nil, err
}
singleMetaSize := binary.BigEndian.Uint32(buf32)
// singleMeta
singleMetaBuf := make([]byte, singleMetaSize)
if _, err := io.ReadFull(rdBuf, singleMetaBuf); err != nil {
return nil, err
}
singleMeta := new(utils.SingleMessageMetadata)
if err := proto.Unmarshal(singleMetaBuf, singleMeta); err != nil {
return nil, err
}
if len(singleMeta.Properties) > 0 {
for _, v := range singleMeta.Properties {
k := *v.Key
property := *v.Value
properties[k] = property
}
}
// payload
singlePayload := make([]byte, singleMeta.GetPayloadSize())
if _, err := io.ReadFull(rdBuf, singlePayload); err != nil {
return nil, err
}
msgs = append(msgs, &utils.Message{
Topic: topic.String(),
MessageID: *msgID,
Payload: singlePayload,
Properties: properties,
})
}
return msgs, nil
}