pulsaradmin/pkg/admin/topic.go (540 lines of code) (raw):
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package admin
import (
"fmt"
"strconv"
"github.com/apache/pulsar-client-go/pulsaradmin/pkg/rest"
"github.com/apache/pulsar-client-go/pulsaradmin/pkg/utils"
)
// Topics is admin interface for topics management
type Topics interface {
// Create a partitioned or non-partitioned topic
//
// @param topic
// topicName struct
// @param partitions
// number of topic partitions,
// when setting to 0, it will create a non-partitioned topic
Create(topic utils.TopicName, partitions int) error
// CreateWithProperties Create a partitioned or non-partitioned topic
//
// @param topic
// topicName struct
// @param partitions
// number of topic partitions,
// when setting to 0, it will create a non-partitioned topic
// @param meta
// topic properties
CreateWithProperties(topic utils.TopicName, partitions int, meta map[string]string) error
// GetProperties returns the properties of a topic
GetProperties(topic utils.TopicName) (map[string]string, error)
// Delete a topic, this function can delete both partitioned or non-partitioned topic
//
// @param topic
// topicName struct
// @param force
// delete topic forcefully
// @param nonPartitioned
// when set to true, topic will be treated as a non-partitioned topic
// Otherwise it will be treated as a partitioned topic
Delete(topic utils.TopicName, force bool, nonPartitioned bool) error
// Update number of partitions of a non-global partitioned topic
// It requires partitioned-topic to be already exist and number of new partitions must be greater than existing
// number of partitions. Decrementing number of partitions requires deletion of topic which is not supported.
//
// @param topic
// topicName struct
// @param partitions
// number of new partitions of already exist partitioned-topic
Update(topic utils.TopicName, partitions int) error
// GetMetadata returns metadata of a partitioned topic
GetMetadata(utils.TopicName) (utils.PartitionedTopicMetadata, error)
// List returns the list of topics under a namespace
List(utils.NameSpaceName) ([]string, []string, error)
// GetInternalInfo returns the internal metadata info for the topic
GetInternalInfo(utils.TopicName) (utils.ManagedLedgerInfo, error)
// GetPermissions returns permissions on a topic
// Retrieve the effective permissions for a topic. These permissions are defined by the permissions set at the
// namespace level combined (union) with any eventual specific permission set on the topic.
GetPermissions(utils.TopicName) (map[string][]utils.AuthAction, error)
// GrantPermission grants a new permission to a client role on a single topic
//
// @param topic
// topicName struct
// @param role
// client role to which grant permission
// @param action
// auth actions (e.g. produce and consume)
GrantPermission(topic utils.TopicName, role string, action []utils.AuthAction) error
// RevokePermission revokes permissions to a client role on a single topic. If the permission
// was not set at the topic level, but rather at the namespace level, this operation will
// return an error (HTTP status code 412).
//
// @param topic
// topicName struct
// @param role
// client role to which remove permissions
RevokePermission(topic utils.TopicName, role string) error
// Lookup a topic returns the broker URL that serves the topic
Lookup(utils.TopicName) (utils.LookupData, error)
// GetBundleRange returns a bundle range of a topic
GetBundleRange(utils.TopicName) (string, error)
// GetLastMessageID returns the last commit message Id of a topic
GetLastMessageID(utils.TopicName) (utils.MessageID, error)
// GetMessageID returns the message Id by timestamp(ms) of a topic
//
// @param topic
// topicName struct
// @param timestamp
// absolute timestamp (in ms)
GetMessageID(topic utils.TopicName, timestamp int64) (utils.MessageID, error)
// GetStats returns the stats for the topic.
//
// All the rates are computed over a 1-minute window and are relative the last completed 1-minute period
GetStats(utils.TopicName) (utils.TopicStats, error)
// GetStatsWithOption returns the stats for the topic
//
// All the rates are computed over a 1-minute window and are relative the last completed 1-minute period
//
// @param topic
// topicName struct
// @param option
// request option, e.g. get_precise_backlog or subscription_backlog_size
GetStatsWithOption(topic utils.TopicName, option utils.GetStatsOptions) (utils.TopicStats, error)
// GetInternalStats returns the internal stats for the topic.
GetInternalStats(utils.TopicName) (utils.PersistentTopicInternalStats, error)
// GetPartitionedStats returns the stats for the partitioned topic
//
// All the rates are computed over a 1-minute window and are relative the last completed 1-minute period
//
// @param topic
// topicName struct
// @param perPartition
// flag to get stats per partition
GetPartitionedStats(topic utils.TopicName, perPartition bool) (utils.PartitionedTopicStats, error)
// GetPartitionedStatsWithOption returns the stats for the partitioned topic
//
// All the rates are computed over a 1-minute window and are relative the last completed 1-minute period
//
// @param topic
// topicName struct
// @param perPartition
// flag to get stats per partition
// @param option
// request option, e.g. get_precise_backlog or subscription_backlog_size
GetPartitionedStatsWithOption(
topic utils.TopicName,
perPartition bool,
option utils.GetStatsOptions,
) (utils.PartitionedTopicStats, error)
// Terminate the topic and prevent any more messages being published on it
Terminate(utils.TopicName) (utils.MessageID, error)
// Offload triggers offloading messages in topic to longterm storage
Offload(utils.TopicName, utils.MessageID) error
// OffloadStatus checks the status of an ongoing offloading operation for a topic
OffloadStatus(utils.TopicName) (utils.OffloadProcessStatus, error)
// Unload a topic
Unload(utils.TopicName) error
// Compact triggers compaction to run for a topic. A single topic can only have one instance of compaction
// running at any time. Any attempt to trigger another will be met with a ConflictException.
Compact(utils.TopicName) error
// CompactStatus checks the status of an ongoing compaction for a topic
CompactStatus(utils.TopicName) (utils.LongRunningProcessStatus, error)
// GetMessageTTL Get the message TTL for a topic
GetMessageTTL(utils.TopicName) (int, error)
// SetMessageTTL Set the message TTL for a topic
//
// @param topic
// topicName struct
// @param messageTTL
// Message TTL in second
SetMessageTTL(topic utils.TopicName, messageTTL int) error
// RemoveMessageTTL Remove the message TTL for a topic
RemoveMessageTTL(utils.TopicName) error
// GetMaxProducers Get max number of producers for a topic
GetMaxProducers(utils.TopicName) (int, error)
// SetMaxProducers Set max number of producers for a topic
//
// @param topic
// topicName struct
// @param maxProducers
// max number of producer
SetMaxProducers(topic utils.TopicName, maxProducers int) error
// RemoveMaxProducers Remove max number of producers for a topic
RemoveMaxProducers(utils.TopicName) error
// GetMaxConsumers Get max number of consumers for a topic
GetMaxConsumers(utils.TopicName) (int, error)
// SetMaxConsumers Set max number of consumers for a topic
//
// @param topic
// topicName struct
// @param maxConsumers
// max number of consumer
SetMaxConsumers(topic utils.TopicName, maxConsumers int) error
// RemoveMaxConsumers Remove max number of consumers for a topic
RemoveMaxConsumers(utils.TopicName) error
// GetMaxUnackMessagesPerConsumer Get max unacked messages policy on consumer for a topic
GetMaxUnackMessagesPerConsumer(utils.TopicName) (int, error)
// SetMaxUnackMessagesPerConsumer Set max unacked messages policy on consumer for a topic
//
// @param topic
// topicName struct
// @param maxUnackedNum
// max unAcked messages on each consumer
SetMaxUnackMessagesPerConsumer(topic utils.TopicName, maxUnackedNum int) error
// RemoveMaxUnackMessagesPerConsumer Remove max unacked messages policy on consumer for a topic
RemoveMaxUnackMessagesPerConsumer(utils.TopicName) error
// GetMaxUnackMessagesPerSubscription Get max unacked messages policy on subscription for a topic
GetMaxUnackMessagesPerSubscription(utils.TopicName) (int, error)
// SetMaxUnackMessagesPerSubscription Set max unacked messages policy on subscription for a topic
//
// @param topic
// topicName struct
// @param maxUnackedNum
// max unAcked messages on subscription of a topic
SetMaxUnackMessagesPerSubscription(topic utils.TopicName, maxUnackedNum int) error
// RemoveMaxUnackMessagesPerSubscription Remove max unacked messages policy on subscription for a topic
RemoveMaxUnackMessagesPerSubscription(utils.TopicName) error
// GetPersistence Get the persistence policies for a topic
GetPersistence(utils.TopicName) (*utils.PersistenceData, error)
// SetPersistence Set the persistence policies for a topic
SetPersistence(utils.TopicName, utils.PersistenceData) error
// RemovePersistence Remove the persistence policies for a topic
RemovePersistence(utils.TopicName) error
// GetDelayedDelivery Get the delayed delivery policy for a topic
GetDelayedDelivery(utils.TopicName) (*utils.DelayedDeliveryData, error)
// SetDelayedDelivery Set the delayed delivery policy on a topic
SetDelayedDelivery(utils.TopicName, utils.DelayedDeliveryData) error
// RemoveDelayedDelivery Remove the delayed delivery policy on a topic
RemoveDelayedDelivery(utils.TopicName) error
// GetDispatchRate Get message dispatch rate for a topic
GetDispatchRate(utils.TopicName) (*utils.DispatchRateData, error)
// SetDispatchRate Set message dispatch rate for a topic
SetDispatchRate(utils.TopicName, utils.DispatchRateData) error
// RemoveDispatchRate Remove message dispatch rate for a topic
RemoveDispatchRate(utils.TopicName) error
// GetPublishRate Get message publish rate for a topic
GetPublishRate(utils.TopicName) (*utils.PublishRateData, error)
// SetPublishRate Set message publish rate for a topic
SetPublishRate(utils.TopicName, utils.PublishRateData) error
// RemovePublishRate Remove message publish rate for a topic
RemovePublishRate(utils.TopicName) error
// GetDeduplicationStatus Get the deduplication policy for a topic
GetDeduplicationStatus(utils.TopicName) (bool, error)
// SetDeduplicationStatus Set the deduplication policy for a topic
//
// @param topic
// topicName struct
// @param enabled
// set enable or disable deduplication of the topic
SetDeduplicationStatus(topic utils.TopicName, enabled bool) error
// RemoveDeduplicationStatus Remove the deduplication policy for a topic
RemoveDeduplicationStatus(utils.TopicName) error
// GetRetention returns the retention configuration for a topic
//
// @param topic
// topicName struct
// @param applied
// when set to true, function will try to find policy applied to this topic
// in namespace or broker level, if no policy set in topic level
GetRetention(topic utils.TopicName, applied bool) (*utils.RetentionPolicies, error)
// RemoveRetention removes the retention configuration on a topic
RemoveRetention(utils.TopicName) error
// SetRetention sets the retention policy for a topic
SetRetention(utils.TopicName, utils.RetentionPolicies) error
// GetCompactionThreshold Get the compaction threshold for a topic.
//
// i.e. The maximum number of bytes can have before compaction is triggered.
//
// @param topic
// topicName struct
// @param applied
// when set to true, function will try to find policy applied to this topic
// in namespace or broker level, if no policy set in topic level
GetCompactionThreshold(topic utils.TopicName, applied bool) (int64, error)
// SetCompactionThreshold Set the compaction threshold for a topic
//
// @param topic
// topicName struct
// @param threshold
// maximum number of backlog bytes before compaction is triggered
SetCompactionThreshold(topic utils.TopicName, threshold int64) error
// Remove compaction threshold for a topic
RemoveCompactionThreshold(utils.TopicName) error
// GetBacklogQuotaMap returns backlog quota map for a topic
//
// @param topic
// topicName struct
// @param applied
// when set to true, function will try to find policy applied to this topic
// in namespace or broker level, if no policy set in topic level
GetBacklogQuotaMap(topic utils.TopicName, applied bool) (map[utils.BacklogQuotaType]utils.BacklogQuota, error)
// SetBacklogQuota sets a backlog quota for a topic
SetBacklogQuota(utils.TopicName, utils.BacklogQuota, utils.BacklogQuotaType) error
// RemoveBacklogQuota removes a backlog quota policy from a topic
RemoveBacklogQuota(utils.TopicName, utils.BacklogQuotaType) error
// GetInactiveTopicPolicies gets the inactive topic policies on a topic
//
// @param topic
// topicName struct
// @param applied
// when set to true, function will try to find policy applied to this topic
// in namespace or broker level, if no policy set in topic level
GetInactiveTopicPolicies(topic utils.TopicName, applied bool) (utils.InactiveTopicPolicies, error)
// RemoveInactiveTopicPolicies removes inactive topic policies from a topic
RemoveInactiveTopicPolicies(utils.TopicName) error
// SetInactiveTopicPolicies sets the inactive topic policies on a topic
SetInactiveTopicPolicies(topic utils.TopicName, data utils.InactiveTopicPolicies) error
// GetReplicationClusters get the replication clusters of a topic
GetReplicationClusters(topic utils.TopicName) ([]string, error)
// SetReplicationClusters sets the replication clusters on a topic
//
// @param topic
// topicName struct
// @param data
// list of replication cluster id
SetReplicationClusters(topic utils.TopicName, data []string) error
}
type topics struct {
pulsar *pulsarClient
basePath string
persistentPath string
nonPersistentPath string
lookupPath string
}
// Check whether the topics struct implements the Topics interface.
var _ Topics = &topics{}
// Topics is used to access the topics endpoints
func (c *pulsarClient) Topics() Topics {
return &topics{
pulsar: c,
basePath: "",
persistentPath: "/persistent",
nonPersistentPath: "/non-persistent",
lookupPath: "/lookup/v2/topic",
}
}
func (t *topics) Create(topic utils.TopicName, partitions int) error {
return t.CreateWithProperties(topic, partitions, nil)
}
func (t *topics) CreateWithProperties(topic utils.TopicName, partitions int, meta map[string]string) error {
endpoint := t.pulsar.endpoint(t.basePath, topic.GetRestPath(), "partitions")
if partitions == 0 {
endpoint = t.pulsar.endpoint(t.basePath, topic.GetRestPath())
return t.pulsar.Client.Put(endpoint, meta)
}
data := struct {
Meta map[string]string `json:"properties"`
Partitions int `json:"partitions"`
}{
Meta: meta,
Partitions: partitions,
}
return t.pulsar.Client.PutWithCustomMediaType(endpoint, &data, nil, nil, rest.PartitionedTopicMetaJSON)
}
func (t *topics) GetProperties(topic utils.TopicName) (map[string]string, error) {
endpoint := t.pulsar.endpoint(t.basePath, topic.GetRestPath(), "properties")
var properties map[string]string
err := t.pulsar.Client.Get(endpoint, &properties)
return properties, err
}
func (t *topics) Delete(topic utils.TopicName, force bool, nonPartitioned bool) error {
endpoint := t.pulsar.endpoint(t.basePath, topic.GetRestPath(), "partitions")
if nonPartitioned {
endpoint = t.pulsar.endpoint(t.basePath, topic.GetRestPath())
}
params := map[string]string{
"force": strconv.FormatBool(force),
}
return t.pulsar.Client.DeleteWithQueryParams(endpoint, params)
}
func (t *topics) Update(topic utils.TopicName, partitions int) error {
endpoint := t.pulsar.endpoint(t.basePath, topic.GetRestPath(), "partitions")
return t.pulsar.Client.Post(endpoint, partitions)
}
func (t *topics) GetMetadata(topic utils.TopicName) (utils.PartitionedTopicMetadata, error) {
endpoint := t.pulsar.endpoint(t.basePath, topic.GetRestPath(), "partitions")
var partitionedMeta utils.PartitionedTopicMetadata
err := t.pulsar.Client.Get(endpoint, &partitionedMeta)
return partitionedMeta, err
}
func (t *topics) List(namespace utils.NameSpaceName) ([]string, []string, error) {
var partitionedTopics, nonPartitionedTopics []string
partitionedTopicsChan := make(chan []string)
nonPartitionedTopicsChan := make(chan []string)
errChan := make(chan error)
pp := t.pulsar.endpoint(t.persistentPath, namespace.String(), "partitioned")
np := t.pulsar.endpoint(t.nonPersistentPath, namespace.String(), "partitioned")
p := t.pulsar.endpoint(t.persistentPath, namespace.String())
n := t.pulsar.endpoint(t.nonPersistentPath, namespace.String())
go t.getTopics(pp, partitionedTopicsChan, errChan)
go t.getTopics(np, partitionedTopicsChan, errChan)
go t.getTopics(p, nonPartitionedTopicsChan, errChan)
go t.getTopics(n, nonPartitionedTopicsChan, errChan)
requestCount := 4
for {
select {
case err := <-errChan:
if err != nil {
return nil, nil, err
}
continue
case pTopic := <-partitionedTopicsChan:
requestCount--
partitionedTopics = append(partitionedTopics, pTopic...)
case npTopic := <-nonPartitionedTopicsChan:
requestCount--
nonPartitionedTopics = append(nonPartitionedTopics, npTopic...)
}
if requestCount == 0 {
break
}
}
return partitionedTopics, nonPartitionedTopics, nil
}
func (t *topics) getTopics(endpoint string, out chan<- []string, err chan<- error) {
var topics []string
err <- t.pulsar.Client.Get(endpoint, &topics)
out <- topics
}
func (t *topics) GetInternalInfo(topic utils.TopicName) (utils.ManagedLedgerInfo, error) {
endpoint := t.pulsar.endpoint(t.basePath, topic.GetRestPath(), "internal-info")
var info utils.ManagedLedgerInfo
err := t.pulsar.Client.Get(endpoint, &info)
return info, err
}
func (t *topics) GetPermissions(topic utils.TopicName) (map[string][]utils.AuthAction, error) {
var permissions map[string][]utils.AuthAction
endpoint := t.pulsar.endpoint(t.basePath, topic.GetRestPath(), "permissions")
err := t.pulsar.Client.Get(endpoint, &permissions)
return permissions, err
}
func (t *topics) GrantPermission(topic utils.TopicName, role string, action []utils.AuthAction) error {
endpoint := t.pulsar.endpoint(t.basePath, topic.GetRestPath(), "permissions", role)
s := []string{}
for _, v := range action {
s = append(s, v.String())
}
return t.pulsar.Client.Post(endpoint, s)
}
func (t *topics) RevokePermission(topic utils.TopicName, role string) error {
endpoint := t.pulsar.endpoint(t.basePath, topic.GetRestPath(), "permissions", role)
return t.pulsar.Client.Delete(endpoint)
}
func (t *topics) Lookup(topic utils.TopicName) (utils.LookupData, error) {
var lookup utils.LookupData
endpoint := fmt.Sprintf("%s/%s", t.lookupPath, topic.GetRestPath())
err := t.pulsar.Client.Get(endpoint, &lookup)
return lookup, err
}
func (t *topics) GetBundleRange(topic utils.TopicName) (string, error) {
endpoint := fmt.Sprintf("%s/%s/%s", t.lookupPath, topic.GetRestPath(), "bundle")
data, err := t.pulsar.Client.GetWithQueryParams(endpoint, nil, nil, false)
return string(data), err
}
func (t *topics) GetLastMessageID(topic utils.TopicName) (utils.MessageID, error) {
var messageID utils.MessageID
endpoint := t.pulsar.endpoint(t.basePath, topic.GetRestPath(), "lastMessageId")
err := t.pulsar.Client.Get(endpoint, &messageID)
return messageID, err
}
func (t *topics) GetMessageID(topic utils.TopicName, timestamp int64) (utils.MessageID, error) {
var messageID utils.MessageID
endpoint := t.pulsar.endpoint(t.basePath, topic.GetRestPath(), "messageid", strconv.FormatInt(timestamp, 10))
err := t.pulsar.Client.Get(endpoint, &messageID)
return messageID, err
}
func (t *topics) GetStats(topic utils.TopicName) (utils.TopicStats, error) {
var stats utils.TopicStats
endpoint := t.pulsar.endpoint(t.basePath, topic.GetRestPath(), "stats")
err := t.pulsar.Client.Get(endpoint, &stats)
return stats, err
}
func (t *topics) GetStatsWithOption(topic utils.TopicName, option utils.GetStatsOptions) (utils.TopicStats, error) {
var stats utils.TopicStats
endpoint := t.pulsar.endpoint(t.basePath, topic.GetRestPath(), "stats")
params := map[string]string{
"getPreciseBacklog": strconv.FormatBool(option.GetPreciseBacklog),
"subscriptionBacklogSize": strconv.FormatBool(option.SubscriptionBacklogSize),
"getEarliestTimeInBacklog": strconv.FormatBool(option.GetEarliestTimeInBacklog),
"excludePublishers": strconv.FormatBool(option.ExcludePublishers),
"excludeConsumers": strconv.FormatBool(option.ExcludeConsumers),
}
_, err := t.pulsar.Client.GetWithQueryParams(endpoint, &stats, params, true)
return stats, err
}
func (t *topics) GetInternalStats(topic utils.TopicName) (utils.PersistentTopicInternalStats, error) {
var stats utils.PersistentTopicInternalStats
endpoint := t.pulsar.endpoint(t.basePath, topic.GetRestPath(), "internalStats")
err := t.pulsar.Client.Get(endpoint, &stats)
return stats, err
}
func (t *topics) GetPartitionedStats(topic utils.TopicName, perPartition bool) (utils.PartitionedTopicStats, error) {
var stats utils.PartitionedTopicStats
endpoint := t.pulsar.endpoint(t.basePath, topic.GetRestPath(), "partitioned-stats")
params := map[string]string{
"perPartition": strconv.FormatBool(perPartition),
}
_, err := t.pulsar.Client.GetWithQueryParams(endpoint, &stats, params, true)
return stats, err
}
func (t *topics) GetPartitionedStatsWithOption(topic utils.TopicName, perPartition bool,
option utils.GetStatsOptions) (utils.PartitionedTopicStats, error) {
var stats utils.PartitionedTopicStats
endpoint := t.pulsar.endpoint(t.basePath, topic.GetRestPath(), "partitioned-stats")
params := map[string]string{
"perPartition": strconv.FormatBool(perPartition),
"getPreciseBacklog": strconv.FormatBool(option.GetPreciseBacklog),
"subscriptionBacklogSize": strconv.FormatBool(option.SubscriptionBacklogSize),
"getEarliestTimeInBacklog": strconv.FormatBool(option.GetEarliestTimeInBacklog),
"excludePublishers": strconv.FormatBool(option.ExcludePublishers),
"excludeConsumers": strconv.FormatBool(option.ExcludeConsumers),
}
_, err := t.pulsar.Client.GetWithQueryParams(endpoint, &stats, params, true)
return stats, err
}
func (t *topics) Terminate(topic utils.TopicName) (utils.MessageID, error) {
endpoint := t.pulsar.endpoint(t.basePath, topic.GetRestPath(), "terminate")
var messageID utils.MessageID
err := t.pulsar.Client.PostWithObj(endpoint, nil, &messageID)
return messageID, err
}
func (t *topics) Offload(topic utils.TopicName, messageID utils.MessageID) error {
endpoint := t.pulsar.endpoint(t.basePath, topic.GetRestPath(), "offload")
return t.pulsar.Client.Put(endpoint, messageID)
}
func (t *topics) OffloadStatus(topic utils.TopicName) (utils.OffloadProcessStatus, error) {
endpoint := t.pulsar.endpoint(t.basePath, topic.GetRestPath(), "offload")
var status utils.OffloadProcessStatus
err := t.pulsar.Client.Get(endpoint, &status)
return status, err
}
func (t *topics) Unload(topic utils.TopicName) error {
endpoint := t.pulsar.endpoint(t.basePath, topic.GetRestPath(), "unload")
return t.pulsar.Client.Put(endpoint, nil)
}
func (t *topics) Compact(topic utils.TopicName) error {
endpoint := t.pulsar.endpoint(t.basePath, topic.GetRestPath(), "compaction")
return t.pulsar.Client.Put(endpoint, nil)
}
func (t *topics) CompactStatus(topic utils.TopicName) (utils.LongRunningProcessStatus, error) {
endpoint := t.pulsar.endpoint(t.basePath, topic.GetRestPath(), "compaction")
var status utils.LongRunningProcessStatus
err := t.pulsar.Client.Get(endpoint, &status)
return status, err
}
func (t *topics) GetMessageTTL(topic utils.TopicName) (int, error) {
var ttl int
endpoint := t.pulsar.endpoint(t.basePath, topic.GetRestPath(), "messageTTL")
err := t.pulsar.Client.Get(endpoint, &ttl)
return ttl, err
}
func (t *topics) SetMessageTTL(topic utils.TopicName, messageTTL int) error {
endpoint := t.pulsar.endpoint(t.basePath, topic.GetRestPath(), "messageTTL")
var params = make(map[string]string)
params["messageTTL"] = strconv.Itoa(messageTTL)
err := t.pulsar.Client.PostWithQueryParams(endpoint, nil, params)
return err
}
func (t *topics) RemoveMessageTTL(topic utils.TopicName) error {
endpoint := t.pulsar.endpoint(t.basePath, topic.GetRestPath(), "messageTTL")
var params = make(map[string]string)
params["messageTTL"] = strconv.Itoa(0)
err := t.pulsar.Client.DeleteWithQueryParams(endpoint, params)
return err
}
func (t *topics) GetMaxProducers(topic utils.TopicName) (int, error) {
var maxProducers int
endpoint := t.pulsar.endpoint(t.basePath, topic.GetRestPath(), "maxProducers")
err := t.pulsar.Client.Get(endpoint, &maxProducers)
return maxProducers, err
}
func (t *topics) SetMaxProducers(topic utils.TopicName, maxProducers int) error {
endpoint := t.pulsar.endpoint(t.basePath, topic.GetRestPath(), "maxProducers")
err := t.pulsar.Client.Post(endpoint, &maxProducers)
return err
}
func (t *topics) RemoveMaxProducers(topic utils.TopicName) error {
endpoint := t.pulsar.endpoint(t.basePath, topic.GetRestPath(), "maxProducers")
err := t.pulsar.Client.Delete(endpoint)
return err
}
func (t *topics) GetMaxConsumers(topic utils.TopicName) (int, error) {
var maxConsumers int
endpoint := t.pulsar.endpoint(t.basePath, topic.GetRestPath(), "maxConsumers")
err := t.pulsar.Client.Get(endpoint, &maxConsumers)
return maxConsumers, err
}
func (t *topics) SetMaxConsumers(topic utils.TopicName, maxConsumers int) error {
endpoint := t.pulsar.endpoint(t.basePath, topic.GetRestPath(), "maxConsumers")
err := t.pulsar.Client.Post(endpoint, &maxConsumers)
return err
}
func (t *topics) RemoveMaxConsumers(topic utils.TopicName) error {
endpoint := t.pulsar.endpoint(t.basePath, topic.GetRestPath(), "maxConsumers")
err := t.pulsar.Client.Delete(endpoint)
return err
}
func (t *topics) GetMaxUnackMessagesPerConsumer(topic utils.TopicName) (int, error) {
var maxNum int
endpoint := t.pulsar.endpoint(t.basePath, topic.GetRestPath(), "maxUnackedMessagesOnConsumer")
err := t.pulsar.Client.Get(endpoint, &maxNum)
return maxNum, err
}
func (t *topics) SetMaxUnackMessagesPerConsumer(topic utils.TopicName, maxUnackedNum int) error {
endpoint := t.pulsar.endpoint(t.basePath, topic.GetRestPath(), "maxUnackedMessagesOnConsumer")
return t.pulsar.Client.Post(endpoint, &maxUnackedNum)
}
func (t *topics) RemoveMaxUnackMessagesPerConsumer(topic utils.TopicName) error {
endpoint := t.pulsar.endpoint(t.basePath, topic.GetRestPath(), "maxUnackedMessagesOnConsumer")
return t.pulsar.Client.Delete(endpoint)
}
func (t *topics) GetMaxUnackMessagesPerSubscription(topic utils.TopicName) (int, error) {
var maxNum int
endpoint := t.pulsar.endpoint(t.basePath, topic.GetRestPath(), "maxUnackedMessagesOnSubscription")
err := t.pulsar.Client.Get(endpoint, &maxNum)
return maxNum, err
}
func (t *topics) SetMaxUnackMessagesPerSubscription(topic utils.TopicName, maxUnackedNum int) error {
endpoint := t.pulsar.endpoint(t.basePath, topic.GetRestPath(), "maxUnackedMessagesOnSubscription")
return t.pulsar.Client.Post(endpoint, &maxUnackedNum)
}
func (t *topics) RemoveMaxUnackMessagesPerSubscription(topic utils.TopicName) error {
endpoint := t.pulsar.endpoint(t.basePath, topic.GetRestPath(), "maxUnackedMessagesOnSubscription")
return t.pulsar.Client.Delete(endpoint)
}
func (t *topics) GetPersistence(topic utils.TopicName) (*utils.PersistenceData, error) {
var persistenceData utils.PersistenceData
endpoint := t.pulsar.endpoint(t.basePath, topic.GetRestPath(), "persistence")
err := t.pulsar.Client.Get(endpoint, &persistenceData)
return &persistenceData, err
}
func (t *topics) SetPersistence(topic utils.TopicName, persistenceData utils.PersistenceData) error {
endpoint := t.pulsar.endpoint(t.basePath, topic.GetRestPath(), "persistence")
return t.pulsar.Client.Post(endpoint, &persistenceData)
}
func (t *topics) RemovePersistence(topic utils.TopicName) error {
endpoint := t.pulsar.endpoint(t.basePath, topic.GetRestPath(), "persistence")
return t.pulsar.Client.Delete(endpoint)
}
func (t *topics) GetDelayedDelivery(topic utils.TopicName) (*utils.DelayedDeliveryData, error) {
var delayedDeliveryData utils.DelayedDeliveryData
endpoint := t.pulsar.endpoint(t.basePath, topic.GetRestPath(), "delayedDelivery")
err := t.pulsar.Client.Get(endpoint, &delayedDeliveryData)
return &delayedDeliveryData, err
}
func (t *topics) SetDelayedDelivery(topic utils.TopicName, delayedDeliveryData utils.DelayedDeliveryData) error {
endpoint := t.pulsar.endpoint(t.basePath, topic.GetRestPath(), "delayedDelivery")
return t.pulsar.Client.Post(endpoint, &delayedDeliveryData)
}
func (t *topics) RemoveDelayedDelivery(topic utils.TopicName) error {
endpoint := t.pulsar.endpoint(t.basePath, topic.GetRestPath(), "delayedDelivery")
return t.pulsar.Client.Delete(endpoint)
}
func (t *topics) GetDispatchRate(topic utils.TopicName) (*utils.DispatchRateData, error) {
var dispatchRateData utils.DispatchRateData
endpoint := t.pulsar.endpoint(t.basePath, topic.GetRestPath(), "dispatchRate")
err := t.pulsar.Client.Get(endpoint, &dispatchRateData)
return &dispatchRateData, err
}
func (t *topics) SetDispatchRate(topic utils.TopicName, dispatchRateData utils.DispatchRateData) error {
endpoint := t.pulsar.endpoint(t.basePath, topic.GetRestPath(), "dispatchRate")
return t.pulsar.Client.Post(endpoint, &dispatchRateData)
}
func (t *topics) RemoveDispatchRate(topic utils.TopicName) error {
endpoint := t.pulsar.endpoint(t.basePath, topic.GetRestPath(), "dispatchRate")
return t.pulsar.Client.Delete(endpoint)
}
func (t *topics) GetPublishRate(topic utils.TopicName) (*utils.PublishRateData, error) {
var publishRateData utils.PublishRateData
endpoint := t.pulsar.endpoint(t.basePath, topic.GetRestPath(), "publishRate")
err := t.pulsar.Client.Get(endpoint, &publishRateData)
return &publishRateData, err
}
func (t *topics) SetPublishRate(topic utils.TopicName, publishRateData utils.PublishRateData) error {
endpoint := t.pulsar.endpoint(t.basePath, topic.GetRestPath(), "publishRate")
return t.pulsar.Client.Post(endpoint, &publishRateData)
}
func (t *topics) RemovePublishRate(topic utils.TopicName) error {
endpoint := t.pulsar.endpoint(t.basePath, topic.GetRestPath(), "publishRate")
return t.pulsar.Client.Delete(endpoint)
}
func (t *topics) GetDeduplicationStatus(topic utils.TopicName) (bool, error) {
var enabled bool
endpoint := t.pulsar.endpoint(t.basePath, topic.GetRestPath(), "deduplicationEnabled")
err := t.pulsar.Client.Get(endpoint, &enabled)
return enabled, err
}
func (t *topics) SetDeduplicationStatus(topic utils.TopicName, enabled bool) error {
endpoint := t.pulsar.endpoint(t.basePath, topic.GetRestPath(), "deduplicationEnabled")
return t.pulsar.Client.Post(endpoint, enabled)
}
func (t *topics) RemoveDeduplicationStatus(topic utils.TopicName) error {
endpoint := t.pulsar.endpoint(t.basePath, topic.GetRestPath(), "deduplicationEnabled")
return t.pulsar.Client.Delete(endpoint)
}
func (t *topics) GetRetention(topic utils.TopicName, applied bool) (*utils.RetentionPolicies, error) {
var policy utils.RetentionPolicies
endpoint := t.pulsar.endpoint(t.basePath, topic.GetRestPath(), "retention")
_, err := t.pulsar.Client.GetWithQueryParams(endpoint, &policy, map[string]string{
"applied": strconv.FormatBool(applied),
}, true)
return &policy, err
}
func (t *topics) RemoveRetention(topic utils.TopicName) error {
endpoint := t.pulsar.endpoint(t.basePath, topic.GetRestPath(), "retention")
return t.pulsar.Client.Delete(endpoint)
}
func (t *topics) SetRetention(topic utils.TopicName, data utils.RetentionPolicies) error {
endpoint := t.pulsar.endpoint(t.basePath, topic.GetRestPath(), "retention")
return t.pulsar.Client.Post(endpoint, data)
}
func (t *topics) GetCompactionThreshold(topic utils.TopicName, applied bool) (int64, error) {
var threshold int64
endpoint := t.pulsar.endpoint(t.basePath, topic.GetRestPath(), "compactionThreshold")
_, err := t.pulsar.Client.GetWithQueryParams(endpoint, &threshold, map[string]string{
"applied": strconv.FormatBool(applied),
}, true)
return threshold, err
}
func (t *topics) SetCompactionThreshold(topic utils.TopicName, threshold int64) error {
endpoint := t.pulsar.endpoint(t.basePath, topic.GetRestPath(), "compactionThreshold")
err := t.pulsar.Client.Post(endpoint, threshold)
return err
}
func (t *topics) RemoveCompactionThreshold(topic utils.TopicName) error {
endpoint := t.pulsar.endpoint(t.basePath, topic.GetRestPath(), "compactionThreshold")
err := t.pulsar.Client.Delete(endpoint)
return err
}
func (t *topics) GetBacklogQuotaMap(topic utils.TopicName, applied bool) (map[utils.BacklogQuotaType]utils.BacklogQuota,
error) {
var backlogQuotaMap map[utils.BacklogQuotaType]utils.BacklogQuota
endpoint := t.pulsar.endpoint(t.basePath, topic.GetRestPath(), "backlogQuotaMap")
queryParams := map[string]string{"applied": strconv.FormatBool(applied)}
_, err := t.pulsar.Client.GetWithQueryParams(endpoint, &backlogQuotaMap, queryParams, true)
return backlogQuotaMap, err
}
func (t *topics) SetBacklogQuota(topic utils.TopicName, backlogQuota utils.BacklogQuota,
backlogQuotaType utils.BacklogQuotaType) error {
endpoint := t.pulsar.endpoint(t.basePath, topic.GetRestPath(), "backlogQuota")
params := make(map[string]string)
params["backlogQuotaType"] = string(backlogQuotaType)
return t.pulsar.Client.PostWithQueryParams(endpoint, &backlogQuota, params)
}
func (t *topics) RemoveBacklogQuota(topic utils.TopicName, backlogQuotaType utils.BacklogQuotaType) error {
endpoint := t.pulsar.endpoint(t.basePath, topic.GetRestPath(), "backlogQuota")
return t.pulsar.Client.DeleteWithQueryParams(endpoint, map[string]string{
"backlogQuotaType": string(backlogQuotaType),
})
}
func (t *topics) GetInactiveTopicPolicies(topic utils.TopicName, applied bool) (utils.InactiveTopicPolicies, error) {
var out utils.InactiveTopicPolicies
endpoint := t.pulsar.endpoint(t.basePath, topic.GetRestPath(), "inactiveTopicPolicies")
_, err := t.pulsar.Client.GetWithQueryParams(endpoint, &out, map[string]string{
"applied": strconv.FormatBool(applied),
}, true)
return out, err
}
func (t *topics) RemoveInactiveTopicPolicies(topic utils.TopicName) error {
endpoint := t.pulsar.endpoint(t.basePath, topic.GetRestPath(), "inactiveTopicPolicies")
return t.pulsar.Client.Delete(endpoint)
}
func (t *topics) SetInactiveTopicPolicies(topic utils.TopicName, data utils.InactiveTopicPolicies) error {
endpoint := t.pulsar.endpoint(t.basePath, topic.GetRestPath(), "inactiveTopicPolicies")
return t.pulsar.Client.Post(endpoint, data)
}
func (t *topics) SetReplicationClusters(topic utils.TopicName, data []string) error {
endpoint := t.pulsar.endpoint(t.basePath, topic.GetRestPath(), "replication")
return t.pulsar.Client.Post(endpoint, data)
}
func (t *topics) GetReplicationClusters(topic utils.TopicName) ([]string, error) {
var data []string
endpoint := t.pulsar.endpoint(t.basePath, topic.GetRestPath(), "replication")
err := t.pulsar.Client.Get(endpoint, &data)
return data, err
}