memstore/schema.go (223 lines of code) (raw):
// Copyright (c) 2017-2018 Uber Technologies, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package memstore
import (
memCom "github.com/uber/aresdb/memstore/common"
metaCom "github.com/uber/aresdb/metastore/common"
"github.com/uber/aresdb/utils"
)
// FetchSchema fetches the schema of every table from metaStore into the in-memory
// TableSchemas map and sets up watches for subsequent metaStore schema changes.
// It is used for bootstrapping the mem store.
//
// Watches are registered for:
//   - table schema additions/modifications (handled by handleTableSchemaChange),
//   - table list changes, used to detect table deletions (handleTableListChange),
//   - enum case appends for every enum dict loaded here (watchEnumCases).
//
// Each enum watch starts from the number of cases already loaded for that column,
// so cases fetched here are not appended a second time.
func (m *memStoreImpl) FetchSchema() error {
	tables, err := m.metaStore.ListTables()
	if err != nil {
		return utils.StackError(err, "Failed to list tables from meta")
	}
	for _, tableName := range tables {
		if err := m.fetchTable(tableName); err != nil {
			return err
		}
	}

	// watch table addition/modification
	tableSchemaChangeEvents, done, err := m.metaStore.WatchTableSchemaEvents()
	if err != nil {
		return utils.StackError(err, "Failed to watch table schema events")
	}
	go m.handleTableSchemaChange(tableSchemaChangeEvents, done)

	// watch table deletion
	tableListChangeEvents, done, err := m.metaStore.WatchTableListEvents()
	if err != nil {
		return utils.StackError(err, "Failed to watch table list events")
	}
	go m.handleTableListChange(tableListChangeEvents, done)

	// Snapshot the enum columns under the read lock and register the watches
	// after releasing it. Registering inside the locked loop would leak the
	// read lock on the error return path (deadlocking every later m.Lock), and
	// would hold the lock across metaStore calls.
	type enumWatch struct {
		table, column string
		startCase     int
	}
	var enumWatches []enumWatch
	m.RLock()
	for _, tableSchema := range m.TableSchemas {
		for columnName, enumCases := range tableSchema.EnumDicts {
			enumWatches = append(enumWatches, enumWatch{
				table:     tableSchema.Schema.Name,
				column:    columnName,
				startCase: len(enumCases.ReverseDict),
			})
		}
	}
	m.RUnlock()

	// watch enum cases appending
	for _, w := range enumWatches {
		if err := m.watchEnumCases(w.table, w.column, w.startCase); err != nil {
			return err
		}
	}
	return nil
}
// fetchTable loads the schema of tableName from metaStore, builds its in-memory
// TableSchema and stores it in m.TableSchemas. The schema includes enum dicts for
// non-deleted enum columns and a default value for every column.
//
// If the table no longer exists in metaStore it is skipped without error. If an
// enum column's table/column no longer exists, its enum dict is simply not
// created. Any other metaStore error is returned.
func (m *memStoreImpl) fetchTable(tableName string) error {
	table, err := m.metaStore.GetTable(tableName)
	if err == metaCom.ErrTableDoesNotExist {
		// the table vanished after it was listed; nothing to load
		return nil
	}
	if err != nil {
		return utils.StackError(err, "Failed to get table schema for table %s from meta", tableName)
	}

	schema := memCom.NewTableSchema(table)
	for id, col := range table.Columns {
		if !col.Deleted && col.IsEnumBasedColumn() {
			cases, dictErr := m.metaStore.GetEnumDict(tableName, col.Name)
			switch {
			case dictErr == nil:
				schema.CreateEnumDict(col.Name, cases)
			case dictErr != metaCom.ErrTableDoesNotExist && dictErr != metaCom.ErrColumnDoesNotExist:
				return utils.StackError(dictErr, "Failed to fetch enum cases for table: %s, column: %s", tableName, col.Name)
			}
		}
		// default value is set after any enum dict for the column has been created
		schema.SetDefaultValue(id)
	}

	m.Lock()
	m.TableSchemas[tableName] = schema
	m.Unlock()
	return nil
}
// watchEnumCases subscribes to enum case append events for tableName.columnName,
// starting at enum case ID startCase, and spawns handleEnumDictChange to apply
// them. If the table or column no longer exists in metaStore, no watch is
// started and nil is returned; any other metaStore error is returned wrapped.
func (m *memStoreImpl) watchEnumCases(tableName, columnName string, startCase int) error {
	events, done, err := m.metaStore.WatchEnumDictEvents(tableName, columnName, startCase)
	switch {
	case err == nil:
		go m.handleEnumDictChange(tableName, columnName, events, done)
		return nil
	case err == metaCom.ErrTableDoesNotExist, err == metaCom.ErrColumnDoesNotExist:
		return nil
	default:
		return utils.StackError(err, "Failed to watch enum case events")
	}
}
// handleTableListChange consumes table list events from metaStore and applies
// each list via applyTableList, which detects table deletions. It signals on
// done after every applied event and closes done once the event channel closes.
func (m *memStoreImpl) handleTableListChange(tableListChangeEvents <-chan []string, done chan<- struct{}) {
	for {
		tables, ok := <-tableListChangeEvents
		if !ok {
			break
		}
		m.applyTableList(tables)
		done <- struct{}{}
	}
	close(done)
}
// applyTableList reconciles the in-memory tables with newTableList, the current
// list of tables in metaStore. Every table loaded in memory but absent from
// newTableList is removed.
//
// Tables are removed one at a time, each deletion fully completed before the
// next starts. Removal repeats until no missing table remains, so a single event
// that lacks several tables removes all of them instead of only the first found.
func (m *memStoreImpl) applyTableList(newTableList []string) {
	for m.removeOneDeletedTable(newTableList) {
	}
}

// removeOneDeletedTable removes at most one table that is loaded in memory but
// absent from tableList, and reports whether it removed one.
//
// The table's schema and shards are first detached from the maps under the
// write lock to prevent new usage. Then, outside the lock, each shard is
// destructed and deleted from the disk store, and the table is removed from the
// scheduler.
func (m *memStoreImpl) removeOneDeletedTable(tableList []string) bool {
	m.Lock()
	for tableName, tableSchema := range m.TableSchemas {
		if utils.IndexOfStr(tableList, tableName) >= 0 {
			continue
		}
		// detach shards and schema from map to prevent new usage
		tableShards := m.TableShards[tableName]
		delete(m.TableSchemas, tableName)
		delete(m.TableShards, tableName)
		m.Unlock()

		for shardID, shard := range tableShards {
			shard.Destruct()
			m.diskStore.DeleteTableShard(tableName, shardID)
		}
		m.scheduler.DeleteTable(tableName, tableSchema.Schema.IsFactTable)
		return true
	}
	m.Unlock()
	return false
}
// handleTableSchemaChange consumes table schema events from metaStore (new tables
// as well as modified ones) and applies each via applyTableSchema. It signals on
// done after every applied event and closes done once the event channel closes.
func (m *memStoreImpl) handleTableSchemaChange(tableSchemaChangeEvents <-chan *metaCom.Table, done chan<- struct{}) {
	for {
		table, ok := <-tableSchemaChangeEvents
		if !ok {
			break
		}
		m.applyTableSchema(table)
		done <- struct{}{}
	}
	close(done)
}
// applyTableSchema applies newTable, a table schema pushed by metaStore, to the
// in-memory state.
//
// For a table not yet in memory, a new TableSchema is created with enum dicts
// for its non-deleted enum columns, and it is stored in m.TableSchemas.
//
// For an existing table, the schema is replaced via SetTable, then:
//   - enum dicts are created for enum columns that do not have one yet;
//   - enum dicts of newly deleted columns are dropped;
//   - default values are refreshed;
//   - HostMemManager.TriggerPreload is called for every live column;
//   - newly deleted columns are removed from every shard of the table.
//
// New enum dicts are seeded with the column's default value, if any. After the
// locks are released, an enum case watch is started for each new dict, beginning
// after the cases already seeded into that column's dict.
func (m *memStoreImpl) applyTableSchema(newTable *metaCom.Table) {
	tableName := newTable.Name

	// enumWatch is a newly created enum dict that still needs a metaStore watch.
	type enumWatch struct {
		column    string
		startCase int // enum case ID to start watching from
	}
	var newEnumColumns []enumWatch
	// Watches are started on return, after all locks below have been released.
	defer func() {
		for _, w := range newEnumColumns {
			err := m.watchEnumCases(tableName, w.column, w.startCase)
			if err != nil {
				utils.GetLogger().With(
					"error", err.Error(),
					"table", tableName,
					"column", w.column).
					Panic("Failed to watch enum dict events")
			}
		}
	}()

	m.Lock()
	tableSchema, tableExist := m.TableSchemas[tableName]

	// createEnumDict creates the enum dict for columnName on tableSchema (the
	// variable as it is when called), seeded with defaultValue when set, and
	// records the column for watching. The start case is tracked per column, so
	// a column without a default value still watches from case 0 even when
	// another column in the same schema was seeded with a default.
	createEnumDict := func(columnName string, defaultValue *string) {
		var enumCases []string
		if defaultValue != nil {
			enumCases = append(enumCases, *defaultValue)
		}
		tableSchema.CreateEnumDict(columnName, enumCases)
		// cases already seeded into the dict must not be appended again by the watch
		newEnumColumns = append(newEnumColumns, enumWatch{column: columnName, startCase: len(enumCases)})
	}

	// new table
	if !tableExist {
		tableSchema = memCom.NewTableSchema(newTable)
		for columnID, column := range newTable.Columns {
			if !column.Deleted && column.IsEnumBasedColumn() {
				createEnumDict(column.Name, column.DefaultValue)
			}
			tableSchema.SetDefaultValue(columnID)
		}
		m.TableSchemas[tableName] = tableSchema
		m.Unlock()
		return
	}
	m.Unlock()

	var columnsToDelete []int
	tableSchema.Lock()
	oldColumns := tableSchema.Schema.Columns
	tableSchema.SetTable(newTable)
	for columnID, column := range newTable.Columns {
		if column.Deleted {
			tableSchema.SetDefaultValue(columnID)
			if columnID < len(oldColumns) && !oldColumns[columnID].Deleted { // new deletions only
				delete(tableSchema.EnumDicts, column.Name)
				columnsToDelete = append(columnsToDelete, columnID)
			}
			continue
		}

		if column.IsEnumBasedColumn() {
			if _, exist := tableSchema.EnumDicts[column.Name]; !exist {
				createEnumDict(column.Name, column.DefaultValue)
			}
		}
		// always set default value after enum map creation
		tableSchema.SetDefaultValue(columnID)

		// preloading will be triggered if
		// 1. this is a new column and PreloadingDays > 0
		// 2. this is an old column and PreloadingDays > oldPreloadingDays
		var oldPreloadingDays int
		newPreloadingDays := column.Config.PreloadingDays
		if columnID < len(oldColumns) {
			oldPreloadingDays = oldColumns[columnID].Config.PreloadingDays
		}
		m.HostMemManager.TriggerPreload(tableName, columnID, oldPreloadingDays, newPreloadingDays)
	}
	tableSchema.Unlock()

	for _, columnID := range columnsToDelete {
		var shards []*TableShard
		m.RLock()
		for _, shard := range m.TableShards[tableName] {
			// register as a user so the shard is not destructed underneath us
			shard.Users.Add(1)
			shards = append(shards, shard)
		}
		m.RUnlock()
		for _, shard := range shards {
			// May block for extended amount of time during archiving
			shard.DeleteColumn(columnID)
			shard.Users.Done()
		}
	}
}
// handleEnumDictChange consumes enum case append events for tableName.columnName
// and applies each case to the in-memory enum dict via applyEnumCase. Unlike the
// table event handlers in this file, it does not signal on done per event. done
// is only closed once the event channel is closed.
func (m *memStoreImpl) handleEnumDictChange(tableName, columnName string, enumDictChangeEvents <-chan string, done chan<- struct{}) {
	for {
		enumCase, open := <-enumDictChangeEvents
		if !open {
			break
		}
		m.applyEnumCase(tableName, columnName, enumCase)
	}
	close(done)
}
// applyEnumCase appends newEnumCase to the in-memory enum dict of
// tableName.columnName and maps it to the next enum ID (the current number of
// cases). It does nothing if the table or the column's enum dict is not in memory.
//
// Locking is hand-over-hand: the table schema lock is taken before the memstore
// read lock is released.
func (m *memStoreImpl) applyEnumCase(tableName, columnName string, newEnumCase string) {
	m.RLock()
	schema, found := m.TableSchemas[tableName]
	if found {
		schema.Lock()
	}
	m.RUnlock()
	if !found {
		return
	}
	defer schema.Unlock()

	dict, found := schema.EnumDicts[columnName]
	if !found {
		return
	}
	nextID := len(dict.ReverseDict)
	dict.Dict[newEnumCase] = nextID
	dict.ReverseDict = append(dict.ReverseDict, newEnumCase)
	// write the struct back: append may have reallocated ReverseDict
	schema.EnumDicts[columnName] = dict
}