controller/mutators/etcd/assignment_mutator.go (137 lines of code) (raw):

// Copyright (c) 2017-2018 Uber Technologies, Inc. // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. // You may obtain a copy of the License at // // http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, software // distributed under the License is distributed on an "AS IS" BASIS, // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. // See the License for the specific language governing permissions and // limitations under the License. package etcd import ( "encoding/json" "github.com/uber/aresdb/cluster/kvstore" "strconv" pb "github.com/uber/aresdb/controller/generated/proto" "github.com/uber/aresdb/controller/models" "github.com/uber/aresdb/controller/mutators/common" "github.com/m3db/m3/src/cluster/kv" "github.com/uber/aresdb/utils" ) // NewIngestionAssignmentMutator creates new IngestionAssignmentMutator func NewIngestionAssignmentMutator(etcdStore kv.TxnStore) common.IngestionAssignmentMutator { return &ingestionAssignmentMutatorImpl{ etcdStore: etcdStore, } } type ingestionAssignmentMutatorImpl struct { etcdStore kv.TxnStore } // GetIngestionAssignment gets IngestionAssignment config by name func (j *ingestionAssignmentMutatorImpl) GetIngestionAssignment(namespace, name string) (ingestionAssignment models.IngestionAssignment, err error) { var entityConfig pb.EntityConfig _, err = readValue(j.etcdStore, utils.JobAssignmentsKey(namespace, name), &entityConfig) if err != nil || entityConfig.Tomstoned { return ingestionAssignment, common.ErrIngestionAssignmentDoesNotExist } err = json.Unmarshal(entityConfig.Config, &ingestionAssignment) return } // GetIngestionAssignments returns all IngestionAssignments config func (j *ingestionAssignmentMutatorImpl) GetIngestionAssignments(namespace string) (ingestionAssignments []models.IngestionAssignment, err error) { jobAssignments, _, err := readEntityList(j.etcdStore, utils.JobAssignmentsListKey(namespace)) if err != nil { return nil, err } ingestionAssignments = make([]models.IngestionAssignment, 0, len(jobAssignments.Entities)) for _, subscriber := range jobAssignments.Entities { ingestionAssignment, err := j.GetIngestionAssignment(namespace, subscriber.Name) if common.IsNonExist(err) { continue } if err != nil { return nil, err } ingestionAssignments = append(ingestionAssignments, ingestionAssignment) } return } // DeleteIngestionAssignment deletes a IngestionAssignment func (j *ingestionAssignmentMutatorImpl) DeleteIngestionAssignment(namespace, name string) error { entityList, entityListVersion, err := readEntityList(j.etcdStore, utils.JobAssignmentsListKey(namespace)) if err != nil { return err } entityList, found := deleteEntity(entityList, name) if !found { return common.ErrIngestionAssignmentDoesNotExist } var entityConfig pb.EntityConfig configVersion, err := readValue(j.etcdStore, utils.JobAssignmentsKey(namespace, name), &entityConfig) if common.IsNonExist(err) || (err == nil && entityConfig.Tomstoned) { return common.ErrIngestionAssignmentDoesNotExist } else if err != nil { return err } entityConfig.Tomstoned = true return kvstore.NewTransaction(). AddKeyValue(utils.JobAssignmentsListKey(namespace), entityListVersion, &entityList). AddKeyValue(utils.JobAssignmentsKey(namespace, name), configVersion, &entityConfig). WriteTo(j.etcdStore) } // UpdateIngestionAssignment updates IngestionAssignment config func (j *ingestionAssignmentMutatorImpl) UpdateIngestionAssignment(namespace string, ingestionAssignment models.IngestionAssignment) error { entityList, entityListVersion, err := readEntityList(j.etcdStore, utils.JobAssignmentsListKey(namespace)) if err != nil { return err } entityList, found := updateEntity(entityList, ingestionAssignment.Subscriber) if !found { return common.ErrIngestionAssignmentDoesNotExist } var entityConfig pb.EntityConfig configVersion, err := readValue(j.etcdStore, utils.JobAssignmentsKey(namespace, ingestionAssignment.Subscriber), &entityConfig) if common.IsNonExist(err) || (err == nil && entityConfig.Tomstoned) { return common.ErrIngestionAssignmentDoesNotExist } else if err != nil { return err } entityConfig.Config, err = json.Marshal(ingestionAssignment) if err != nil { return err } return kvstore.NewTransaction(). AddKeyValue(utils.JobAssignmentsListKey(namespace), entityListVersion, &entityList). AddKeyValue(utils.JobAssignmentsKey(namespace, ingestionAssignment.Subscriber), configVersion, &entityConfig). WriteTo(j.etcdStore) } // AddIngestionAssignment adds a new IngestionAssignment func (j *ingestionAssignmentMutatorImpl) AddIngestionAssignment(namespace string, ingestionAssignment models.IngestionAssignment) error { entityList, entityListVersion, err := readEntityList(j.etcdStore, utils.JobAssignmentsListKey(namespace)) if err != nil { return err } entityList, incarnation, found := addEntity(entityList, ingestionAssignment.Subscriber) if found { return common.ErrIngestionAssignmentAlreadyExist } var entityConfig pb.EntityConfig configVersion := kv.UninitializedVersion if incarnation > 0 { configVersion, err = readValue(j.etcdStore, utils.JobAssignmentsKey(namespace, ingestionAssignment.Subscriber), &entityConfig) if err != nil { return err } if !entityConfig.Tomstoned { return common.ErrIngestionAssignmentAlreadyExist } } entityConfig.Tomstoned = false entityConfig.Config, err = json.Marshal(ingestionAssignment) if err != nil { return err } return kvstore.NewTransaction(). AddKeyValue(utils.JobAssignmentsListKey(namespace), entityListVersion, &entityList). AddKeyValue(utils.JobAssignmentsKey(namespace, ingestionAssignment.Subscriber), configVersion, &entityConfig). WriteTo(j.etcdStore) } // GetHash returns hash that will be different if ingestionAssignment for subscriber changed func (j *ingestionAssignmentMutatorImpl) GetHash(namespace, subscriber string) (string, error) { entityList, _, err := readEntityList(j.etcdStore, utils.JobAssignmentsListKey(namespace)) if err != nil { return "", err } lastUpdatedAt := "" _, found := find(entityList, subscriber, func(name *pb.EntityName) { lastUpdatedAt = strconv.FormatInt(name.LastUpdatedAt, 10) }) if !found { return "", common.ErrIngestionAssignmentDoesNotExist } return lastUpdatedAt, nil }