controller/mutators/etcd/common.go (93 lines of code) (raw):

// Copyright (c) 2017-2018 Uber Technologies, Inc. // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. // You may obtain a copy of the License at // // http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, software // distributed under the License is distributed on an "AS IS" BASIS, // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. // See the License for the specific language governing permissions and // limitations under the License. package etcd import ( "fmt" "github.com/uber/aresdb/controller/mutators/common" "github.com/golang/protobuf/proto" "github.com/m3db/m3/src/cluster/kv" pb "github.com/uber/aresdb/controller/generated/proto" "github.com/uber/aresdb/utils" ) func addEntity(entityList pb.EntityList, name string) (res pb.EntityList, incarnation int, exist bool) { nowTs := utils.Now().UnixNano() for _, entityName := range entityList.Entities { if name == entityName.Name { if !entityName.Tomstoned { exist = true return } entityName.Incarnation++ incarnation = int(entityName.Incarnation) entityName.Tomstoned = false entityName.LastUpdatedAt = nowTs entityList.LastUpdatedAt = nowTs return entityList, incarnation, false } } entityList.Entities = append(entityList.Entities, &pb.EntityName{ Name: name, Tomstoned: false, Incarnation: 0, LastUpdatedAt: nowTs, }) entityList.LastUpdatedAt = nowTs return entityList, incarnation, false } func readValue(etcdStore kv.TxnStore, key string, out proto.Message) (version int, err error) { var value kv.Value value, err = etcdStore.Get(key) if err != nil { return } version = value.Version() err = value.Unmarshal(out) return } func readEntityList(etcdStore kv.TxnStore, key string) (entityList pb.EntityList, version int, err error) { version, err = readValue(etcdStore, key, &entityList) if common.IsNonExist(err) { err = common.ErrNamespaceDoesNotExist return } return } func deleteEntity(entityList pb.EntityList, name string) (pb.EntityList, bool) { nowTs := utils.Now().UnixNano() entityList, found := find(entityList, name, func(entity *pb.EntityName) { entity.Tomstoned = true entity.LastUpdatedAt = nowTs }) if found { entityList.LastUpdatedAt = nowTs } return entityList, found } func updateEntity(entityList pb.EntityList, name string) (pb.EntityList, bool) { nowTs := utils.Now().UnixNano() return find(entityList, name, func(entity *pb.EntityName) { entity.LastUpdatedAt = nowTs }) } func find(entityList pb.EntityList, name string, doWithEntity func(*pb.EntityName)) (pb.EntityList, bool) { found := false for _, entity := range entityList.Entities { if entity.Name == name && !entity.Tomstoned { found = true doWithEntity(entity) break } } return entityList, found } func getHash(etcdStore kv.TxnStore, key string) (string, error) { entityList, _, err := readEntityList(etcdStore, key) if err != nil { return "", err } var latestUpdateAt int64 for _, entity := range entityList.Entities { if entity.LastUpdatedAt > latestUpdateAt { latestUpdateAt = entity.LastUpdatedAt } } return fmt.Sprintf("%dcv%d", latestUpdateAt, entityList.LastUpdatedAt), nil }