metastore/schema_fetch.go (188 lines of code) (raw):
// Copyright (c) 2017-2018 Uber Technologies, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package metastore
import (
"fmt"
controllerCli "github.com/uber/aresdb/controller/client"
controllerMutatorCom "github.com/uber/aresdb/controller/mutators/common"
memCom "github.com/uber/aresdb/memstore/common"
"github.com/uber/aresdb/metastore/common"
"github.com/uber/aresdb/utils"
"reflect"
"time"
)
// SchemaFetchJob is a job that periodically pings ares-controller and updates table schemas if applicable
type SchemaFetchJob struct {
clusterName string
hash string
intervalInSeconds int
schemaMutator common.TableSchemaMutator
enumUpdater memCom.EnumUpdater
schemaValidator TableSchemaValidator
controllerClient controllerCli.ControllerClient
enumMutator controllerMutatorCom.EnumMutator
stopChan chan struct{}
}
// NewSchemaFetchJob creates a new SchemaFetchJob
func NewSchemaFetchJob(intervalInSeconds int, schemaMutator common.TableSchemaMutator, enumUpdater memCom.EnumUpdater, schemaValidator TableSchemaValidator, controllerClient controllerCli.ControllerClient, enumMutator controllerMutatorCom.EnumMutator, clusterName, initialHash string) *SchemaFetchJob {
return &SchemaFetchJob{
clusterName: clusterName,
hash: initialHash,
intervalInSeconds: intervalInSeconds,
schemaMutator: schemaMutator,
enumUpdater: enumUpdater,
schemaValidator: schemaValidator,
stopChan: make(chan struct{}),
controllerClient: controllerClient,
enumMutator: enumMutator,
}
}
// Run starts the scheduling
func (j *SchemaFetchJob) Run() {
tickChan := time.NewTicker(time.Second * time.Duration(j.intervalInSeconds)).C
for {
select {
case <-tickChan:
j.FetchSchema()
if j.enumUpdater != nil {
j.FetchEnum()
}
case <-j.stopChan:
return
}
}
}
// Stop stops the scheduling
func (j *SchemaFetchJob) Stop() {
close(j.stopChan)
}
func (j *SchemaFetchJob) FetchSchema() {
newHash, err := j.controllerClient.GetSchemaHash(j.clusterName)
if err != nil {
reportError(err, true, "hash")
return
}
if newHash != j.hash {
newSchemas, err := j.controllerClient.GetAllSchema(j.clusterName)
if err != nil {
reportError(err, true, "allSchema")
return
}
err = j.applySchemaChange(newSchemas)
if err != nil {
// errors already reported, just return without updating hash
return
}
j.hash = newHash
}
utils.GetLogger().Debug("Succeeded to run schema fetch job")
utils.GetRootReporter().GetCounter(utils.SchemaFetchSuccess).Inc(1)
}
func (j *SchemaFetchJob) applySchemaChange(tables []common.Table) (err error) {
oldTables, err := j.schemaMutator.ListTables()
if err != nil {
return
}
oldTablesMap := make(map[string]bool)
for _, oldTableName := range oldTables {
oldTablesMap[oldTableName] = true
}
for _, table := range tables {
if _, exist := oldTablesMap[table.Name]; !exist {
// found new table
err = j.schemaMutator.CreateTable(&table)
if err != nil {
reportError(err, true, table.Name)
continue
}
utils.GetRootReporter().GetCounter(utils.SchemaCreationCount).Inc(1)
utils.GetLogger().With("table", table.Name).Info("added new table")
} else {
oldTablesMap[table.Name] = false
var oldTable *common.Table
oldTable, err = j.schemaMutator.GetTable(table.Name)
if err != nil {
reportError(err, true, table.Name)
continue
}
if oldTable.Incarnation < table.Incarnation {
// found new table incarnation, delete previous table and data
// then create new table
err := j.schemaMutator.DeleteTable(table.Name)
if err != nil {
reportError(err, true, table.Name)
continue
}
utils.GetRootReporter().GetCounter(utils.SchemaDeletionCount).Inc(1)
utils.GetLogger().With("table", table.Name).Info("deleted table")
err = j.schemaMutator.CreateTable(&table)
if err != nil {
reportError(err, true, table.Name)
continue
}
utils.GetRootReporter().GetCounter(utils.SchemaCreationCount).Inc(1)
utils.GetLogger().With("table", table.Name).Info("recreated table")
} else if oldTable.Incarnation == table.Incarnation && !reflect.DeepEqual(&table, oldTable) {
// found table update
j.schemaValidator.SetNewTable(table)
j.schemaValidator.SetOldTable(*oldTable)
err = j.schemaValidator.Validate()
if err != nil {
reportError(err, true, table.Name)
continue
}
err = j.schemaMutator.UpdateTable(table)
if err != nil {
reportError(err, true, table.Name)
continue
}
utils.GetRootReporter().GetCounter(utils.SchemaUpdateCount).Inc(1)
utils.GetLogger().With("table", table.Name).Info("updated table")
}
}
}
for oldTableName, notAddressed := range oldTablesMap {
if notAddressed {
// found table deletion
err = j.schemaMutator.DeleteTable(oldTableName)
if err != nil {
reportError(err, true, oldTableName)
continue
}
utils.GetRootReporter().GetCounter(utils.SchemaDeletionCount).Inc(1)
}
}
return
}
// FetchEnum updates all enums
func (j *SchemaFetchJob) FetchEnum() {
var (
tableNames []string
table *common.Table
enumCases []string
err error
)
tableNames, err = j.schemaMutator.ListTables()
if err != nil {
reportError(err, false, "failed to get tables when fetching enums")
return
}
for _, tableName := range tableNames {
table, err = j.schemaMutator.GetTable(tableName)
if err != nil {
reportError(err, false, fmt.Sprintf("failed to get table %s when fetching enums", tableName))
continue
}
for _, column := range table.Columns {
if !column.IsEnumBasedColumn() {
continue
}
enumCases, err = j.enumMutator.GetEnumCases(j.clusterName, tableName, column.Name)
if err != nil {
reportError(err, false, fmt.Sprintf("failed to get enums, table %s column %s", tableName, column.Name))
continue
}
err = j.enumUpdater.UpdateEnum(tableName, column.Name, enumCases)
if err != nil {
reportError(err, false, fmt.Sprintf("failed to update enums, table %s column %s", tableName, column.Name))
continue
}
utils.GetLogger().Debugf("Succeeded to fetch enums. table %s, column %s", tableName, column.Name)
}
}
}
func reportError(err error, isSchemaError bool, extraInfo string) {
if isSchemaError {
utils.GetRootReporter().GetCounter(utils.SchemaFetchFailure).Inc(1)
} else {
utils.GetRootReporter().GetCounter(utils.SchemaFetchFailureEnum).Inc(1)
}
utils.GetLogger().With("extraInfo", extraInfo).Error(utils.StackError(err, "err running schema fetch job"))
}