controller/mutators/etcd/membership_mutator.go (92 lines of code) (raw):
// Copyright (c) 2017-2018 Uber Technologies, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package etcd
import (
"errors"
"strconv"
"unsafe"
"github.com/m3db/m3/src/cluster/services"
"github.com/uber/aresdb/cluster/kvstore"
"github.com/uber/aresdb/controller/models"
"github.com/uber/aresdb/controller/mutators/common"
"github.com/uber/aresdb/utils"
)
// NewMembershipMutator builds a common.MembershipMutator backed by the given
// etcd client. The client is stored as-is and used by the read methods to
// reach the heartbeat service.
func NewMembershipMutator(etcdClient *kvstore.EtcdClient) common.MembershipMutator {
	var mutator membershipMutatorImpl
	mutator.etcdClient = etcdClient
	return mutator
}
// membershipMutatorImpl is the etcd-backed membership mutator returned by
// NewMembershipMutator. Its methods use value receivers, so the struct is
// copied on each call; the only state is the client pointer below.
type membershipMutatorImpl struct {
// etcdClient provides the Services handle plus the Environment and Zone
// used to build the service ID for heartbeat lookups.
etcdClient *kvstore.EtcdClient
}
// Join is not supported by the etcd version of the membership mutator; it
// ignores its arguments and always returns a non-nil error.
func (mm membershipMutatorImpl) Join(namespace string, instance models.Instance) error {
	errUnsupported := errors.New("join not supported in etcd version of membership mutator")
	return errUnsupported
}
// Leave is not supported by the etcd version of the membership mutator; it
// ignores its arguments and always returns a non-nil error.
func (mm membershipMutatorImpl) Leave(namespace, instanceName string) error {
	errUnsupported := errors.New("leave not supported in etcd version of membership mutator")
	return errUnsupported
}
// GetInstance looks up a single instance by name among the instances reported
// by the heartbeat service for the data node service of the given namespace.
//
// It returns the matching instance (name, host and port copied from the
// heartbeat entry) or common.ErrInstanceDoesNotExist when no entry has an ID
// equal to instanceName. Failures to obtain the heartbeat service or to list
// its instances are returned wrapped with the namespace for context; in those
// cases the returned instance is the zero value.
func (mm membershipMutatorImpl) GetInstance(namespace, instanceName string) (instance models.Instance, err error) {
	serviceID := services.NewServiceID().
		SetName(utils.DataNodeServiceName(namespace)).
		SetEnvironment(mm.etcdClient.Environment).
		SetZone(mm.etcdClient.Zone)

	hbtService, err := mm.etcdClient.Services.HeartbeatService(serviceID)
	if err != nil {
		// Return immediately: hbtService is not usable when the lookup failed.
		return instance, utils.StackError(err, "failed to get heartbeat service, namespace: %s", namespace)
	}

	instances, err := hbtService.GetInstances()
	if err != nil {
		// Surface the real failure instead of reporting "does not exist".
		return instance, utils.StackError(err, "failed to get instances, namespace: %s", namespace)
	}

	for _, candidate := range instances {
		if candidate.ID() == instanceName {
			return models.Instance{
				Name: candidate.ID(),
				Host: candidate.Hostname(),
				Port: candidate.Port(),
			}, nil
		}
	}
	return instance, common.ErrInstanceDoesNotExist
}
// GetInstances lists every instance the heartbeat service reports for the
// data node service of the given namespace, converting each entry into a
// models.Instance (name, host, port). The returned slice is non-nil on
// success, even when there are no instances. Errors from obtaining the
// heartbeat service or listing its instances are returned wrapped with the
// namespace.
func (mm membershipMutatorImpl) GetInstances(namespace string) ([]models.Instance, error) {
	client := mm.etcdClient
	sid := services.NewServiceID().
		SetName(utils.DataNodeServiceName(namespace)).
		SetEnvironment(client.Environment).
		SetZone(client.Zone)

	heartbeats, err := client.Services.HeartbeatService(sid)
	if err != nil {
		return nil, utils.StackError(err, "failed to get heartbeat service, namespace: %s", namespace)
	}

	live, err := heartbeats.GetInstances()
	if err != nil {
		return nil, utils.StackError(err, "failed to get instances, namespace: %s", namespace)
	}

	// Fill by index; the output has exactly one entry per heartbeat instance.
	converted := make([]models.Instance, len(live))
	for i := range live {
		converted[i] = models.Instance{
			Name: live[i].ID(),
			Host: live[i].Hostname(),
			Port: live[i].Port(),
		}
	}
	return converted, nil
}
// GetHash returns hash that will be different if any instance changed.
//
// The hash is computed from the ids returned by the heartbeat service's Get()
// for the data node service of the given namespace: starting from 1, each id's
// 32-bit Murmur3 sum (seed 0) is folded into a running uint32, and the result
// is formatted as a decimal string. The fold is order-dependent, so the value
// also changes if Get() returns the same ids in a different order.
// NOTE(review): whether Get() guarantees a stable order is not visible here.
//
// Errors from obtaining the heartbeat service or fetching the ids are returned
// wrapped with the namespace, together with an empty string.
func (mm membershipMutatorImpl) GetHash(namespace string) (string, error) {
	serviceID := services.NewServiceID().
		SetName(utils.DataNodeServiceName(namespace)).
		SetEnvironment(mm.etcdClient.Environment).
		SetZone(mm.etcdClient.Zone)

	hbtService, err := mm.etcdClient.Services.HeartbeatService(serviceID)
	if err != nil {
		return "", utils.StackError(err, "failed to get heartbeat service, namespace: %s", namespace)
	}

	ids, err := hbtService.Get()
	if err != nil {
		return "", utils.StackError(err, "failed to get service ids, namespace: %s", namespace)
	}

	// Murmur3Sum32 takes a raw pointer to the first byte. An empty id has no
	// first byte, and indexing it would panic, so a placeholder byte is passed
	// with length 0 in that case.
	var placeholder [1]byte

	hash := uint32(1)
	for _, id := range ids {
		strByts := []byte(id)
		keyPtr := unsafe.Pointer(&placeholder[0])
		if len(strByts) > 0 {
			keyPtr = unsafe.Pointer(&strByts[0])
		}
		hash += hash*31 + utils.Murmur3Sum32(keyPtr, len(strByts), 0)
	}
	return strconv.Itoa(int(hash)), nil
}