pulsaradmin/pkg/admin/cluster.go (85 lines of code) (raw):
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package admin
import (
"github.com/apache/pulsar-client-go/pulsaradmin/pkg/utils"
)
// Clusters is admin interface for clusters management
type Clusters interface {
// List returns the list of clusters
List() ([]string, error)
// Get the configuration data for the specified cluster
Get(string) (utils.ClusterData, error)
// Create a new cluster
Create(utils.ClusterData) error
// Delete an existing cluster
Delete(string) error
// Update the configuration for a cluster
Update(utils.ClusterData) error
// UpdatePeerClusters updates peer cluster names.
UpdatePeerClusters(string, []string) error
// GetPeerClusters returns peer-cluster names
GetPeerClusters(string) ([]string, error)
// CreateFailureDomain creates a domain into cluster
CreateFailureDomain(utils.FailureDomainData) error
// GetFailureDomain returns the domain registered into a cluster
GetFailureDomain(clusterName, domainName string) (utils.FailureDomainData, error)
// ListFailureDomains returns all registered domains in cluster
ListFailureDomains(string) (utils.FailureDomainMap, error)
// DeleteFailureDomain deletes a domain in cluster
DeleteFailureDomain(utils.FailureDomainData) error
// UpdateFailureDomain updates a domain into cluster
UpdateFailureDomain(utils.FailureDomainData) error
}
type clusters struct {
pulsar *pulsarClient
basePath string
}
// Clusters is used to access the cluster endpoints.
func (c *pulsarClient) Clusters() Clusters {
return &clusters{
pulsar: c,
basePath: "/clusters",
}
}
func (c *clusters) List() ([]string, error) {
var clusters []string
err := c.pulsar.Client.Get(c.pulsar.endpoint(c.basePath), &clusters)
return clusters, err
}
func (c *clusters) Get(name string) (utils.ClusterData, error) {
cdata := utils.ClusterData{}
endpoint := c.pulsar.endpoint(c.basePath, name)
err := c.pulsar.Client.Get(endpoint, &cdata)
return cdata, err
}
func (c *clusters) Create(cdata utils.ClusterData) error {
endpoint := c.pulsar.endpoint(c.basePath, cdata.Name)
return c.pulsar.Client.Put(endpoint, &cdata)
}
func (c *clusters) Delete(name string) error {
endpoint := c.pulsar.endpoint(c.basePath, name)
return c.pulsar.Client.Delete(endpoint)
}
func (c *clusters) Update(cdata utils.ClusterData) error {
endpoint := c.pulsar.endpoint(c.basePath, cdata.Name)
return c.pulsar.Client.Post(endpoint, &cdata)
}
func (c *clusters) GetPeerClusters(name string) ([]string, error) {
var peerClusters []string
endpoint := c.pulsar.endpoint(c.basePath, name, "peers")
err := c.pulsar.Client.Get(endpoint, &peerClusters)
return peerClusters, err
}
func (c *clusters) UpdatePeerClusters(cluster string, peerClusters []string) error {
endpoint := c.pulsar.endpoint(c.basePath, cluster, "peers")
return c.pulsar.Client.Post(endpoint, peerClusters)
}
func (c *clusters) CreateFailureDomain(data utils.FailureDomainData) error {
endpoint := c.pulsar.endpoint(c.basePath, data.ClusterName, "failureDomains", data.DomainName)
return c.pulsar.Client.Post(endpoint, &data)
}
func (c *clusters) GetFailureDomain(clusterName string, domainName string) (utils.FailureDomainData, error) {
var res utils.FailureDomainData
endpoint := c.pulsar.endpoint(c.basePath, clusterName, "failureDomains", domainName)
err := c.pulsar.Client.Get(endpoint, &res)
return res, err
}
func (c *clusters) ListFailureDomains(clusterName string) (utils.FailureDomainMap, error) {
var domainData utils.FailureDomainMap
endpoint := c.pulsar.endpoint(c.basePath, clusterName, "failureDomains")
err := c.pulsar.Client.Get(endpoint, &domainData)
return domainData, err
}
func (c *clusters) DeleteFailureDomain(data utils.FailureDomainData) error {
endpoint := c.pulsar.endpoint(c.basePath, data.ClusterName, "failureDomains", data.DomainName)
return c.pulsar.Client.Delete(endpoint)
}
func (c *clusters) UpdateFailureDomain(data utils.FailureDomainData) error {
endpoint := c.pulsar.endpoint(c.basePath, data.ClusterName, "failureDomains", data.DomainName)
return c.pulsar.Client.Post(endpoint, &data)
}