metastore/disk_metastore.go (1,092 lines of code) (raw):
// Copyright (c) 2017-2018 Uber Technologies, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package metastore
import (
"bytes"
"encoding/json"
"fmt"
"io"
"os"
"path/filepath"
"sort"
"strconv"
"strings"
"sync"
"github.com/uber/aresdb/metastore/common"
"github.com/uber/aresdb/utils"
"math"
)
// meaningful defaults of table configurations.
const (
DefaultBatchSize = 2097152
DefaultArchivingDelayMinutes = 1440
DefaultArchivingIntervalMinutes = 180
DefaultBackfillIntervalMinutes = 60
DefaultBackfillMaxBufferSize int64 = 4294967296
DefaultBackfillThresholdInBytes int64 = 2097152
DefaultBackfillStoreBatchSize = 20000
DefaultRecordRetentionInDays = 90
DefaultSnapshotIntervalMinutes = 360 // 6 hours
DefaultSnapshotThreshold = 3 * DefaultBatchSize // 3 batches
DefaultRedologRotationInterval = 10800 // 3 hours
DefaultMaxRedoLogSize = 1 << 30 // 1 GB
)
// DefaultTableConfig represents default table config
var DefaultTableConfig = common.TableConfig{
BatchSize: DefaultBatchSize,
ArchivingIntervalMinutes: DefaultArchivingIntervalMinutes,
ArchivingDelayMinutes: DefaultArchivingDelayMinutes,
BackfillMaxBufferSize: DefaultBackfillMaxBufferSize,
BackfillIntervalMinutes: DefaultBackfillIntervalMinutes,
BackfillThresholdInBytes: DefaultBackfillThresholdInBytes,
BackfillStoreBatchSize: DefaultBackfillStoreBatchSize,
RecordRetentionInDays: DefaultRecordRetentionInDays,
SnapshotIntervalMinutes: DefaultSnapshotIntervalMinutes,
SnapshotThreshold: DefaultSnapshotThreshold,
RedoLogRotationInterval: DefaultRedologRotationInterval,
MaxRedoLogFileSize: DefaultMaxRedoLogSize,
}
// disk-based metastore implementation.
// all validation of user input (eg. table/column name and table/column struct) will be pushed to api layer,
// which is the earliest point of user input, all schemas inside system will be already valid,
// Note:
// There are four types of write calls to MetaStore, the handling of each is different:
// 1. Schema Changes
// synchronous, return after both writing to watcher channel and reading from done channel are done
// 2. Update EnumCases
// return after changes persisted in disk and writing to watcher channel; does not read from done channel
// 3. Adding Watchers
// 3.1 for enum cases, create channels and push existing enum cases starting from start case to channel if any
// 3.2 for table list and table schema channels, create channels and return
// 4. Update configurations
// configurations update including updates on archiving cutoff, snapshot version, archive batch version etc,
// these changes does not need to be pushed to memstore.
// Operations involves writing to watcher channels (case 1 and 2), we need to enforce the order of changes pushed into channel,
// writeLock is introduced to enforce that.
// Other operations (case 3 and 4), we only need lock to protect internal data structure, a read write lock is used.
type diskMetaStore struct {
sync.RWMutex
utils.FileSystem
// writeLock is to enforce single writer at a time
// to make sure the same order of shema change when applied to
// MemStore through watcher channel
writeLock sync.Mutex
// the base path for MetaStore in disk
basePath string
// tableListWatcher
tableListWatcher chan<- []string
// tableListDone is the channel for tracking whether watcher has
// successfully got the table list change,
// here we adopt a synchronous model for schema change.
tableListDone <-chan struct{}
// tableSchemaWatcher
tableSchemaWatcher chan<- *common.Table
// tableSchemaDone is the channel for tracking whether watcher has
// successfully got the table schema change
tableSchemaDone <-chan struct{}
// enumDictWatchers
// maps from tableName to columnName to watchers
enumDictWatchers map[string]map[string]chan<- string
// tableSchemaDone are the channels for tracking whether watcher has
// successfully got the enum case change.
enumDictDone map[string]map[string]<-chan struct{}
// shardOwnershipWatcher
shardOwnershipWatcher chan<- common.ShardOwnership
// shardOwnershipDone is used for block waiting for the consumer to finish
// processing each ownership change event.
shardOwnershipDone <-chan struct{}
}
// ListTables list existing table names
func (dm *diskMetaStore) ListTables() ([]string, error) {
return dm.listTables()
}
// GetTable return the table schema stored in metastore given tablename,
// return ErrTableDoesNotExist if table not exists.
func (dm *diskMetaStore) GetTable(name string) (*common.Table, error) {
dm.RLock()
defer dm.RUnlock()
err := dm.tableExists(name)
if err != nil {
return nil, err
}
return dm.readSchemaFile(name)
}
// GetEnumDict gets the enum cases for given tableName and columnName
func (dm *diskMetaStore) GetEnumDict(tableName, columnName string) ([]string, error) {
dm.RLock()
defer dm.RUnlock()
if err := dm.enumColumnExists(tableName, columnName); err != nil {
return nil, err
}
return dm.readEnumFile(tableName, columnName)
}
// GetArchivingCutoff gets the latest archiving cutoff for given table and shard.
func (dm *diskMetaStore) GetArchivingCutoff(tableName string, shard int) (uint32, error) {
dm.RLock()
defer dm.RUnlock()
err := dm.tableExists(tableName)
if err != nil {
return 0, err
}
file := dm.getShardVersionFilePath(tableName, shard)
return dm.readVersion(file)
}
// DeleteTableShard deletes all table shard level metadata
func (dm *diskMetaStore) DeleteTableShard(tableName string, shard int) error {
dm.Lock()
defer dm.Unlock()
shardDirPath := dm.getShardDirPath(tableName, shard)
return dm.RemoveAll(shardDirPath)
}
// GetSnapshotProgress gets the latest snapshot progress for given table and shard
func (dm *diskMetaStore) GetSnapshotProgress(tableName string, shard int) (int64, uint32, int32, uint32, error) {
dm.RLock()
defer dm.RUnlock()
if err := dm.tableExists(tableName); err != nil {
return 0, 0, 0, 0, err
}
file := dm.getSnapshotRedoLogVersionAndOffsetFilePath(tableName, shard)
return dm.readSnapshotRedoLogFileAndOffset(file)
}
// UpdateArchivingCutoff updates archiving cutoff for given table (fact table), shard
func (dm *diskMetaStore) UpdateArchivingCutoff(tableName string, shard int, cutoff uint32) error {
dm.Lock()
defer dm.Unlock()
if err := dm.tableExists(tableName); err != nil {
return err
}
schema, err := dm.readSchemaFile(tableName)
if err != nil {
return err
}
if !schema.IsFactTable {
return common.ErrNotFactTable
}
file := dm.getShardVersionFilePath(tableName, shard)
return dm.writeArchivingCutoff(file, cutoff)
}
// UpdateSnapshotProgress update snapshot version for given table (dimension table), shard.
func (dm *diskMetaStore) UpdateSnapshotProgress(tableName string, shard int, redoLogFile int64, upsertBatchOffset uint32, lastReadBatchID int32, lastReadBatchOffset uint32) error {
dm.Lock()
defer dm.Unlock()
if err := dm.tableExists(tableName); err != nil {
return err
}
schema, err := dm.readSchemaFile(tableName)
if err != nil {
return err
}
if schema.IsFactTable {
return common.ErrNotDimensionTable
}
file := dm.getSnapshotRedoLogVersionAndOffsetFilePath(tableName, shard)
return dm.writeSnapshotRedoLogVersionAndOffset(file, redoLogFile, upsertBatchOffset, lastReadBatchID, lastReadBatchOffset)
}
// Updates the latest redolog/offset that have been backfilled for the specified shard.
func (dm *diskMetaStore) UpdateBackfillProgress(table string, shard int, redoFile int64, offset uint32) error {
utils.GetLogger().Debugf("Backfill checkpoint(table=%s shard=%d redoFile=%d offset=%d)", table, shard, redoFile, offset)
dm.Lock()
defer dm.Unlock()
if err := dm.tableExists(table); err != nil {
return err
}
schema, err := dm.readSchemaFile(table)
if err != nil {
return err
}
if !schema.IsFactTable {
return common.ErrNotFactTable
}
file := dm.getRedoLogVersionAndOffsetFilePath(table, shard)
return dm.writeRedoLogVersionAndOffset(file, redoFile, offset)
}
// Retrieve the latest redolog/offset that have been backfilled for the specified shard.
func (dm *diskMetaStore) GetBackfillProgressInfo(table string, shard int) (int64, uint32, error) {
dm.RLock()
defer dm.RUnlock()
if err := dm.tableExists(table); err != nil {
return 0, 0, err
}
file := dm.getRedoLogVersionAndOffsetFilePath(table, shard)
return dm.readRedoLogFileAndOffset(file)
}
// Update ingestion commit offset, used for kafka like streaming ingestion
func (dm *diskMetaStore) UpdateRedoLogCommitOffset(table string, shard int, offset int64) error {
dm.Lock()
defer dm.Unlock()
// No sanity check here for schema/fact table/directory, assuming all should be passed before calling this func
file := dm.getIngestionCommitOffsetFilePath(table, shard)
err := dm.MkdirAll(filepath.Dir(file), 0755)
if err != nil {
return utils.StackError(err, "Failed to create directory for redolog commit offset")
}
writer, err := dm.OpenFileForWrite(
file,
os.O_CREATE|os.O_TRUNC|os.O_WRONLY,
0644,
)
if err != nil {
return utils.StackError(err, "Failed to open ingestion commit offset file %s for write", file)
}
defer writer.Close()
_, err = io.WriteString(writer, fmt.Sprintf("%d", offset))
return err
}
// Get ingestion commit offset, used for kafka like streaming ingestion
func (dm *diskMetaStore) GetRedoLogCommitOffset(table string, shard int) (int64, error) {
dm.RLock()
defer dm.RUnlock()
// No sanity check here for schema/fact table/directory, assuming all should be passed before calling this func
file := dm.getIngestionCommitOffsetFilePath(table, shard)
fileBytes, err := dm.ReadFile(file)
if os.IsNotExist(err) {
return 0, nil
}
if err != nil {
return 0, utils.StackError(err, "Failed to open ingestion commit offset file %s", file)
}
var offset int64
_, err = fmt.Fscanln(bytes.NewBuffer(fileBytes), &offset)
if err != nil {
return 0, utils.StackError(err, "Failed to read ingestion commit offset file %s", file)
}
return offset, nil
}
// Update ingestion checkpoint offset, used for kafka like streaming ingestion
func (dm *diskMetaStore) UpdateRedoLogCheckpointOffset(table string, shard int, offset int64) error {
dm.Lock()
defer dm.Unlock()
// No sanity check here for schema/fact table/directory, assuming all should be passed before calling this func
file := dm.getIngestionCheckpointOffsetFilePath(table, shard)
err := dm.MkdirAll(filepath.Dir(file), 0755)
if err != nil {
return utils.StackError(err, "Failed to create directory for redolog checkpoint offset")
}
writer, err := dm.OpenFileForWrite(
file,
os.O_CREATE|os.O_TRUNC|os.O_WRONLY,
0644,
)
if err != nil {
return utils.StackError(err, "Failed to open ingestion checkpoint offset file %s for write", file)
}
defer writer.Close()
_, err = io.WriteString(writer, fmt.Sprintf("%d", offset))
return err
}
// Get ingestion checkpoint offset, used for kafka like streaming ingestion
func (dm *diskMetaStore) GetRedoLogCheckpointOffset(table string, shard int) (int64, error) {
dm.RLock()
defer dm.RUnlock()
// No sanity check here for schema/fact table/directory, assuming all should be passed before calling this func
file := dm.getIngestionCheckpointOffsetFilePath(table, shard)
fileBytes, err := dm.ReadFile(file)
if os.IsNotExist(err) {
return 0, nil
}
if err != nil {
return 0, utils.StackError(err, "Failed to open ingestion checkpoint offset file %s", file)
}
var offset int64
_, err = fmt.Fscanln(bytes.NewBuffer(fileBytes), &offset)
if err != nil {
return 0, utils.StackError(err, "Failed to read ingestion checkpoint offset file %s", file)
}
return offset, nil
}
// WatchTableListEvents register a watcher to table list change events,
// should only be called once,
// returns ErrWatcherAlreadyExist once watcher already exists
func (dm *diskMetaStore) WatchTableListEvents() (events <-chan []string, done chan<- struct{}, err error) {
dm.Lock()
defer dm.Unlock()
if dm.tableListWatcher != nil {
return nil, nil, common.ErrWatcherAlreadyExist
}
watcherChan, doneChan := make(chan []string), make(chan struct{})
dm.tableListWatcher, dm.tableListDone = watcherChan, doneChan
return watcherChan, doneChan, nil
}
// WatchTableSchemaEvents register a watcher to table schema change events,
// should be only called once,
// returns ErrWatcherAlreadyExist once watcher already exists
func (dm *diskMetaStore) WatchTableSchemaEvents() (events <-chan *common.Table, done chan<- struct{}, err error) {
dm.Lock()
defer dm.Unlock()
if dm.tableSchemaWatcher != nil {
return nil, nil, common.ErrWatcherAlreadyExist
}
watcherChan, doneChan := make(chan *common.Table), make(chan struct{})
dm.tableSchemaWatcher, dm.tableSchemaDone = watcherChan, doneChan
return watcherChan, doneChan, nil
}
// WatchEnumDictEvents register a watcher to enum cases change events for given table and column,
// returns
// ErrTableDoesNotExist, ErrColumnDoesNotExist, ErrNotEnumColumn, ErrWatcherAlreadyExist.
// if startCase is larger than the number of current existing enum cases, it will be just as if receiving from
// latest.
func (dm *diskMetaStore) WatchEnumDictEvents(table, column string, startCase int) (events <-chan string, done chan<- struct{}, err error) {
dm.Lock()
defer dm.Unlock()
if err = dm.enumColumnExists(table, column); err != nil {
return nil, nil, err
}
if _, exist := dm.enumDictWatchers[table]; !exist {
dm.enumDictWatchers[table] = make(map[string]chan<- string)
dm.enumDictDone[table] = make(map[string]<-chan struct{})
}
if _, exist := dm.enumDictWatchers[table][column]; exist {
return nil, nil, common.ErrWatcherAlreadyExist
}
existingEnumCases, err := dm.readEnumFile(table, column)
if err != nil {
return nil, nil, err
}
channelCapacity := len(existingEnumCases) - startCase
// if start is larger than length of existing enum cases
// will treat as if sending from latest
if channelCapacity <= 0 {
channelCapacity = 1
}
watcherChan, doneChan := make(chan string, channelCapacity), make(chan struct{})
dm.enumDictWatchers[table][column], dm.enumDictDone[table][column] = watcherChan, doneChan
for start := startCase; start < len(existingEnumCases); start++ {
watcherChan <- existingEnumCases[start]
}
return watcherChan, doneChan, nil
}
// WatchTableSchemaEvents register a watcher to table schema change events,
// should be only called once,
// returns ErrWatcherAlreadyExist once watcher already exists
func (dm *diskMetaStore) WatchShardOwnershipEvents() (events <-chan common.ShardOwnership, done chan<- struct{}, err error) {
dm.Lock()
defer dm.Unlock()
if dm.shardOwnershipWatcher != nil {
return nil, nil, common.ErrWatcherAlreadyExist
}
watcherChan, doneChan := make(chan common.ShardOwnership), make(chan struct{})
dm.shardOwnershipWatcher, dm.shardOwnershipDone = watcherChan, doneChan
return watcherChan, doneChan, nil
}
// CreateTable creates a new Table,
// returns
// ErrTableAlreadyExist if table already exists
func (dm *diskMetaStore) CreateTable(table *common.Table) (err error) {
dm.writeLock.Lock()
defer dm.writeLock.Unlock()
var existingTables []string
dm.Lock()
defer func() {
dm.Unlock()
if err == nil {
dm.pushSchemaChange(table)
dm.pushShardOwnershipChange(table.Name)
}
}()
existingTables, err = dm.listTables()
if err != nil {
return err
}
if utils.IndexOfStr(existingTables, table.Name) >= 0 {
return common.ErrTableAlreadyExist
}
validator := NewTableSchameValidator()
validator.SetNewTable(*table)
err = validator.Validate()
if err != nil {
return err
}
if err = dm.MkdirAll(dm.getTableDirPath(table.Name), 0755); err != nil {
return err
}
if err = dm.writeSchemaFile(table); err != nil {
return err
}
// append enum case for enum column with default value
for _, column := range table.Columns {
if column.DefaultValue != nil && column.IsEnumBasedColumn() {
err = dm.writeEnumFile(table.Name, column.Name, []string{*column.DefaultValue})
if err != nil {
return err
}
}
}
return nil
}
// UpdateTable update table configurations
// return
// ErrTableDoesNotExist if table does not exist
func (dm *diskMetaStore) UpdateTableConfig(tableName string, config common.TableConfig) (err error) {
dm.writeLock.Lock()
defer dm.writeLock.Unlock()
var table *common.Table
dm.Lock()
defer func() {
dm.Unlock()
if err == nil && table != nil {
dm.pushSchemaChange(table)
}
}()
if err = dm.tableExists(tableName); err != nil {
return err
}
table, err = dm.readSchemaFile(tableName)
if err != nil {
return err
}
table.Config = config
return dm.writeSchemaFile(table)
}
// UpdateTable updates table schema and config
// table passed in should have been validated against existing table schema
func (dm *diskMetaStore) UpdateTable(table common.Table) (err error) {
dm.writeLock.Lock()
defer dm.writeLock.Unlock()
dm.Lock()
defer func() {
dm.Unlock()
if err == nil {
dm.pushSchemaChange(&table)
}
}()
var existingTable *common.Table
existingTable, err = dm.readSchemaFile(table.Name)
if err != nil {
return
}
if err = dm.writeSchemaFile(&table); err != nil {
return err
}
// append enum case for enum column with default value for new columns
for i := len(existingTable.Columns); i < len(table.Columns); i++ {
column := table.Columns[i]
if column.DefaultValue != nil && column.IsEnumBasedColumn() {
err = dm.writeEnumFile(table.Name, column.Name, []string{*column.DefaultValue})
if err != nil {
return
}
}
}
return
}
// DeleteTable deletes a table
// return
// ErrTableDoesNotExist if table does not exist
func (dm *diskMetaStore) DeleteTable(tableName string) (err error) {
dm.writeLock.Lock()
defer dm.writeLock.Unlock()
var existingTables []string
dm.Lock()
defer func() {
dm.Unlock()
if dm.tableListWatcher != nil {
dm.tableListWatcher <- existingTables
<-dm.tableListDone
}
}()
existingTables, err = dm.listTables()
if err != nil {
return utils.StackError(err, "Failed to list tables")
}
index := utils.IndexOfStr(existingTables, tableName)
if index < 0 {
return common.ErrTableDoesNotExist
}
if err = dm.removeTable(tableName); err != nil {
return err
}
existingTables = append(existingTables[:index], existingTables[index+1:]...)
return nil
}
// AddColumn adds a new column
// returns
// ErrTableDoesNotExist if table does not exist
// ErrColumnAlreadyExist if column already exists
func (dm *diskMetaStore) AddColumn(tableName string, column common.Column, appendToArchivingSortOrder bool) (err error) {
dm.writeLock.Lock()
defer dm.writeLock.Unlock()
var table *common.Table
dm.Lock()
defer func() {
dm.Unlock()
if err == nil {
dm.pushSchemaChange(table)
}
}()
if err = dm.tableExists(tableName); err != nil {
return err
}
if table, err = dm.readSchemaFile(tableName); err != nil {
return err
}
return dm.addColumn(table, column, appendToArchivingSortOrder)
}
// UpdateColumn deletes a column.
// return
// ErrTableDoesNotExist if table does not exist.
// ErrColumnDoesNotExist if column does not exist.
func (dm *diskMetaStore) UpdateColumn(tableName string, columnName string, config common.ColumnConfig) (err error) {
dm.writeLock.Lock()
defer dm.writeLock.Unlock()
var table *common.Table
dm.Lock()
defer func() {
dm.Unlock()
if err == nil {
dm.pushSchemaChange(table)
}
}()
if err = dm.tableExists(tableName); err != nil {
return err
}
if table, err = dm.readSchemaFile(tableName); err != nil {
return err
}
return dm.updateColumn(table, columnName, config)
}
// DeleteColumn deletes a column
// return
// ErrTableDoesNotExist if table not exist
// ErrColumnDoesNotExist if column not exist
func (dm *diskMetaStore) DeleteColumn(tableName string, columnName string) (err error) {
dm.writeLock.Lock()
defer dm.writeLock.Unlock()
var table *common.Table
dm.Lock()
defer func() {
dm.Unlock()
if err == nil {
dm.pushSchemaChange(table)
}
}()
if err = dm.tableExists(tableName); err != nil {
return err
}
if table, err = dm.readSchemaFile(tableName); err != nil {
return err
}
return dm.removeColumn(table, columnName)
}
// ExtendEnumDict extends enum cases for given table column
func (dm *diskMetaStore) ExtendEnumDict(table, columnName string, enumCases []string) (enumIDs []int, err error) {
dm.writeLock.Lock()
defer dm.writeLock.Unlock()
var existingCases []string
newEnumCases := make([]string, 0, len(enumCases))
dm.Lock()
defer func() {
dm.Unlock()
if err == nil {
if _, tableExist := dm.enumDictWatchers[table]; tableExist {
if watcher, columnExist := dm.enumDictWatchers[table][columnName]; columnExist {
for _, enumCase := range newEnumCases {
watcher <- enumCase
}
}
}
}
}()
var column *common.Column
column, err = dm.getColumnByName(table, columnName)
if err != nil {
return
}
if !column.IsEnumBasedColumn() {
err = common.ErrNotEnumColumn
return
}
existingCases, err = dm.readEnumFile(table, columnName)
if err != nil {
return nil, err
}
enumDict := make(map[string]int)
for enumID, enumCase := range existingCases {
enumDict[enumCase] = enumID
}
newEnumID := len(existingCases)
enumCardinalityLimit := common.EnumCardinality(column.Type)
if newEnumID+len(enumCases) > enumCardinalityLimit {
err = common.ErrEnumCardinalityOverflow
return
}
enumIDs = make([]int, len(enumCases))
for index, newCase := range enumCases {
if enumID, exist := enumDict[newCase]; exist {
enumIDs[index] = enumID
} else {
enumDict[newCase] = newEnumID
newEnumCases = append(newEnumCases, newCase)
enumIDs[index] = newEnumID
newEnumID++
}
}
if err = dm.writeEnumFile(table, columnName, newEnumCases); err != nil {
return nil, err
}
utils.GetRootReporter().GetChildGauge(map[string]string{
"table": table,
"columnName": columnName,
}, utils.NumberOfEnumCasesPerColumn).Update(float64(newEnumID))
return enumIDs, nil
}
// PurgeArchiveBatches deletes the archive batches' metadata with batchID within [batchIDStart, batchIDEnd)
func (dm *diskMetaStore) PurgeArchiveBatches(tableName string, shard, batchIDStart, batchIDEnd int) error {
dm.Lock()
defer dm.Unlock()
if err := dm.tableExists(tableName); err != nil {
return err
}
batchFiles, err := dm.ReadDir(dm.getArchiveBatchDirPath(tableName, shard))
if os.IsNotExist(err) {
utils.GetLogger().Warnf("table %s shard %d does not exist", tableName, shard)
return nil
} else if err != nil {
return utils.StackError(err, "failed to read batch dir, table: %s, shard: %d", tableName, shard)
}
for _, batchFile := range batchFiles {
batchID, err := strconv.ParseInt(batchFile.Name(), 10, 32)
if err != nil {
return err
}
if batchID < int64(batchIDEnd) && batchID >= int64(batchIDStart) {
path := dm.getArchiveBatchVersionFilePath(tableName, shard, int(batchID))
err := dm.Remove(path)
if os.IsNotExist(err) {
utils.GetLogger().Warnf("batch %d of table %s, shard %d does not exist", batchID, tableName, shard)
} else if err != nil {
return utils.StackError(err, "failed to delete metadata, table: %s, shard: %d, batch: %d", tableName, shard, batchID)
}
}
}
return nil
}
// OverwriteArchiveBatchVersion overwrites batch version
func (dm *diskMetaStore) OverwriteArchiveBatchVersion(tableName string, shard, batchID int, version uint32, seqNum uint32, batchSize int) error {
return dm.writeArchiveBatchVersionWithMode(tableName, shard, batchID, version, seqNum, batchSize, os.O_WRONLY|os.O_TRUNC|os.O_CREATE)
}
// AddArchiveBatchVersion adds a new version to archive batch.
func (dm *diskMetaStore) AddArchiveBatchVersion(tableName string, shard, batchID int, version uint32, seqNum uint32, batchSize int) error {
return dm.writeArchiveBatchVersionWithMode(tableName, shard, batchID, version, seqNum, batchSize, os.O_WRONLY|os.O_APPEND|os.O_CREATE)
}
func (dm *diskMetaStore) writeArchiveBatchVersionWithMode(tableName string, shard, batchID int, version uint32, seqNum uint32, batchSize int, mode int) error {
dm.Lock()
defer dm.Unlock()
if err := dm.tableExists(tableName); err != nil {
return err
}
path := dm.getArchiveBatchVersionFilePath(tableName, shard, batchID)
if err := dm.MkdirAll(filepath.Dir(path), 0755); err != nil {
return utils.StackError(err, "Failed to create archive batch version directory")
}
writer, err := dm.OpenFileForWrite(
path,
mode,
0644,
)
if err != nil {
return utils.StackError(
err,
"Failed to open archive batch version file, table: %s, shard: %d, batch: %d",
tableName,
shard,
batchID,
)
}
defer writer.Close()
if seqNum > 0 {
_, err = io.WriteString(writer, fmt.Sprintf("%d-%d,%d\n", version, seqNum, batchSize))
} else {
_, err = io.WriteString(writer, fmt.Sprintf("%d,%d\n", version, batchSize))
}
if err != nil {
return utils.StackError(err, "Failed to write to batch version file, table: %s, shard: %d, batch: %d",
tableName,
shard,
batchID,
)
}
return nil
}
func (dm *diskMetaStore) GetArchiveBatches(table string, shard int, batchIDStart, batchIDEnd int32) ([]int, error) {
dm.RLock()
defer dm.RUnlock()
batchFiles, err := dm.ReadDir(dm.getArchiveBatchDirPath(table, shard))
if os.IsNotExist(err) {
return nil, nil
} else if err != nil {
return nil, fmt.Errorf("fail to read archive batch directories: %v", err)
}
if batchIDEnd <= 0 {
batchIDEnd = math.MaxInt32
}
batchIDs := []int{}
for _, batchFile := range batchFiles {
batchID, err := strconv.ParseInt(batchFile.Name(), 10, 32)
if err != nil {
// we'll skip invalid batchID
continue
}
if batchID <= int64(batchIDEnd) && batchID >= int64(batchIDStart) {
batchIDs = append(batchIDs, int(batchID))
}
}
return batchIDs, nil
}
// GetArchiveBatchVersion gets the latest version <= given archiving/live cutoff
// all cutoff and batch versions are sorted in file per batch
// sample:
// /root_path/metastore/{$table}/shards/{$shard_id}/batches/{$batch_id}
// version,size
// 1-0,10
// 2-0,20
// 2-1,26
// 4-0,20
// 5-0,20
// 5-1,25
// 5-2,38
// if given cutoff 6, returns 5-2,38
// if given cutoff 4, returns 4-0,20
// if given cutoff 0, returns 0-0, 0
func (dm *diskMetaStore) GetArchiveBatchVersion(table string, shard, batchID int, cutoff uint32) (uint32, uint32, int, error) {
dm.RLock()
defer dm.RUnlock()
if err := dm.tableExists(table); err != nil {
return 0, 0, 0, err
}
batchVersionBytes, err := dm.ReadFile(dm.getArchiveBatchVersionFilePath(table, shard, batchID))
if os.IsNotExist(err) {
return 0, 0, 0, nil
} else if err != nil {
return 0, 0, 0, utils.StackError(err, "Failed to read batch")
}
batchVersionSizes := strings.Split(strings.TrimSuffix(string(batchVersionBytes), "\n"), "\n")
var version uint64
// do binary search to find the first cutoff that is larger than the specified cutoff
firstIndex := sort.Search(len(batchVersionSizes), func(i int) bool {
versionSizePair := strings.Split(batchVersionSizes[i], ",")
// backward compatible: sequence number may not exist for old version
if !strings.Contains(versionSizePair[0], "-") {
version, err = strconv.ParseUint(versionSizePair[0], 10, 32)
} else {
versionSeqStr := strings.Split(versionSizePair[0], "-")
version, err = strconv.ParseUint(versionSeqStr[0], 10, 32)
}
if err != nil {
// this should never happen
utils.GetLogger().With(
"error", err.Error(),
"table", table,
"shard", shard,
"batchID", batchID).
Panic("Incorrect batch version")
}
return uint32(version) > cutoff
})
// all cutoffs larger than given cutoff
if firstIndex == 0 {
return 0, 0, 0, nil
}
versionSizePair := strings.Split(batchVersionSizes[firstIndex-1], ",")
if len(versionSizePair) != 2 {
return 0, 0, 0, utils.StackError(err, "Incorrect batch version and size pair, %s", batchVersionSizes[firstIndex-1])
}
var seqNum uint64
if !strings.Contains(versionSizePair[0], "-") {
version, err = strconv.ParseUint(versionSizePair[0], 10, 32)
seqNum = 0
} else {
versionSeqStr := strings.Split(versionSizePair[0], "-")
seqNum, err = strconv.ParseUint(versionSeqStr[1], 10, 32)
if err != nil {
return 0, 0, 0, utils.StackError(err, "Failed to parse batch sequence, %s", versionSizePair[0])
}
version, err = strconv.ParseUint(versionSeqStr[0], 10, 32)
}
if err != nil {
return 0, 0, 0, utils.StackError(err, "Failed to parse batchVersion, %s", versionSizePair[0])
}
batchSize, err := strconv.ParseInt(versionSizePair[1], 10, 32)
if err != nil {
return 0, 0, 0, utils.StackError(err, "Failed to parse batchSize, %s", versionSizePair[1])
}
return uint32(version), uint32(seqNum), int(batchSize), nil
}
func (dm *diskMetaStore) pushSchemaChange(table *common.Table) {
if dm.tableSchemaWatcher != nil {
dm.tableSchemaWatcher <- table
<-dm.tableSchemaDone
}
}
func (dm *diskMetaStore) pushShardOwnershipChange(tableName string) {
if dm.shardOwnershipWatcher != nil {
dm.shardOwnershipWatcher <- common.ShardOwnership{
TableName: tableName,
Shard: 0,
ShouldOwn: true}
<-dm.shardOwnershipDone
}
}
// listTable lists the table
func (dm *diskMetaStore) listTables() ([]string, error) {
tableDirs, err := dm.ReadDir(dm.basePath)
if err != nil {
return nil, utils.StackError(err, "Failed to list tables")
}
tableNames := make([]string, len(tableDirs))
for id, tableDir := range tableDirs {
tableNames[id] = tableDir.Name()
}
return tableNames, nil
}
func (dm *diskMetaStore) removeTable(tableName string) error {
if err := dm.RemoveAll(dm.getTableDirPath(tableName)); err != nil {
return utils.StackError(err, "Failed to remove directory, table: %s", tableName)
}
// close all related enum dict watchers
// make sure all producer have done producing and detach
columnWatchers := dm.enumDictWatchers[tableName]
doneWatchers := dm.enumDictDone[tableName]
delete(dm.enumDictWatchers, tableName)
delete(dm.enumDictDone, tableName)
for columnName, watcher := range columnWatchers {
close(watcher)
// drain done channels for related enum watchers
// to make sure all previous changes are done
for range doneWatchers[columnName] {
}
}
return nil
}
func (dm *diskMetaStore) addColumn(table *common.Table, column common.Column, appendToArchivingSortOrder bool) error {
validator := NewTableSchameValidator()
validator.SetOldTable(*table)
newColumnID := len(table.Columns)
table.Columns = append(table.Columns, column)
if appendToArchivingSortOrder {
table.ArchivingSortColumns = append(table.ArchivingSortColumns, newColumnID)
}
validator.SetNewTable(*table)
err := validator.Validate()
if err != nil {
return err
}
if err := dm.writeSchemaFile(table); err != nil {
return utils.StackError(err, "Failed to write schema file, table: %s", table.Name)
}
// if enum column, append a enum case for default value
if column.DefaultValue != nil && column.IsEnumBasedColumn() {
return dm.writeEnumFile(table.Name, column.Name, []string{*column.DefaultValue})
}
return nil
}
func (dm *diskMetaStore) updateColumn(table *common.Table, columnName string, config common.ColumnConfig) (err error) {
for id, column := range table.Columns {
if column.Name == columnName {
if column.Deleted {
// continue looking since there could be reused column name
// with different column id.
continue
}
column.Config = config
table.Columns[id] = column
return dm.writeSchemaFile(table)
}
}
return common.ErrColumnDoesNotExist
}
func (dm *diskMetaStore) removeColumn(table *common.Table, columnName string) error {
for id, column := range table.Columns {
if column.Name == columnName {
if column.Deleted {
// continue looking since there could be reused column name
// with different column id
continue
}
// trying to delete timestamp column from fact table
if table.IsFactTable && id == 0 {
return common.ErrDeleteTimeColumn
}
if utils.IndexOfInt(table.PrimaryKeyColumns, id) >= 0 {
return common.ErrDeletePrimaryKeyColumn
}
column.Deleted = true
table.Columns[id] = column
if err := dm.writeSchemaFile(table); err != nil {
return err
}
if column.IsEnumBasedColumn() {
dm.removeEnumColumn(table.Name, column.Name)
}
return nil
}
}
return common.ErrColumnDoesNotExist
}
func (dm *diskMetaStore) getTableDirPath(tableName string) string {
return filepath.Join(dm.basePath, tableName)
}
func (dm *diskMetaStore) getEnumDirPath(tableName string) string {
return filepath.Join(dm.getTableDirPath(tableName), "enums")
}
func (dm *diskMetaStore) getEnumFilePath(tableName, columnName string) string {
return filepath.Join(dm.getEnumDirPath(tableName), columnName)
}
func (dm *diskMetaStore) getSchemaFilePath(tableName string) string {
return filepath.Join(dm.getTableDirPath(tableName), "schema")
}
func (dm *diskMetaStore) getShardsDirPath(tableName string) string {
return filepath.Join(dm.getTableDirPath(tableName), "shards")
}
func (dm *diskMetaStore) getShardDirPath(tableName string, shard int) string {
return filepath.Join(dm.getShardsDirPath(tableName), strconv.Itoa(shard))
}
func (dm *diskMetaStore) getShardVersionFilePath(tableName string, shard int) string {
return filepath.Join(dm.getShardDirPath(tableName, shard), "version")
}
func (dm *diskMetaStore) getArchiveBatchVersionFilePath(tableName string, shard, batchID int) string {
return filepath.Join(dm.getShardDirPath(tableName, shard), "batches", strconv.Itoa(batchID))
}
func (dm *diskMetaStore) getArchiveBatchDirPath(tableName string, shard int) string {
return filepath.Join(dm.getShardDirPath(tableName, shard), "batches")
}
func (dm *diskMetaStore) getRedoLogVersionAndOffsetFilePath(tableName string, shard int) string {
return filepath.Join(dm.getShardDirPath(tableName, shard), "redolog-offset")
}
func (dm *diskMetaStore) getSnapshotRedoLogVersionAndOffsetFilePath(tableName string, shard int) string {
return filepath.Join(dm.getShardDirPath(tableName, shard), "snapshot")
}
// Get file path which stores the ingestion commit offset, mainly used for kafka or other streaming based ingestion
func (dm *diskMetaStore) getIngestionCommitOffsetFilePath(tableName string, shard int) string {
return filepath.Join(dm.getShardDirPath(tableName, shard), "commit-offset")
}
// Get file path which stores the ingestion checkpoint offset, mainly used fo kafka or other streaming based ingestion
func (dm *diskMetaStore) getIngestionCheckpointOffsetFilePath(tableName string, shard int) string {
return filepath.Join(dm.getShardDirPath(tableName, shard), "checkpoint-offset")
}
// readEnumFile reads the enum cases from file.
func (dm *diskMetaStore) readEnumFile(tableName, columnName string) ([]string, error) {
enumBytes, err := dm.ReadFile(dm.getEnumFilePath(tableName, columnName))
if err != nil {
if os.IsNotExist(err) {
return []string{}, nil
}
return nil,
utils.StackError(err,
"Failed to read enum file, table: %s, column: %s",
tableName,
columnName,
)
}
return strings.Split(strings.TrimSuffix(string(enumBytes), common.EnumDelimiter), common.EnumDelimiter), nil
}
// writeEnumFile append enum cases to existing file
func (dm *diskMetaStore) writeEnumFile(tableName, columnName string, enumCases []string) error {
if len(enumCases) == 0 {
return nil
}
err := dm.MkdirAll(dm.getEnumDirPath(tableName), 0755)
if err != nil {
return utils.StackError(err, "Failed to create enums directory")
}
writer, err := dm.OpenFileForWrite(
dm.getEnumFilePath(tableName, columnName),
os.O_WRONLY|os.O_APPEND|os.O_CREATE,
0644,
)
if err != nil {
return utils.StackError(err, "Failed to open enum file, table: %s, column: %s", tableName, columnName)
}
defer writer.Close()
_, err = io.WriteString(writer, fmt.Sprintf("%s%s", strings.Join(enumCases, common.EnumDelimiter), common.EnumDelimiter))
if err != nil {
return utils.StackError(err, "Failed to write enum cases, table: %s, column: %s", tableName, columnName)
}
return nil
}
// readSchemaFile reads the schema file for given table.
func (dm *diskMetaStore) readSchemaFile(tableName string) (*common.Table, error) {
jsonBytes, err := dm.ReadFile(dm.getSchemaFilePath(tableName))
if err != nil {
return nil, utils.StackError(
err,
"Failed to read schema file, table: %s",
tableName,
)
}
var table common.Table
table.Config = DefaultTableConfig
err = json.Unmarshal(jsonBytes, &table)
if err != nil {
return nil, utils.StackError(
err,
"Failed to unmarshal table schema, table: %s",
tableName,
)
}
return &table, nil
}
// writeSchemaFile reads the schema file for given table.
func (dm *diskMetaStore) writeSchemaFile(table *common.Table) error {
tableSchemaBytes, err := json.MarshalIndent(table, "", " ")
if err != nil {
return utils.StackError(err, "Failed to marshal schema")
}
writer, err := dm.OpenFileForWrite(
dm.getSchemaFilePath(table.Name),
os.O_WRONLY|os.O_TRUNC|os.O_CREATE,
0644,
)
if err != nil {
return utils.StackError(
err,
"Failed to open schema file for write, table: %s",
table.Name,
)
}
defer writer.Close()
_, err = writer.Write(tableSchemaBytes)
return err
}
// readVersion reads the version from a given version file.
func (dm *diskMetaStore) readVersion(file string) (uint32, error) {
fileBytes, err := dm.ReadFile(file)
if os.IsNotExist(err) {
return 0, nil
}
if err != nil {
return 0, utils.StackError(err, "Failed to open version file %s", file)
}
var version uint32
_, err = fmt.Fscanln(bytes.NewBuffer(fileBytes), &version)
if err != nil {
return 0, utils.StackError(err, "Failed to read version file %s", file)
}
return version, nil
}
// readRedoLogFileAndOffset reads the redo log file and offset from the file.
func (dm *diskMetaStore) readRedoLogFileAndOffset(filePath string) (int64, uint32, error) {
bytes, err := dm.ReadFile(filePath)
if os.IsNotExist(err) {
return 0, 0, nil
} else if err != nil {
return 0, 0, utils.StackError(err, "Failed to read file:%s\n", filePath)
}
redoLogAndOffset := strings.Split(strings.TrimSuffix(string(bytes), "\n"), ",")
if len(redoLogAndOffset) < 2 {
return 0, 0, utils.StackError(nil, "Invalid redo log and offset file:%s:not enough strings\n", filePath)
}
var redoLogVersion int64
redoLogVersion, err = strconv.ParseInt(redoLogAndOffset[0], 10, 64)
if err != nil {
return 0, 0, utils.StackError(err, "Invalid redo log and offset file:%s:invalid redo log file\n", filePath)
}
offset, err := strconv.ParseUint(redoLogAndOffset[1], 10, 32)
if err != nil {
return 0, 0, utils.StackError(err, "Invalid redo log and offset file:%s:invalid offset\n", filePath)
}
return redoLogVersion, uint32(offset), nil
}
// readSnapshotRedoLogFileAndOffset reads the redo log file and offset from the file.
func (dm *diskMetaStore) readSnapshotRedoLogFileAndOffset(filePath string) (int64, uint32, int32, uint32, error) {
bytes, err := dm.ReadFile(filePath)
if os.IsNotExist(err) {
return 0, 0, 0, 0, nil
} else if err != nil {
return 0, 0, 0, 0, utils.StackError(err, "Failed to read file:%s\n", filePath)
}
snapshotInfo := strings.Split(strings.TrimSuffix(string(bytes), "\n"), ",")
if len(snapshotInfo) < 4 {
return 0, 0, 0, 0, utils.StackError(nil, "Invalid snapshot redolog file:%s:not enough strings\n", filePath)
}
var redoLogVersion int64
redoLogVersion, err = strconv.ParseInt(snapshotInfo[0], 10, 64)
if err != nil {
return 0, 0, 0, 0, utils.StackError(err, "Invalid sanshot redolog file:%s:invalid redo log file\n", filePath)
}
offset, err := strconv.ParseUint(snapshotInfo[1], 10, 32)
if err != nil {
return 0, 0, 0, 0, utils.StackError(err, "Invalid snapshot redolog file:%s:invalid offset\n", filePath)
}
batchID, err := strconv.ParseInt(snapshotInfo[2], 10, 64)
if err != nil {
return 0, 0, 0, 0, utils.StackError(err, "Invalid snapshot redolog file:%s:invalid batch id\n", filePath)
}
index, err := strconv.ParseUint(snapshotInfo[3], 10, 32)
if err != nil {
return 0, 0, 0, 0, utils.StackError(err, "Invalid snapshot redolog file:%s:invalid index\n", filePath)
}
return redoLogVersion, uint32(offset), int32(batchID), uint32(index), nil
}
// writeArchivingCutoff writes the version to a given file.
func (dm *diskMetaStore) writeArchivingCutoff(file string, version uint32) error {
if err := dm.MkdirAll(filepath.Dir(file), 0755); err != nil {
return utils.StackError(err, "Failed to create version directory")
}
writer, err := dm.OpenFileForWrite(
file,
os.O_CREATE|os.O_TRUNC|os.O_WRONLY,
0644,
)
if err != nil {
return utils.StackError(err, "Failed to open version file %s for write", file)
}
defer writer.Close()
_, err = io.WriteString(writer, fmt.Sprintf("%d", version))
return err
}
// writeRedoLogVersionAndOffset writes redolog&offset to a given file.
func (dm *diskMetaStore) writeRedoLogVersionAndOffset(file string, redoLogFile int64, upsertBatchOffset uint32) error {
if err := dm.MkdirAll(filepath.Dir(file), 0755); err != nil {
return utils.StackError(err, "Failed to create redo log version and upsert batch offset directory")
}
writer, err := dm.OpenFileForWrite(
file,
os.O_CREATE|os.O_TRUNC|os.O_WRONLY,
0644,
)
if err != nil {
return utils.StackError(err, "Failed to open redo log version and upsert batch offset file %s for write", file)
}
defer writer.Close()
_, err = io.WriteString(writer, fmt.Sprintf("%d,%d", redoLogFile, upsertBatchOffset))
return err
}
// writeSnapshotRedoLogVersionAndOffset writes redolog&offset and last record position to a given file.
func (dm *diskMetaStore) writeSnapshotRedoLogVersionAndOffset(file string, redoLogFile int64, upsertBatchOffset uint32, lastReadBatchID int32, lastReadBatchOffset uint32) error {
if err := dm.MkdirAll(filepath.Dir(file), 0755); err != nil {
return utils.StackError(err, "Failed to create snapshot redo log version and upsert batch offset directory")
}
writer, err := dm.OpenFileForWrite(
file,
os.O_CREATE|os.O_TRUNC|os.O_WRONLY,
0644,
)
if err != nil {
return utils.StackError(err, "Failed to open snapshot redo log version and upsert batch offset file %s for write", file)
}
defer writer.Close()
_, err = io.WriteString(writer, fmt.Sprintf("%d,%d,%d,%d", redoLogFile, upsertBatchOffset, lastReadBatchID, lastReadBatchOffset))
return err
}
// closeEnumWatcher try to close enum watcher and delete enum file
func (dm *diskMetaStore) removeEnumColumn(tableName, columnName string) {
if _, tableExist := dm.enumDictWatchers[tableName]; tableExist {
watcher, watcherExist := dm.enumDictWatchers[tableName][columnName]
if watcherExist {
doneChan, _ := dm.enumDictDone[tableName][columnName]
delete(dm.enumDictWatchers[tableName], columnName)
delete(dm.enumDictDone[tableName], columnName)
close(watcher)
// drain up done channel for enum column
// to make sure previous changes are processed
for range doneChan {
}
}
}
if err := dm.Remove(dm.getEnumFilePath(tableName, columnName)); err != nil {
//TODO: log an error and alert.
}
}
// tableExists checks whether table exists,
// return ErrTableDoesNotExist.
func (dm *diskMetaStore) tableExists(tableName string) error {
_, err := dm.Stat(dm.getSchemaFilePath(tableName))
if os.IsNotExist(err) {
return common.ErrTableDoesNotExist
} else if err != nil {
return utils.StackError(err, "Failed to read directory, table: %s", tableName)
}
return nil
}
func (dm *diskMetaStore) getColumnByName(tableName, columnName string) (*common.Column, error) {
if err := dm.tableExists(tableName); err != nil {
return nil, err
}
table, err := dm.readSchemaFile(tableName)
if err != nil {
return nil, err
}
for _, column := range table.Columns {
if column.Name == columnName {
if column.Deleted {
// continue since column name can be reused
// with different id
continue
}
if !column.IsEnumBasedColumn() {
return nil, common.ErrNotEnumColumn
}
return &column, nil
}
}
return nil, common.ErrColumnDoesNotExist
}
// enumColumnExists checks whether column exists and it is a enum column,
// return ErrTableDoesNotExist, ErrColumnDoesNotExist, ErrNotEnumColumn.
func (dm *diskMetaStore) enumColumnExists(tableName string, columnName string) error {
column, err := dm.getColumnByName(tableName, columnName)
if err != nil {
return err
}
if !column.IsEnumBasedColumn() {
return common.ErrNotEnumColumn
}
return nil
}
// NewDiskMetaStore creates a new disk based metastore
func NewDiskMetaStore(basePath string) (common.MetaStore, error) {
metaStore := &diskMetaStore{
FileSystem: utils.OSFileSystem{},
basePath: basePath,
writeLock: sync.Mutex{},
enumDictWatchers: make(map[string]map[string]chan<- string),
enumDictDone: make(map[string]map[string]<-chan struct{}),
}
err := metaStore.MkdirAll(basePath, 0755)
if err != nil {
return nil, utils.StackError(err, "Failed to make base directory for metastore, path: %s", basePath)
}
return metaStore, nil
}