subscriber/common/rules/job_config.go (273 lines of code) (raw):

// Copyright (c) 2017-2018 Uber Technologies, Inc. // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. // You may obtain a copy of the License at // // http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, software // distributed under the License is distributed on an "AS IS" BASIS, // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. // See the License for the specific language governing permissions and // limitations under the License. package rules import ( "encoding/json" "fmt" "io/ioutil" "os" "strings" "github.com/uber/aresdb/client" "github.com/uber/aresdb/controller/models" "github.com/getlantern/deepcopy" memCom "github.com/uber/aresdb/memstore/common" "github.com/uber/aresdb/subscriber/config" "go.uber.org/fx" "go.uber.org/zap" ) // Module configures JobConfigs. var Module = fx.Options( fx.Provide( NewJobConfigs, ), ) // Params defines the base objects for jobConfigs. type Params struct { fx.In ServiceConfig config.ServiceConfig } // Result defines the objects that the rules module provides. type Result struct { fx.Out JobConfigs JobConfigs } // JobConfigs contains configuration and information for all jobs and therir destination ares clusters. type JobConfigs map[string]JobAresConfig // JobAresConfig contains configuration and information for each Ares cluster and job configuration. type JobAresConfig map[string]*JobConfig // JobConfig wraps job config controller type JobConfig struct { models.JobConfig // maps from column name to columnID for convenience columnDict map[string]int destinations map[string]*DestinationConfig transformations map[string]*TransformationConfig primaryKeys map[string]int primaryKeyBytes int } // DestinationConfig defines the configuration needed to save data in ares type DestinationConfig struct { // Table is ares table Table string `json:"table" yaml:"table"` // Column is ares table column name Column string `json:"column" yaml:"column"` // UpdateMode is column's upsert mode UpdateMode memCom.ColumnUpdateMode `json:"update_mode,omitempty" yaml:"update_mode,omitempty"` } // TransformationConfig defiines the configuration needed to generate a specific transformation type TransformationConfig struct { // Type of transformationConfig to apply for the column, // like timestamp, uuid, uuid_hll etc Type string `json:"type" yaml:"type"` // Source is the field name to read the value from Source string `json:"source" yaml:"source"` // Default value to use if value is empty Default string `json:"default" yaml:"default"` // Context help complex transformations to define information // needed to parse the values Context map[string]string } // Assignment defines the job assignment of the ares-subscriber type Assignment struct { // Subscriber is ares-subscriber instance name Subscriber string `json:"subscriber"` // Jobs is a list of jobConfigs for the ares-subscriber instance Jobs []*JobConfig `json:"jobs"` // AresClusters is a table of aresClusters for the ares-subscriber instance AresClusters map[string]config.SinkConfig `json:"instances"` } // NewJobConfigs creates JobConfigs object func NewJobConfigs(params Params) (Result, error) { jobConfigs := make(JobConfigs) err := AddLocalJobConfig(params.ServiceConfig, jobConfigs) return Result{ JobConfigs: jobConfigs, }, err } // NewAssignmentFromController parse controller assignment and create Assignment rule func NewAssignmentFromController(from *models.IngestionAssignment) (*Assignment, error) { assignment := &Assignment{ Subscriber: from.Subscriber, AresClusters: make(map[string]config.SinkConfig), } for _, job := range from.Jobs { jobConfig := &JobConfig{ JobConfig: job, } err := jobConfig.PopulateAresTableConfig() if err != nil { return nil, err } assignment.Jobs = append(assignment.Jobs, jobConfig) } if config.SinkIsAresDB { for instanceName, instance := range from.Instances { sinkConfig := config.SinkConfig{ SinkModeStr: "aresDB", AresDBConnectorConfig: client.ConnectorConfig{ Address: instance.Address, }, } assignment.AresClusters[instanceName] = sinkConfig } } return assignment, nil } // GetDestinations returns a job's destination definition func (j *JobConfig) GetDestinations() map[string]*DestinationConfig { return j.destinations } // GetTranformations returns a job's tranformation definition func (j *JobConfig) GetTranformations() map[string]*TransformationConfig { return j.transformations } // GetPrimaryKeys returns a job's primaryKeys definition func (j *JobConfig) GetPrimaryKeys() map[string]int { return j.primaryKeys } // SetPrimaryKeyBytes sets the number of bytes needed by primaryKey func (j *JobConfig) SetPrimaryKeyBytes(primaryKeyBytes int) { j.primaryKeyBytes = primaryKeyBytes } // AppendPrimaryKeyBytes returns the number of bytes needed by primaryKey func (j *JobConfig) GetPrimaryKeyBytes() int { return j.primaryKeyBytes } // GetColumnDict returns a job's columnDict definition func (j *JobConfig) GetColumnDict() map[string]int { return j.columnDict } // PopulateAresTableConfig populates information into jobConfig fields func (j *JobConfig) PopulateAresTableConfig() error { // set primaryKeys and primaryKeyBytes j.primaryKeyBytes = 0 j.primaryKeys = make(map[string]int, len(j.AresTableConfig.Table.PrimaryKeyColumns)) for _, pk := range j.AresTableConfig.Table.PrimaryKeyColumns { j.primaryKeys[j.AresTableConfig.Table.Columns[pk].Name] = pk columnType := j.AresTableConfig.Table.Columns[pk].Type dataBits := memCom.DataTypeBits(memCom.DataTypeFromString(columnType)) if dataBits < 8 { dataBits = 8 } j.primaryKeyBytes += dataBits / 8 } size := len(j.AresTableConfig.Table.Columns) j.destinations = make(map[string]*DestinationConfig, size) j.transformations = make(map[string]*TransformationConfig, size) j.columnDict = make(map[string]int, size) // set destinations and transformations for columnID, column := range j.AresTableConfig.Table.Columns { if column.Deleted { continue } j.columnDict[column.Name] = columnID updateMode := j.getUpdateMode(column.Name) j.destinations[column.Name] = &DestinationConfig{ Table: j.AresTableConfig.Table.Name, Column: column.Name, UpdateMode: updateMode, } var defaultValue string if column.DefaultValue != nil { defaultValue = *column.DefaultValue } j.transformations[column.Name] = &TransformationConfig{ Type: column.Type, Source: column.Name, Default: defaultValue, } } return nil } func (j *JobConfig) getUpdateMode(column string) memCom.ColumnUpdateMode { updateMode := memCom.UpdateOverwriteNotNull if _, ok := j.primaryKeys[column]; ok { updateMode = memCom.UpdateOverwriteNotNull } else if modeStr, ok := j.AresTableConfig.UpdateMode[column]; ok { updateMode = parseUpdateMode(modeStr) } return updateMode } // AddLocalJobConfig creates a list of jobConfigs from local configuration file func AddLocalJobConfig(serviceConfig config.ServiceConfig, jobConfigs JobConfigs) error { serviceConfig.Logger.Info("Start AddLocalJobConfig") // iterate all active jobs configured at local for _, jobName := range serviceConfig.ActiveJobs { serviceConfig.Logger.Info(fmt.Sprintf("Loading job: %s", jobName)) // set the job configure by loading its configuration file jobConfFile := fmt.Sprintf( "%s/%s/jobs/%s-%s.json", config.ConfigRootPath, serviceConfig.Environment.RuntimeEnvironment, jobName, serviceConfig.Environment.Zone) serviceConfig.Logger.Info(fmt.Sprintf("Loading job config file: %s", jobConfFile)) jobConf, err := ioutil.ReadFile(jobConfFile) if err != nil { if os.IsNotExist(err) { serviceConfig.Logger.Warn(fmt.Sprintf("No job config file: %s", jobConfFile)) continue } serviceConfig.Logger.Error( fmt.Sprintf("Failed to read job config file: %s", jobConfFile), zap.Error(err)) return err } jobConfig, err := newJobConfig(jobConf) if err != nil { serviceConfig.Logger.Error("Failed to load job config", zap.String("job", jobName), zap.Error(err)) return err } serviceConfig.Logger.Info(fmt.Sprintf("Loaded job config file: %s", jobConfFile)) // iterate all active ares cluster to set jobConfig for each of them jobAresConfig := make(JobAresConfig) for aresCluster := range serviceConfig.ActiveAresClusters { aresClusterJobConfig, err := CloneJobConfig(jobConfig, serviceConfig, aresCluster) if err != nil { serviceConfig.Logger.Error("Failed to copy job config", zap.String("job", jobName), zap.String("aresCluster", aresCluster), zap.Error(err)) return err } jobAresConfig[aresCluster] = aresClusterJobConfig } jobConfigs[jobName] = jobAresConfig serviceConfig.Logger.Info(fmt.Sprintf("Loaded job: %s", jobName)) } serviceConfig.Logger.Info("Done AddLocalJobConfig") return nil } // newJobConfig creates a jobConfig from json func newJobConfig(value []byte) (*JobConfig, error) { var jobConfig JobConfig err := json.Unmarshal(value, &jobConfig) if err != nil { return nil, err } err = jobConfig.PopulateAresTableConfig() return &jobConfig, err } // CloneJobConfig deep copy jobConfig func CloneJobConfig(src *JobConfig, serviceConfig config.ServiceConfig, aresCluster string) (*JobConfig, error) { serviceConfig.Logger.Info("Copying job config", zap.String("job", src.Name), zap.String("aresCluster", aresCluster)) dst := new(JobConfig) if err := deepcopy.Copy(dst, src); err != nil { return nil, err } // copy column defaultValue for i, column := range src.AresTableConfig.Table.Columns { if column.DefaultValue == nil { continue } defaultValue := *column.DefaultValue dst.AresTableConfig.Table.Columns[i].DefaultValue = &defaultValue } // copy destinations map dst.destinations = make(map[string]*DestinationConfig, len(src.destinations)) for k, v := range src.destinations { if v == nil { continue } dst.destinations[k] = new(DestinationConfig) if err := deepcopy.Copy(dst.destinations[k], v); err != nil { return nil, err } } // copy transformations map dst.transformations = make(map[string]*TransformationConfig, len(src.transformations)) for k, v := range src.transformations { if v == nil { continue } dst.transformations[k] = new(TransformationConfig) if err := deepcopy.Copy(dst.transformations[k], v); err != nil { return nil, err } dst.transformations[k].Context = make(map[string]string, len(v.Context)) for ck, ctx := range v.Context { dst.transformations[k].Context[ck] = ctx } } // copy primaryKeys map[string]interface{} dst.primaryKeys = make(map[string]int, len(src.primaryKeys)) for k, v := range src.primaryKeys { dst.primaryKeys[k] = v } dst.AresTableConfig.Cluster = aresCluster serviceConfig.Logger.Info("Copied job config", zap.String("job", src.Name), zap.String("aresCluster", aresCluster)) return dst, nil } // parseUpdateMode converts update mode string to memCom.ColumnUpdateMode func parseUpdateMode(modeStr string) memCom.ColumnUpdateMode { switch strings.ToLower(modeStr) { case "overwrite_notnull": return memCom.UpdateOverwriteNotNull case "overwrite_force": return memCom.UpdateForceOverwrite case "addition": return memCom.UpdateWithAddition case "min": return memCom.UpdateWithMin case "max": return memCom.UpdateWithMax default: return memCom.UpdateOverwriteNotNull } }