controller/mutators/etcd/subscriber_mutator.go (84 lines of code) (raw):
// Copyright (c) 2017-2018 Uber Technologies, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package etcd
import (
"github.com/m3db/m3/src/cluster/services"
"github.com/uber/aresdb/cluster/kvstore"
"github.com/uber/aresdb/controller/models"
"github.com/uber/aresdb/controller/mutators/common"
"github.com/uber/aresdb/utils"
"strconv"
"unsafe"
)
type subscriberMutator struct {
etcdClient *kvstore.EtcdClient
}
// NewSubscriberMutator creates new subscriber mutator based on etcd
func NewSubscriberMutator(etcdClient *kvstore.EtcdClient) common.SubscriberMutator {
return &subscriberMutator{
etcdClient: etcdClient,
}
}
// GetSubscriber returns a subscriber
func (s *subscriberMutator) GetSubscriber(namespace, subscriberName string) (subscriber models.Subscriber, err error) {
serviceID := services.NewServiceID().
SetName(utils.SubscriberServiceName(namespace)).
SetEnvironment(s.etcdClient.Environment).
SetZone(s.etcdClient.Zone)
hbtSvc, err := s.etcdClient.Services.HeartbeatService(serviceID)
if err != nil {
err = utils.StackError(err, "failed to get heartbeat service for namespace: %s", namespace)
return
}
ids, err := hbtSvc.Get()
if err != nil {
err = utils.StackError(err, "failed to get instances for namespace: %s", namespace)
return
}
for _, id := range ids {
if id == subscriberName {
return models.Subscriber{
Name: id,
}, nil
}
}
return subscriber, common.ErrSubscriberDoesNotExist
}
// GetSubscribers returns a list of subscribers
func (s *subscriberMutator) GetSubscribers(namespace string) (subscribers []models.Subscriber, err error) {
serviceID := services.NewServiceID().
SetName(utils.SubscriberServiceName(namespace)).
SetEnvironment(s.etcdClient.Environment).
SetZone(s.etcdClient.Zone)
hbtSvc, err := s.etcdClient.Services.HeartbeatService(serviceID)
if err != nil {
err = utils.StackError(err, "failed to get heartbeat service for namespace: %s", namespace)
return
}
ids, err := hbtSvc.Get()
if err != nil {
err = utils.StackError(err, "failed to get instances for namespace: %s", namespace)
return
}
for _, id := range ids {
subscribers = append(subscribers, models.Subscriber{
Name: id,
})
}
return
}
// GetHash returns hash of all subscribers
func (s *subscriberMutator) GetHash(namespace string) (string, error) {
serviceID := services.NewServiceID().
SetName(utils.SubscriberServiceName(namespace)).
SetEnvironment(s.etcdClient.Environment).
SetZone(s.etcdClient.Zone)
hbtSvc, err := s.etcdClient.Services.HeartbeatService(serviceID)
if err != nil {
return "", utils.StackError(err, "failed to get heartbeat service for namespace: %s", namespace)
}
ids, err := hbtSvc.Get()
if err != nil {
return "", utils.StackError(err, "failed to get instances for namespace: %s", namespace)
}
hash := uint32(1)
for _, id := range ids {
strByts := []byte(id)
hash += hash*31 + utils.Murmur3Sum32(unsafe.Pointer(&strByts[0]), len(strByts), 0)
}
return strconv.Itoa(int(hash)), nil
}