pulsaradmin/pkg/admin/namespace.go (648 lines of code) (raw):

// Licensed to the Apache Software Foundation (ASF) under one // or more contributor license agreements. See the NOTICE file // distributed with this work for additional information // regarding copyright ownership. The ASF licenses this file // to you under the Apache License, Version 2.0 (the // "License"); you may not use this file except in compliance // with the License. You may obtain a copy of the License at // // http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, // software distributed under the License is distributed on an // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY // KIND, either express or implied. See the License for the // specific language governing permissions and limitations // under the License. package admin import ( "net/url" "strconv" "strings" "github.com/apache/pulsar-client-go/pulsaradmin/pkg/utils" ) // Namespaces is admin interface for namespaces management type Namespaces interface { // GetNamespaces returns the list of all the namespaces for a certain tenant GetNamespaces(tenant string) ([]string, error) // GetTopics returns the list of all the topics under a certain namespace GetTopics(namespace string) ([]string, error) // GetPolicies returns the dump all the policies specified for a namespace GetPolicies(namespace string) (*utils.Policies, error) // CreateNamespace creates a new empty namespace with no policies attached CreateNamespace(namespace string) error // CreateNsWithNumBundles creates a new empty namespace with no policies attached CreateNsWithNumBundles(namespace string, numBundles int) error // CreateNsWithPolices creates a new namespace with the specified policies CreateNsWithPolices(namespace string, polices utils.Policies) error // CreateNsWithBundlesData creates a new empty namespace with no policies attached CreateNsWithBundlesData(namespace string, bundleData *utils.BundlesData) error // DeleteNamespace deletes an existing namespace DeleteNamespace(namespace string) error // DeleteNamespaceBundle deletes an existing bundle in a namespace DeleteNamespaceBundle(namespace string, bundleRange string) error // SetNamespaceMessageTTL sets the messages Time to Live for all the topics within a namespace SetNamespaceMessageTTL(namespace string, ttlInSeconds int) error // GetNamespaceMessageTTL returns the message TTL for a namespace GetNamespaceMessageTTL(namespace string) (int, error) // GetRetention returns the retention configuration for a namespace GetRetention(namespace string) (*utils.RetentionPolicies, error) // SetRetention sets the retention configuration for all the topics on a namespace SetRetention(namespace string, policy utils.RetentionPolicies) error // GetBacklogQuotaMap returns backlog quota map on a namespace GetBacklogQuotaMap(namespace string) (map[utils.BacklogQuotaType]utils.BacklogQuota, error) // SetBacklogQuota sets a backlog quota for all the topics on a namespace SetBacklogQuota(namespace string, backlogQuota utils.BacklogQuota, backlogQuotaType utils.BacklogQuotaType) error // RemoveBacklogQuota removes a backlog quota policy from a namespace RemoveBacklogQuota(namespace string) error // GetTopicAutoCreation returns the topic auto-creation config for a namespace GetTopicAutoCreation(namespace utils.NameSpaceName) (*utils.TopicAutoCreationConfig, error) // SetTopicAutoCreation sets topic auto-creation config for a namespace, overriding broker settings SetTopicAutoCreation(namespace utils.NameSpaceName, config utils.TopicAutoCreationConfig) error // RemoveTopicAutoCreation removes topic auto-creation config for a namespace, defaulting to broker settings RemoveTopicAutoCreation(namespace utils.NameSpaceName) error // SetSchemaValidationEnforced sets schema validation enforced for namespace SetSchemaValidationEnforced(namespace utils.NameSpaceName, schemaValidationEnforced bool) error // GetSchemaValidationEnforced returns schema validation enforced for namespace GetSchemaValidationEnforced(namespace utils.NameSpaceName) (bool, error) // SetSchemaAutoUpdateCompatibilityStrategy sets the strategy used to check the a new schema provided // by a producer is compatible with the current schema before it is installed SetSchemaAutoUpdateCompatibilityStrategy(namespace utils.NameSpaceName, strategy utils.SchemaCompatibilityStrategy) error // GetSchemaAutoUpdateCompatibilityStrategy returns the strategy used to check the a new schema provided // by a producer is compatible with the current schema before it is installed GetSchemaAutoUpdateCompatibilityStrategy(namespace utils.NameSpaceName) (utils.SchemaCompatibilityStrategy, error) // ClearOffloadDeleteLag clears the offload deletion lag for a namespace. ClearOffloadDeleteLag(namespace utils.NameSpaceName) error // SetOffloadDeleteLag sets the offload deletion lag for a namespace SetOffloadDeleteLag(namespace utils.NameSpaceName, timeMs int64) error // GetOffloadDeleteLag returns the offload deletion lag for a namespace, in milliseconds GetOffloadDeleteLag(namespace utils.NameSpaceName) (int64, error) // SetOffloadThreshold sets the offloadThreshold for a namespace SetOffloadThreshold(namespace utils.NameSpaceName, threshold int64) error // GetOffloadThreshold returns the offloadThreshold for a namespace GetOffloadThreshold(namespace utils.NameSpaceName) (int64, error) // SetOffloadThresholdInSeconds sets the offloadThresholdInSeconds for a namespace SetOffloadThresholdInSeconds(namespace utils.NameSpaceName, threshold int64) error // GetOffloadThresholdInSeconds returns the offloadThresholdInSeconds for a namespace GetOffloadThresholdInSeconds(namespace utils.NameSpaceName) (int64, error) // SetCompactionThreshold sets the compactionThreshold for a namespace SetCompactionThreshold(namespace utils.NameSpaceName, threshold int64) error // GetCompactionThreshold returns the compactionThreshold for a namespace GetCompactionThreshold(namespace utils.NameSpaceName) (int64, error) // SetMaxConsumersPerSubscription sets maxConsumersPerSubscription for a namespace. SetMaxConsumersPerSubscription(namespace utils.NameSpaceName, max int) error // GetMaxConsumersPerSubscription returns the maxConsumersPerSubscription for a namespace. GetMaxConsumersPerSubscription(namespace utils.NameSpaceName) (int, error) // SetMaxConsumersPerTopic sets maxConsumersPerTopic for a namespace. SetMaxConsumersPerTopic(namespace utils.NameSpaceName, max int) error // GetMaxConsumersPerTopic returns the maxProducersPerTopic for a namespace. GetMaxConsumersPerTopic(namespace utils.NameSpaceName) (int, error) // SetMaxProducersPerTopic sets maxProducersPerTopic for a namespace. SetMaxProducersPerTopic(namespace utils.NameSpaceName, max int) error // GetMaxProducersPerTopic returns the maxProducersPerTopic for a namespace. GetMaxProducersPerTopic(namespace utils.NameSpaceName) (int, error) // GetNamespaceReplicationClusters returns the replication clusters for a namespace GetNamespaceReplicationClusters(namespace string) ([]string, error) // SetNamespaceReplicationClusters returns the replication clusters for a namespace SetNamespaceReplicationClusters(namespace string, clusterIDs []string) error // SetNamespaceAntiAffinityGroup sets anti-affinity group name for a namespace SetNamespaceAntiAffinityGroup(namespace string, namespaceAntiAffinityGroup string) error // GetAntiAffinityNamespaces returns all namespaces that grouped with given anti-affinity group GetAntiAffinityNamespaces(tenant, cluster, namespaceAntiAffinityGroup string) ([]string, error) // GetNamespaceAntiAffinityGroup returns anti-affinity group name for a namespace GetNamespaceAntiAffinityGroup(namespace string) (string, error) // DeleteNamespaceAntiAffinityGroup deletes anti-affinity group name for a namespace DeleteNamespaceAntiAffinityGroup(namespace string) error // SetDeduplicationStatus sets the deduplication status for all topics within a namespace // When deduplication is enabled, the broker will prevent to store the same Message multiple times SetDeduplicationStatus(namespace string, enableDeduplication bool) error // SetPersistence sets the persistence configuration for all the topics on a namespace SetPersistence(namespace string, persistence utils.PersistencePolicies) error // GetPersistence returns the persistence configuration for a namespace GetPersistence(namespace string) (*utils.PersistencePolicies, error) // SetBookieAffinityGroup sets bookie affinity group for a namespace to isolate namespace write to bookies that are // part of given affinity group SetBookieAffinityGroup(namespace string, bookieAffinityGroup utils.BookieAffinityGroupData) error // DeleteBookieAffinityGroup deletes bookie affinity group configured for a namespace DeleteBookieAffinityGroup(namespace string) error // GetBookieAffinityGroup returns bookie affinity group configured for a namespace GetBookieAffinityGroup(namespace string) (*utils.BookieAffinityGroupData, error) // Unload a namespace from the current serving broker Unload(namespace string) error // UnloadNamespaceBundle unloads namespace bundle UnloadNamespaceBundle(namespace, bundle string) error // SplitNamespaceBundle splits namespace bundle SplitNamespaceBundle(namespace, bundle string, unloadSplitBundles bool) error // GetNamespacePermissions returns permissions on a namespace GetNamespacePermissions(namespace utils.NameSpaceName) (map[string][]utils.AuthAction, error) // GrantNamespacePermission grants permission on a namespace. GrantNamespacePermission(namespace utils.NameSpaceName, role string, action []utils.AuthAction) error // RevokeNamespacePermission revokes permissions on a namespace. RevokeNamespacePermission(namespace utils.NameSpaceName, role string) error // GrantSubPermission grants permission to role to access subscription's admin-api GrantSubPermission(namespace utils.NameSpaceName, sName string, roles []string) error // RevokeSubPermission revoke permissions on a subscription's admin-api access RevokeSubPermission(namespace utils.NameSpaceName, sName, role string) error // GetSubPermissions returns subscription permissions on a namespace GetSubPermissions(namespace utils.NameSpaceName) (map[string][]string, error) // SetSubscriptionAuthMode sets the given subscription auth mode on all topics on a namespace SetSubscriptionAuthMode(namespace utils.NameSpaceName, mode utils.SubscriptionAuthMode) error // SetEncryptionRequiredStatus sets the encryption required status for all topics within a namespace SetEncryptionRequiredStatus(namespace utils.NameSpaceName, encrypt bool) error // UnsubscribeNamespace unsubscribe the given subscription on all topics on a namespace UnsubscribeNamespace(namespace utils.NameSpaceName, sName string) error // UnsubscribeNamespaceBundle unsubscribe the given subscription on all topics on a namespace bundle UnsubscribeNamespaceBundle(namespace utils.NameSpaceName, bundle, sName string) error // ClearNamespaceBundleBacklogForSubscription clears backlog for a given subscription on all // topics on a namespace bundle ClearNamespaceBundleBacklogForSubscription(namespace utils.NameSpaceName, bundle, sName string) error // ClearNamespaceBundleBacklog clears backlog for all topics on a namespace bundle ClearNamespaceBundleBacklog(namespace utils.NameSpaceName, bundle string) error // ClearNamespaceBacklogForSubscription clears backlog for a given subscription on all topics on a namespace ClearNamespaceBacklogForSubscription(namespace utils.NameSpaceName, sName string) error // ClearNamespaceBacklog clears backlog for all topics on a namespace ClearNamespaceBacklog(namespace utils.NameSpaceName) error // SetReplicatorDispatchRate sets replicator-Message-dispatch-rate (Replicators under this namespace // can dispatch this many messages per second) SetReplicatorDispatchRate(namespace utils.NameSpaceName, rate utils.DispatchRate) error // Get replicator-Message-dispatch-rate (Replicators under this namespace // can dispatch this many messages per second) GetReplicatorDispatchRate(namespace utils.NameSpaceName) (utils.DispatchRate, error) // SetSubscriptionDispatchRate sets subscription-Message-dispatch-rate (subscriptions under this namespace // can dispatch this many messages per second) SetSubscriptionDispatchRate(namespace utils.NameSpaceName, rate utils.DispatchRate) error // GetSubscriptionDispatchRate returns subscription-Message-dispatch-rate (subscriptions under this namespace // can dispatch this many messages per second) GetSubscriptionDispatchRate(namespace utils.NameSpaceName) (utils.DispatchRate, error) // SetSubscribeRate sets namespace-subscribe-rate (topics under this namespace will limit by subscribeRate) SetSubscribeRate(namespace utils.NameSpaceName, rate utils.SubscribeRate) error // GetSubscribeRate returns namespace-subscribe-rate (topics under this namespace allow subscribe // times per consumer in a period) GetSubscribeRate(namespace utils.NameSpaceName) (utils.SubscribeRate, error) // SetDispatchRate sets Message-dispatch-rate (topics under this namespace can dispatch // this many messages per second) SetDispatchRate(namespace utils.NameSpaceName, rate utils.DispatchRate) error // GetDispatchRate returns Message-dispatch-rate (topics under this namespace can dispatch // this many messages per second) GetDispatchRate(namespace utils.NameSpaceName) (utils.DispatchRate, error) // SetPublishRate sets the maximum rate or number of messages that producers can publish to topics in this namespace SetPublishRate(namespace utils.NameSpaceName, pubRate utils.PublishRate) error // GetPublishRate gets the maximum rate or number of messages that producer can publish to topics in the namespace GetPublishRate(namespace utils.NameSpaceName) (utils.PublishRate, error) // SetIsAllowAutoUpdateSchema sets whether to allow auto update schema on a namespace SetIsAllowAutoUpdateSchema(namespace utils.NameSpaceName, isAllowAutoUpdateSchema bool) error // GetIsAllowAutoUpdateSchema gets whether to allow auto update schema on a namespace GetIsAllowAutoUpdateSchema(namespace utils.NameSpaceName) (bool, error) // GetInactiveTopicPolicies gets the inactive topic policies on a namespace GetInactiveTopicPolicies(namespace utils.NameSpaceName) (utils.InactiveTopicPolicies, error) // RemoveInactiveTopicPolicies removes inactive topic policies from a namespace RemoveInactiveTopicPolicies(namespace utils.NameSpaceName) error // SetInactiveTopicPolicies sets the inactive topic policies on a namespace SetInactiveTopicPolicies(namespace utils.NameSpaceName, data utils.InactiveTopicPolicies) error // GetSubscriptionExpirationTime gets the subscription expiration time on a namespace. Returns -1 if not set GetSubscriptionExpirationTime(namespace utils.NameSpaceName) (int, error) // SetSubscriptionExpirationTime sets the subscription expiration time on a namespace SetSubscriptionExpirationTime(namespace utils.NameSpaceName, expirationTimeInMinutes int) error // RemoveSubscriptionExpirationTime removes subscription expiration time from a namespace, // defaulting to broker settings RemoveSubscriptionExpirationTime(namespace utils.NameSpaceName) error } type namespaces struct { pulsar *pulsarClient basePath string } // Namespaces is used to access the namespaces endpoints func (c *pulsarClient) Namespaces() Namespaces { return &namespaces{ pulsar: c, basePath: "/namespaces", } } func (n *namespaces) GetNamespaces(tenant string) ([]string, error) { var namespaces []string endpoint := n.pulsar.endpoint(n.basePath, tenant) err := n.pulsar.Client.Get(endpoint, &namespaces) return namespaces, err } func (n *namespaces) GetTopics(namespace string) ([]string, error) { var topics []string ns, err := utils.GetNamespaceName(namespace) if err != nil { return nil, err } endpoint := n.pulsar.endpoint(n.basePath, ns.String(), "topics") err = n.pulsar.Client.Get(endpoint, &topics) return topics, err } func (n *namespaces) GetPolicies(namespace string) (*utils.Policies, error) { var police utils.Policies ns, err := utils.GetNamespaceName(namespace) if err != nil { return nil, err } endpoint := n.pulsar.endpoint(n.basePath, ns.String()) err = n.pulsar.Client.Get(endpoint, &police) return &police, err } func (n *namespaces) CreateNsWithNumBundles(namespace string, numBundles int) error { return n.CreateNsWithBundlesData(namespace, utils.NewBundlesDataWithNumBundles(numBundles)) } func (n *namespaces) CreateNsWithPolices(namespace string, policies utils.Policies) error { ns, err := utils.GetNamespaceName(namespace) if err != nil { return err } endpoint := n.pulsar.endpoint(n.basePath, ns.String()) return n.pulsar.Client.Put(endpoint, &policies) } func (n *namespaces) CreateNsWithBundlesData(namespace string, bundleData *utils.BundlesData) error { ns, err := utils.GetNamespaceName(namespace) if err != nil { return err } endpoint := n.pulsar.endpoint(n.basePath, ns.String()) polices := new(utils.Policies) polices.Bundles = bundleData return n.pulsar.Client.Put(endpoint, &polices) } func (n *namespaces) CreateNamespace(namespace string) error { ns, err := utils.GetNamespaceName(namespace) if err != nil { return err } endpoint := n.pulsar.endpoint(n.basePath, ns.String()) return n.pulsar.Client.Put(endpoint, nil) } func (n *namespaces) DeleteNamespace(namespace string) error { ns, err := utils.GetNamespaceName(namespace) if err != nil { return err } endpoint := n.pulsar.endpoint(n.basePath, ns.String()) return n.pulsar.Client.Delete(endpoint) } func (n *namespaces) DeleteNamespaceBundle(namespace string, bundleRange string) error { ns, err := utils.GetNamespaceName(namespace) if err != nil { return err } endpoint := n.pulsar.endpoint(n.basePath, ns.String(), bundleRange) return n.pulsar.Client.Delete(endpoint) } func (n *namespaces) GetNamespaceMessageTTL(namespace string) (int, error) { var ttl int nsName, err := utils.GetNamespaceName(namespace) if err != nil { return 0, err } endpoint := n.pulsar.endpoint(n.basePath, nsName.String(), "messageTTL") err = n.pulsar.Client.Get(endpoint, &ttl) return ttl, err } func (n *namespaces) SetNamespaceMessageTTL(namespace string, ttlInSeconds int) error { nsName, err := utils.GetNamespaceName(namespace) if err != nil { return err } endpoint := n.pulsar.endpoint(n.basePath, nsName.String(), "messageTTL") return n.pulsar.Client.Post(endpoint, &ttlInSeconds) } func (n *namespaces) SetRetention(namespace string, policy utils.RetentionPolicies) error { nsName, err := utils.GetNamespaceName(namespace) if err != nil { return err } endpoint := n.pulsar.endpoint(n.basePath, nsName.String(), "retention") return n.pulsar.Client.Post(endpoint, &policy) } func (n *namespaces) GetRetention(namespace string) (*utils.RetentionPolicies, error) { var policy utils.RetentionPolicies nsName, err := utils.GetNamespaceName(namespace) if err != nil { return nil, err } endpoint := n.pulsar.endpoint(n.basePath, nsName.String(), "retention") err = n.pulsar.Client.Get(endpoint, &policy) return &policy, err } func (n *namespaces) GetBacklogQuotaMap(namespace string) (map[utils.BacklogQuotaType]utils.BacklogQuota, error) { var backlogQuotaMap map[utils.BacklogQuotaType]utils.BacklogQuota nsName, err := utils.GetNamespaceName(namespace) if err != nil { return nil, err } endpoint := n.pulsar.endpoint(n.basePath, nsName.String(), "backlogQuotaMap") err = n.pulsar.Client.Get(endpoint, &backlogQuotaMap) return backlogQuotaMap, err } func (n *namespaces) SetBacklogQuota(namespace string, backlogQuota utils.BacklogQuota, backlogQuotaType utils.BacklogQuotaType) error { nsName, err := utils.GetNamespaceName(namespace) if err != nil { return err } endpoint := n.pulsar.endpoint(n.basePath, nsName.String(), "backlogQuota") params := make(map[string]string) params["backlogQuotaType"] = string(backlogQuotaType) return n.pulsar.Client.PostWithQueryParams(endpoint, &backlogQuota, params) } func (n *namespaces) RemoveBacklogQuota(namespace string) error { nsName, err := utils.GetNamespaceName(namespace) if err != nil { return err } endpoint := n.pulsar.endpoint(n.basePath, nsName.String(), "backlogQuota") params := map[string]string{ "backlogQuotaType": string(utils.DestinationStorage), } return n.pulsar.Client.DeleteWithQueryParams(endpoint, params) } func (n *namespaces) GetTopicAutoCreation(namespace utils.NameSpaceName) (*utils.TopicAutoCreationConfig, error) { var topicAutoCreation utils.TopicAutoCreationConfig endpoint := n.pulsar.endpoint(n.basePath, namespace.String(), "autoTopicCreation") err := n.pulsar.Client.Get(endpoint, &topicAutoCreation) return &topicAutoCreation, err } func (n *namespaces) SetTopicAutoCreation(namespace utils.NameSpaceName, config utils.TopicAutoCreationConfig) error { endpoint := n.pulsar.endpoint(n.basePath, namespace.String(), "autoTopicCreation") return n.pulsar.Client.Post(endpoint, &config) } func (n *namespaces) RemoveTopicAutoCreation(namespace utils.NameSpaceName) error { endpoint := n.pulsar.endpoint(n.basePath, namespace.String(), "autoTopicCreation") return n.pulsar.Client.Delete(endpoint) } func (n *namespaces) SetSchemaValidationEnforced(namespace utils.NameSpaceName, schemaValidationEnforced bool) error { endpoint := n.pulsar.endpoint(n.basePath, namespace.String(), "schemaValidationEnforced") return n.pulsar.Client.Post(endpoint, schemaValidationEnforced) } func (n *namespaces) GetSchemaValidationEnforced(namespace utils.NameSpaceName) (bool, error) { var result bool endpoint := n.pulsar.endpoint(n.basePath, namespace.String(), "schemaValidationEnforced") err := n.pulsar.Client.Get(endpoint, &result) return result, err } func (n *namespaces) SetSchemaAutoUpdateCompatibilityStrategy(namespace utils.NameSpaceName, strategy utils.SchemaCompatibilityStrategy) error { endpoint := n.pulsar.endpoint(n.basePath, namespace.String(), "schemaAutoUpdateCompatibilityStrategy") return n.pulsar.Client.Put(endpoint, strategy.String()) } func (n *namespaces) GetSchemaAutoUpdateCompatibilityStrategy(namespace utils.NameSpaceName) ( utils.SchemaCompatibilityStrategy, error) { endpoint := n.pulsar.endpoint(n.basePath, namespace.String(), "schemaAutoUpdateCompatibilityStrategy") b, err := n.pulsar.Client.GetWithQueryParams(endpoint, nil, nil, false) if err != nil { return "", err } s, err := utils.ParseSchemaAutoUpdateCompatibilityStrategy(strings.ReplaceAll(string(b), "\"", "")) if err != nil { return "", err } return s, nil } func (n *namespaces) ClearOffloadDeleteLag(namespace utils.NameSpaceName) error { endpoint := n.pulsar.endpoint(n.basePath, namespace.String(), "offloadDeletionLagMs") return n.pulsar.Client.Delete(endpoint) } func (n *namespaces) SetOffloadDeleteLag(namespace utils.NameSpaceName, timeMs int64) error { endpoint := n.pulsar.endpoint(n.basePath, namespace.String(), "offloadDeletionLagMs") return n.pulsar.Client.Put(endpoint, timeMs) } func (n *namespaces) GetOffloadDeleteLag(namespace utils.NameSpaceName) (int64, error) { var result int64 endpoint := n.pulsar.endpoint(n.basePath, namespace.String(), "offloadDeletionLagMs") err := n.pulsar.Client.Get(endpoint, &result) return result, err } func (n *namespaces) SetMaxConsumersPerSubscription(namespace utils.NameSpaceName, max int) error { endpoint := n.pulsar.endpoint(n.basePath, namespace.String(), "maxConsumersPerSubscription") return n.pulsar.Client.Post(endpoint, max) } func (n *namespaces) GetMaxConsumersPerSubscription(namespace utils.NameSpaceName) (int, error) { var result int endpoint := n.pulsar.endpoint(n.basePath, namespace.String(), "maxConsumersPerSubscription") err := n.pulsar.Client.Get(endpoint, &result) return result, err } func (n *namespaces) SetOffloadThreshold(namespace utils.NameSpaceName, threshold int64) error { endpoint := n.pulsar.endpoint(n.basePath, namespace.String(), "offloadThreshold") return n.pulsar.Client.Put(endpoint, threshold) } func (n *namespaces) GetOffloadThreshold(namespace utils.NameSpaceName) (int64, error) { var result int64 endpoint := n.pulsar.endpoint(n.basePath, namespace.String(), "offloadThreshold") err := n.pulsar.Client.Get(endpoint, &result) return result, err } func (n *namespaces) SetOffloadThresholdInSeconds(namespace utils.NameSpaceName, threshold int64) error { endpoint := n.pulsar.endpoint(n.basePath, namespace.String(), "offloadThresholdInSeconds") return n.pulsar.Client.Put(endpoint, threshold) } func (n *namespaces) GetOffloadThresholdInSeconds(namespace utils.NameSpaceName) (int64, error) { var result int64 endpoint := n.pulsar.endpoint(n.basePath, namespace.String(), "offloadThresholdInSeconds") err := n.pulsar.Client.Get(endpoint, &result) return result, err } func (n *namespaces) SetMaxConsumersPerTopic(namespace utils.NameSpaceName, max int) error { endpoint := n.pulsar.endpoint(n.basePath, namespace.String(), "maxConsumersPerTopic") return n.pulsar.Client.Post(endpoint, max) } func (n *namespaces) GetMaxConsumersPerTopic(namespace utils.NameSpaceName) (int, error) { var result int endpoint := n.pulsar.endpoint(n.basePath, namespace.String(), "maxConsumersPerTopic") err := n.pulsar.Client.Get(endpoint, &result) return result, err } func (n *namespaces) SetCompactionThreshold(namespace utils.NameSpaceName, threshold int64) error { endpoint := n.pulsar.endpoint(n.basePath, namespace.String(), "compactionThreshold") return n.pulsar.Client.Put(endpoint, threshold) } func (n *namespaces) GetCompactionThreshold(namespace utils.NameSpaceName) (int64, error) { var result int64 endpoint := n.pulsar.endpoint(n.basePath, namespace.String(), "compactionThreshold") err := n.pulsar.Client.Get(endpoint, &result) return result, err } func (n *namespaces) SetMaxProducersPerTopic(namespace utils.NameSpaceName, max int) error { endpoint := n.pulsar.endpoint(n.basePath, namespace.String(), "maxProducersPerTopic") return n.pulsar.Client.Post(endpoint, max) } func (n *namespaces) GetMaxProducersPerTopic(namespace utils.NameSpaceName) (int, error) { var result int endpoint := n.pulsar.endpoint(n.basePath, namespace.String(), "maxProducersPerTopic") err := n.pulsar.Client.Get(endpoint, &result) return result, err } func (n *namespaces) GetNamespaceReplicationClusters(namespace string) ([]string, error) { var data []string nsName, err := utils.GetNamespaceName(namespace) if err != nil { return nil, err } endpoint := n.pulsar.endpoint(n.basePath, nsName.String(), "replication") err = n.pulsar.Client.Get(endpoint, &data) return data, err } func (n *namespaces) SetNamespaceReplicationClusters(namespace string, clusterIDs []string) error { nsName, err := utils.GetNamespaceName(namespace) if err != nil { return err } endpoint := n.pulsar.endpoint(n.basePath, nsName.String(), "replication") return n.pulsar.Client.Post(endpoint, &clusterIDs) } func (n *namespaces) SetNamespaceAntiAffinityGroup(namespace string, namespaceAntiAffinityGroup string) error { nsName, err := utils.GetNamespaceName(namespace) if err != nil { return err } endpoint := n.pulsar.endpoint(n.basePath, nsName.String(), "antiAffinity") return n.pulsar.Client.Post(endpoint, namespaceAntiAffinityGroup) } func (n *namespaces) GetAntiAffinityNamespaces(tenant, cluster, namespaceAntiAffinityGroup string) ([]string, error) { var data []string endpoint := n.pulsar.endpoint(n.basePath, cluster, "antiAffinity", namespaceAntiAffinityGroup) params := map[string]string{ "property": tenant, } _, err := n.pulsar.Client.GetWithQueryParams(endpoint, &data, params, false) return data, err } func (n *namespaces) GetNamespaceAntiAffinityGroup(namespace string) (string, error) { nsName, err := utils.GetNamespaceName(namespace) if err != nil { return "", err } endpoint := n.pulsar.endpoint(n.basePath, nsName.String(), "antiAffinity") data, err := n.pulsar.Client.GetWithQueryParams(endpoint, nil, nil, false) return string(data), err } func (n *namespaces) DeleteNamespaceAntiAffinityGroup(namespace string) error { nsName, err := utils.GetNamespaceName(namespace) if err != nil { return err } endpoint := n.pulsar.endpoint(n.basePath, nsName.String(), "antiAffinity") return n.pulsar.Client.Delete(endpoint) } func (n *namespaces) SetDeduplicationStatus(namespace string, enableDeduplication bool) error { nsName, err := utils.GetNamespaceName(namespace) if err != nil { return err } endpoint := n.pulsar.endpoint(n.basePath, nsName.String(), "deduplication") return n.pulsar.Client.Post(endpoint, enableDeduplication) } func (n *namespaces) SetPersistence(namespace string, persistence utils.PersistencePolicies) error { nsName, err := utils.GetNamespaceName(namespace) if err != nil { return err } endpoint := n.pulsar.endpoint(n.basePath, nsName.String(), "persistence") return n.pulsar.Client.Post(endpoint, &persistence) } func (n *namespaces) SetBookieAffinityGroup(namespace string, bookieAffinityGroup utils.BookieAffinityGroupData) error { nsName, err := utils.GetNamespaceName(namespace) if err != nil { return err } endpoint := n.pulsar.endpoint(n.basePath, nsName.String(), "persistence", "bookieAffinity") return n.pulsar.Client.Post(endpoint, &bookieAffinityGroup) } func (n *namespaces) DeleteBookieAffinityGroup(namespace string) error { nsName, err := utils.GetNamespaceName(namespace) if err != nil { return err } endpoint := n.pulsar.endpoint(n.basePath, nsName.String(), "persistence", "bookieAffinity") return n.pulsar.Client.Delete(endpoint) } func (n *namespaces) GetBookieAffinityGroup(namespace string) (*utils.BookieAffinityGroupData, error) { var data utils.BookieAffinityGroupData nsName, err := utils.GetNamespaceName(namespace) if err != nil { return nil, err } endpoint := n.pulsar.endpoint(n.basePath, nsName.String(), "persistence", "bookieAffinity") err = n.pulsar.Client.Get(endpoint, &data) return &data, err } func (n *namespaces) GetPersistence(namespace string) (*utils.PersistencePolicies, error) { var persistence utils.PersistencePolicies nsName, err := utils.GetNamespaceName(namespace) if err != nil { return nil, err } endpoint := n.pulsar.endpoint(n.basePath, nsName.String(), "persistence") err = n.pulsar.Client.Get(endpoint, &persistence) return &persistence, err } func (n *namespaces) Unload(namespace string) error { nsName, err := utils.GetNamespaceName(namespace) if err != nil { return err } endpoint := n.pulsar.endpoint(n.basePath, nsName.String(), "unload") return n.pulsar.Client.Put(endpoint, nil) } func (n *namespaces) UnloadNamespaceBundle(namespace, bundle string) error { nsName, err := utils.GetNamespaceName(namespace) if err != nil { return err } endpoint := n.pulsar.endpoint(n.basePath, nsName.String(), bundle, "unload") return n.pulsar.Client.Put(endpoint, nil) } func (n *namespaces) SplitNamespaceBundle(namespace, bundle string, unloadSplitBundles bool) error { nsName, err := utils.GetNamespaceName(namespace) if err != nil { return err } endpoint := n.pulsar.endpoint(n.basePath, nsName.String(), bundle, "split") params := map[string]string{ "unload": strconv.FormatBool(unloadSplitBundles), } return n.pulsar.Client.PutWithQueryParams(endpoint, nil, nil, params) } func (n *namespaces) GetNamespacePermissions(namespace utils.NameSpaceName) (map[string][]utils.AuthAction, error) { endpoint := n.pulsar.endpoint(n.basePath, namespace.String(), "permissions") var permissions map[string][]utils.AuthAction err := n.pulsar.Client.Get(endpoint, &permissions) return permissions, err } func (n *namespaces) GrantNamespacePermission(namespace utils.NameSpaceName, role string, action []utils.AuthAction) error { endpoint := n.pulsar.endpoint(n.basePath, namespace.String(), "permissions", role) s := make([]string, 0) for _, v := range action { s = append(s, v.String()) } return n.pulsar.Client.Post(endpoint, s) } func (n *namespaces) RevokeNamespacePermission(namespace utils.NameSpaceName, role string) error { endpoint := n.pulsar.endpoint(n.basePath, namespace.String(), "permissions", role) return n.pulsar.Client.Delete(endpoint) } func (n *namespaces) GrantSubPermission(namespace utils.NameSpaceName, sName string, roles []string) error { endpoint := n.pulsar.endpoint(n.basePath, namespace.String(), "permissions", "subscription", sName) return n.pulsar.Client.Post(endpoint, roles) } func (n *namespaces) RevokeSubPermission(namespace utils.NameSpaceName, sName, role string) error { endpoint := n.pulsar.endpoint(n.basePath, namespace.String(), "permissions", sName, role) return n.pulsar.Client.Delete(endpoint) } func (n *namespaces) GetSubPermissions(namespace utils.NameSpaceName) (map[string][]string, error) { endpoint := n.pulsar.endpoint(n.basePath, namespace.String(), "permissions", "subscription") var permissions map[string][]string err := n.pulsar.Client.Get(endpoint, &permissions) return permissions, err } func (n *namespaces) SetSubscriptionAuthMode(namespace utils.NameSpaceName, mode utils.SubscriptionAuthMode) error { endpoint := n.pulsar.endpoint(n.basePath, namespace.String(), "subscriptionAuthMode") return n.pulsar.Client.Post(endpoint, mode.String()) } func (n *namespaces) SetEncryptionRequiredStatus(namespace utils.NameSpaceName, encrypt bool) error { endpoint := n.pulsar.endpoint(n.basePath, namespace.String(), "encryptionRequired") return n.pulsar.Client.Post(endpoint, strconv.FormatBool(encrypt)) } func (n *namespaces) UnsubscribeNamespace(namespace utils.NameSpaceName, sName string) error { endpoint := n.pulsar.endpoint(n.basePath, namespace.String(), "unsubscribe", url.QueryEscape(sName)) return n.pulsar.Client.Post(endpoint, nil) } func (n *namespaces) UnsubscribeNamespaceBundle(namespace utils.NameSpaceName, bundle, sName string) error { endpoint := n.pulsar.endpoint(n.basePath, namespace.String(), bundle, "unsubscribe", url.QueryEscape(sName)) return n.pulsar.Client.Post(endpoint, nil) } func (n *namespaces) ClearNamespaceBundleBacklogForSubscription(namespace utils.NameSpaceName, bundle, sName string) error { endpoint := n.pulsar.endpoint(n.basePath, namespace.String(), bundle, "clearBacklog", url.QueryEscape(sName)) return n.pulsar.Client.Post(endpoint, nil) } func (n *namespaces) ClearNamespaceBundleBacklog(namespace utils.NameSpaceName, bundle string) error { endpoint := n.pulsar.endpoint(n.basePath, namespace.String(), bundle, "clearBacklog") return n.pulsar.Client.Post(endpoint, nil) } func (n *namespaces) ClearNamespaceBacklogForSubscription(namespace utils.NameSpaceName, sName string) error { endpoint := n.pulsar.endpoint(n.basePath, namespace.String(), "clearBacklog", url.QueryEscape(sName)) return n.pulsar.Client.Post(endpoint, nil) } func (n *namespaces) ClearNamespaceBacklog(namespace utils.NameSpaceName) error { endpoint := n.pulsar.endpoint(n.basePath, namespace.String(), "clearBacklog") return n.pulsar.Client.Post(endpoint, nil) } func (n *namespaces) SetReplicatorDispatchRate(namespace utils.NameSpaceName, rate utils.DispatchRate) error { endpoint := n.pulsar.endpoint(n.basePath, namespace.String(), "replicatorDispatchRate") return n.pulsar.Client.Post(endpoint, rate) } func (n *namespaces) GetReplicatorDispatchRate(namespace utils.NameSpaceName) (utils.DispatchRate, error) { endpoint := n.pulsar.endpoint(n.basePath, namespace.String(), "replicatorDispatchRate") var rate utils.DispatchRate err := n.pulsar.Client.Get(endpoint, &rate) return rate, err } func (n *namespaces) SetSubscriptionDispatchRate(namespace utils.NameSpaceName, rate utils.DispatchRate) error { endpoint := n.pulsar.endpoint(n.basePath, namespace.String(), "subscriptionDispatchRate") return n.pulsar.Client.Post(endpoint, rate) } func (n *namespaces) GetSubscriptionDispatchRate(namespace utils.NameSpaceName) (utils.DispatchRate, error) { endpoint := n.pulsar.endpoint(n.basePath, namespace.String(), "subscriptionDispatchRate") var rate utils.DispatchRate err := n.pulsar.Client.Get(endpoint, &rate) return rate, err } func (n *namespaces) SetSubscribeRate(namespace utils.NameSpaceName, rate utils.SubscribeRate) error { endpoint := n.pulsar.endpoint(n.basePath, namespace.String(), "subscribeRate") return n.pulsar.Client.Post(endpoint, rate) } func (n *namespaces) GetSubscribeRate(namespace utils.NameSpaceName) (utils.SubscribeRate, error) { endpoint := n.pulsar.endpoint(n.basePath, namespace.String(), "subscribeRate") var rate utils.SubscribeRate err := n.pulsar.Client.Get(endpoint, &rate) return rate, err } func (n *namespaces) SetDispatchRate(namespace utils.NameSpaceName, rate utils.DispatchRate) error { endpoint := n.pulsar.endpoint(n.basePath, namespace.String(), "dispatchRate") return n.pulsar.Client.Post(endpoint, rate) } func (n *namespaces) GetDispatchRate(namespace utils.NameSpaceName) (utils.DispatchRate, error) { endpoint := n.pulsar.endpoint(n.basePath, namespace.String(), "dispatchRate") var rate utils.DispatchRate err := n.pulsar.Client.Get(endpoint, &rate) return rate, err } func (n *namespaces) SetPublishRate(namespace utils.NameSpaceName, pubRate utils.PublishRate) error { endpoint := n.pulsar.endpoint(n.basePath, namespace.String(), "publishRate") return n.pulsar.Client.Post(endpoint, pubRate) } func (n *namespaces) GetPublishRate(namespace utils.NameSpaceName) (utils.PublishRate, error) { endpoint := n.pulsar.endpoint(n.basePath, namespace.String(), "publishRate") var pubRate utils.PublishRate err := n.pulsar.Client.Get(endpoint, &pubRate) return pubRate, err } func (n *namespaces) SetIsAllowAutoUpdateSchema(namespace utils.NameSpaceName, isAllowAutoUpdateSchema bool) error { endpoint := n.pulsar.endpoint(n.basePath, namespace.String(), "isAllowAutoUpdateSchema") return n.pulsar.Client.Post(endpoint, &isAllowAutoUpdateSchema) } func (n *namespaces) GetIsAllowAutoUpdateSchema(namespace utils.NameSpaceName) (bool, error) { endpoint := n.pulsar.endpoint(n.basePath, namespace.String(), "isAllowAutoUpdateSchema") var result bool err := n.pulsar.Client.Get(endpoint, &result) return result, err } func (n *namespaces) GetInactiveTopicPolicies(namespace utils.NameSpaceName) (utils.InactiveTopicPolicies, error) { var out utils.InactiveTopicPolicies endpoint := n.pulsar.endpoint(n.basePath, namespace.String(), "inactiveTopicPolicies") err := n.pulsar.Client.Get(endpoint, &out) return out, err } func (n *namespaces) RemoveInactiveTopicPolicies(namespace utils.NameSpaceName) error { endpoint := n.pulsar.endpoint(n.basePath, namespace.String(), "inactiveTopicPolicies") return n.pulsar.Client.Delete(endpoint) } func (n *namespaces) SetInactiveTopicPolicies(namespace utils.NameSpaceName, data utils.InactiveTopicPolicies) error { endpoint := n.pulsar.endpoint(n.basePath, namespace.String(), "inactiveTopicPolicies") return n.pulsar.Client.Post(endpoint, data) } func (n *namespaces) GetSubscriptionExpirationTime(namespace utils.NameSpaceName) (int, error) { var result = -1 endpoint := n.pulsar.endpoint(n.basePath, namespace.String(), "subscriptionExpirationTime") err := n.pulsar.Client.Get(endpoint, &result) return result, err } func (n *namespaces) SetSubscriptionExpirationTime(namespace utils.NameSpaceName, subscriptionExpirationTimeInMinutes int) error { endpoint := n.pulsar.endpoint(n.basePath, namespace.String(), "subscriptionExpirationTime") return n.pulsar.Client.Post(endpoint, &subscriptionExpirationTimeInMinutes) } func (n *namespaces) RemoveSubscriptionExpirationTime(namespace utils.NameSpaceName) error { endpoint := n.pulsar.endpoint(n.basePath, namespace.String(), "subscriptionExpirationTime") return n.pulsar.Client.Delete(endpoint) }