controller/mutators/etcd/job_mutator.go (150 lines of code) (raw):
// Copyright (c) 2017-2018 Uber Technologies, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package etcd
import (
"encoding/json"
"github.com/uber/aresdb/cluster/kvstore"
"github.com/m3db/m3/src/cluster/kv"
pb "github.com/uber/aresdb/controller/generated/proto"
"github.com/uber/aresdb/controller/models"
"github.com/uber/aresdb/controller/mutators/common"
"github.com/uber/aresdb/utils"
"go.uber.org/zap"
)
// NewJobMutator creates new JobMutator
func NewJobMutator(etcdStore kv.TxnStore, logger *zap.SugaredLogger) common.JobMutator {
return &jobMutatorImpl{
logger: logger,
etcdStore: etcdStore,
}
}
type jobMutatorImpl struct {
logger *zap.SugaredLogger
etcdStore kv.TxnStore
}
// GetJob gets job config by name
func (j *jobMutatorImpl) GetJob(namespace, name string) (job models.JobConfig, err error) {
var jobProto pb.EntityConfig
jobProto, version, err := j.readJob(namespace, name)
if err != nil {
return job, err
}
if jobProto.Tomstoned {
return job, common.ErrJobConfigDoesNotExist
}
job.StreamingConfig = models.DefaultKafkaConfig
err = json.Unmarshal(jobProto.Config, &job)
// always return etcd internal version, version from job payload is ignored
job.Version = version
return
}
// GetJobs returns all jobs config
func (j *jobMutatorImpl) GetJobs(namespace string) ([]models.JobConfig, error) {
var jobList pb.EntityList
jobList, _, err := readEntityList(j.etcdStore, utils.JobListKey(namespace))
if err != nil {
return nil, err
}
jobs := make([]models.JobConfig, 0)
for _, jobName := range jobList.Entities {
var job models.JobConfig
job, err := j.GetJob(namespace, jobName.Name)
if common.IsNonExist(err) {
continue
}
if err != nil {
return jobs, err
}
jobs = append(jobs, job)
}
return jobs, err
}
// DeleteJob deletes a job
func (j *jobMutatorImpl) DeleteJob(namespace, name string) error {
jobList, jobListVersion, err := readEntityList(j.etcdStore, utils.JobListKey(namespace))
if err != nil {
return err
}
jobList, found := deleteEntity(jobList, name)
if !found {
return common.ErrJobConfigDoesNotExist
}
// found
jobConfig, jobConfigVersion, err := j.readJob(namespace, name)
if err != nil {
return err
}
if jobConfig.Tomstoned {
return common.ErrJobConfigDoesNotExist
}
jobConfig.Tomstoned = true
return kvstore.NewTransaction().
AddKeyValue(utils.JobListKey(namespace), jobListVersion, &jobList).
AddKeyValue(utils.JobKey(namespace, name), jobConfigVersion, &jobConfig).
WriteTo(j.etcdStore)
}
// UpdateJob updates job config
func (j *jobMutatorImpl) UpdateJob(namespace string, job models.JobConfig) (err error) {
jobListProto, jobListVersion, err := readEntityList(j.etcdStore, utils.JobListKey(namespace))
if err != nil {
return err
}
jobListProto, found := updateEntity(jobListProto, job.Name)
if !found {
j.logger.With(
"job", job,
).Info("job not found for update, creating new job")
return j.AddJob(namespace, job)
}
jobProto, jobVersion, err := j.readJob(namespace, job.Name)
if err != nil {
return err
}
if jobProto.Tomstoned {
return common.ErrJobConfigDoesNotExist
}
jobProto.Tomstoned = false
jobProto.Config, err = json.Marshal(job)
if err != nil {
return
}
return kvstore.NewTransaction().
AddKeyValue(utils.JobListKey(namespace), jobListVersion, &jobListProto).
AddKeyValue(utils.JobKey(namespace, job.Name), jobVersion, &jobProto).
WriteTo(j.etcdStore)
}
// AddJob adds a new job
func (j *jobMutatorImpl) AddJob(namespace string, job models.JobConfig) error {
err := common.Validate(&job, nil)
if err != nil {
return err
}
jobListProto, jobListVersion, err := readEntityList(j.etcdStore, utils.JobListKey(namespace))
if err != nil {
return err
}
jobProto := pb.EntityConfig{
Name: job.Name,
Tomstoned: false,
}
jobVersion := kv.UninitializedVersion
jobProto.Config, err = json.Marshal(job)
if err != nil {
return err
}
jobListProto, incarnation, exist := addEntity(jobListProto, jobProto.Name)
if exist {
return common.ErrJobConfigAlreadyExist
}
if incarnation > 0 {
jobProto, jobVersion, err = j.readJob(namespace, job.Name)
if err != nil {
return err
}
jobProto.Tomstoned = false
}
return kvstore.NewTransaction().
AddKeyValue(utils.JobListKey(namespace), jobListVersion, &jobListProto).
AddKeyValue(utils.JobKey(namespace, job.Name), jobVersion, &jobProto).
WriteTo(j.etcdStore)
}
// GetHash returns hash that will be different if any job changed
func (j *jobMutatorImpl) GetHash(namespace string) (string, error) {
return getHash(j.etcdStore, utils.JobListKey(namespace))
}
func (j *jobMutatorImpl) readJob(namespace string, name string) (jobConfig pb.EntityConfig, version int, err error) {
version, err = readValue(j.etcdStore, utils.JobKey(namespace, name), &jobConfig)
if common.IsNonExist(err) {
err = common.ErrJobConfigDoesNotExist
}
return
}