diskstore/local_diskstore.go (338 lines of code) (raw):
// Copyright (c) 2017-2018 Uber Technologies, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package diskstore
import (
"io"
"io/ioutil"
"os"
"path/filepath"
"regexp"
"sort"
"strconv"
"strings"
"time"
"github.com/uber/aresdb/common"
"github.com/uber/aresdb/utils"
)
// LocalDiskStore is the implementation of Diskstore for local disk.
type LocalDiskStore struct {
rootPath string
diskStoreConfig common.DiskStoreConfig
}
// NewLocalDiskStore is used to init a LocalDiskStore with rootPath.
func NewLocalDiskStore(rootPath string) DiskStore {
return LocalDiskStore{
rootPath: rootPath,
diskStoreConfig: utils.GetConfig().DiskStore,
}
}
const timeFormatForBatchID = "2006-01-02"
// Table shard level operation
// DeleteTableShard : Completely wipe out a table shard.
func (l LocalDiskStore) DeleteTableShard(table string, shard int) error {
tableShardDir := getPathForTableShard(l.rootPath, table, shard)
return os.RemoveAll(tableShardDir)
}
// Redo Logs
// ListLogFiles : Returns the file creation unix time in second for each log file as a sorted slice.
func (l LocalDiskStore) ListLogFiles(table string, shard int) (creationUnixTime []int64, err error) {
tableRedologDir := GetPathForTableRedologs(l.rootPath, table, shard)
redologsFiles, err := ioutil.ReadDir(tableRedologDir)
// The redo log directory won't get created until the first append call.
if os.IsNotExist(err) {
err = nil
return
}
if err != nil {
err = utils.StackError(err, "Failed to list redolog file for redolog dir: %s", tableRedologDir)
return
}
for _, f := range redologsFiles {
matchedRedologFilePattern, _ := regexp.MatchString("([0-9]+).redolog", f.Name())
if matchedRedologFilePattern {
creationTime, err := strconv.ParseInt(strings.Split(f.Name(), ".")[0], 10, 64)
if err != nil {
// Failed to parse redolog file, will continue
utils.GetLogger().Debugf("Failed to parse redolog file: %s, will continue", f.Name())
continue
}
creationUnixTime = append(creationUnixTime, creationTime)
}
}
sort.Sort(utils.Int64Array(creationUnixTime))
return creationUnixTime, nil
}
// OpenLogFileForReplay : Opens the specified log file for replay.
func (l LocalDiskStore) OpenLogFileForReplay(table string, shard int,
creationTime int64) (utils.ReaderSeekerCloser, error) {
logFilePath := GetPathForRedologFile(l.rootPath, table, shard, creationTime)
f, err := os.OpenFile(logFilePath, os.O_RDONLY, 0644)
if err != nil {
return nil, utils.StackError(err, "Failed to open redolog file: %s for replay", logFilePath)
}
return f, nil
}
// OpenLogFileForAppend : Opens/creates the specified log file for append.
func (l LocalDiskStore) OpenLogFileForAppend(table string, shard int, creationTime int64) (utils.WriteSyncCloser, error) {
tableRedologDir := GetPathForTableRedologs(l.rootPath, table, shard)
if err := os.MkdirAll(tableRedologDir, 0755); err != nil {
return nil, utils.StackError(err, "Failed to make dirs for path: %s", tableRedologDir)
}
logFilePath := GetPathForRedologFile(l.rootPath, table, shard, creationTime)
mode := os.O_APPEND | os.O_CREATE | os.O_WRONLY
f, err := os.OpenFile(logFilePath, mode, 0644)
if err != nil {
return nil, utils.StackError(err, "Failed to open redolog file: %s for append", logFilePath)
}
return f, nil
}
// DeleteLogFile is used to delete a specified redolog.
func (l LocalDiskStore) DeleteLogFile(table string, shard int, creationTime int64) error {
redologFilePath := GetPathForRedologFile(l.rootPath, table, shard, creationTime)
err := os.Remove(redologFilePath)
if err != nil {
return utils.StackError(err, "Failed to delete redolog file: %s", redologFilePath)
}
utils.GetLogger().With("action", "deletelogfile", "table", table, "shard", shard).Infof("Delete redolog file: %s", redologFilePath)
return nil
}
// TruncateLogFile is used to truncate redolog to drop the last incomplete/corrupted upsert batch.
func (l LocalDiskStore) TruncateLogFile(table string, shard int, creationTime int64, offset int64) error {
redologFilePath := GetPathForRedologFile(l.rootPath, table, shard, creationTime)
err := os.Truncate(redologFilePath, offset)
return err
}
// Snapshot files.
// ListSnapshotBatches : Returns the batch directories at the specified version.
func (l LocalDiskStore) ListSnapshotBatches(table string, shard int,
redoLogFile int64, offset uint32) (batches []int, err error) {
snapshotPath := GetPathForTableSnapshotDirPath(l.rootPath, table, shard, redoLogFile, offset)
batchDirs, err := ioutil.ReadDir(snapshotPath)
// No batches for this snapshot
if os.IsNotExist(err) {
err = nil
return
}
if err != nil {
err = utils.StackError(err, "Failed to list batch dirs for snapshot dir: %s", snapshotPath)
return
}
for _, f := range batchDirs {
batch, err := strconv.ParseInt(f.Name(), 10, 32)
if err != nil {
utils.GetLogger().With("err", err, "batch_dir_name", f.Name()).
Debug("Find invalid snapshot batch dir")
err = nil
} else {
batches = append(batches, int(batch))
}
}
sort.Ints(batches)
return batches, nil
}
// ListSnapshotVectorPartyFiles : Returns the vector party files under specific batch directory.
func (l LocalDiskStore) ListSnapshotVectorPartyFiles(table string, shard int,
redoLogFile int64, offset uint32, batchID int) (columnIDs []int, err error) {
snapshotBatchDir := GetPathForTableSnapshotBatchDir(l.rootPath, table, shard,
redoLogFile, offset, batchID)
return l.readVectoryPartyFiles(snapshotBatchDir)
}
func (l LocalDiskStore) readVectoryPartyFiles(dir string) (columnIDs []int, err error) {
vpFiles, err := ioutil.ReadDir(dir)
if os.IsNotExist(err) {
err = nil
return
}
if err != nil {
err = utils.StackError(err, "Failed to list vp file for batch dir: %s", dir)
return
}
for _, f := range vpFiles {
matchedVectorPartyFilePattern, _ := regexp.MatchString("([0-9]+).data", f.Name())
if matchedVectorPartyFilePattern {
var columnID int64
columnID, err = strconv.ParseInt(strings.Split(f.Name(), ".")[0], 10, 32)
if err != nil {
err = utils.StackError(err, "Failed to parse file name: %s as "+
"valid vector party file name",
f.Name())
return
}
columnIDs = append(columnIDs, int(columnID))
} else {
err = utils.StackError(nil, "Failed to parse file name: %s as "+
"valid vector party file name",
f.Name())
return
}
}
sort.Ints(columnIDs)
return
}
// OpenSnapshotVectorPartyFileForRead : Opens the snapshot file for read at the specified version.
func (l LocalDiskStore) OpenSnapshotVectorPartyFileForRead(table string, shard int,
redoLogFile int64, offset uint32, batchID int, columnID int) (io.ReadCloser, error) {
snapshotFilePath := GetPathForTableSnapshotColumnFilePath(l.rootPath, table, shard, redoLogFile, offset, batchID, columnID)
f, err := os.OpenFile(snapshotFilePath, os.O_RDONLY, 0644)
if os.IsNotExist(err) {
return nil, os.ErrNotExist
} else if err != nil {
return nil, utils.StackError(err, "Failed to open snapshot file: %s for read", snapshotFilePath)
}
return f, nil
}
// OpenSnapshotVectorPartyFileForWrite : Creates/truncates the snapshot file for write at the specified version.
func (l LocalDiskStore) OpenSnapshotVectorPartyFileForWrite(table string, shard int,
redoLogFile int64, offset uint32, batchID int, columnID int) (utils.WriteSyncCloser, error) {
snapshotFilePath := GetPathForTableSnapshotColumnFilePath(l.rootPath, table, shard, redoLogFile, offset, batchID, columnID)
dir := filepath.Dir(snapshotFilePath)
if err := os.MkdirAll(dir, 0755); err != nil {
return nil, utils.StackError(err, "Failed to make dirs for path: %s", dir)
}
mode := os.O_CREATE | os.O_WRONLY
f, err := os.OpenFile(snapshotFilePath, mode, 0644)
os.Truncate(snapshotFilePath, 0)
if err != nil {
return nil, utils.StackError(err, "Failed to open snapshot file: %s for write", snapshotFilePath)
}
return f, nil
}
// DeleteSnapshot : Deletes snapshot directories **older than** the specified version (redolog file and offset).
func (l LocalDiskStore) DeleteSnapshot(table string, shard int, latestRedoLogFile int64, latestOffset uint32) error {
tableSnapshotDir := GetPathForTableSnapshotDir(l.rootPath, table, shard)
tableSnapshotFiles, err := ioutil.ReadDir(tableSnapshotDir)
if os.IsNotExist(err) {
return nil
}
if err != nil {
return utils.StackError(err, "Failed to list snapshot files for snapshot dir: %s", tableSnapshotDir)
}
for _, f := range tableSnapshotFiles {
matchedSnapshotDirPattern, _ := regexp.MatchString("([0-9]+)_([0-9]+)", f.Name())
if matchedSnapshotDirPattern {
comps := strings.Split(f.Name(), "_")
redoLogFile, err := strconv.ParseInt(comps[0], 10, 64)
if err != nil {
err = nil
// Failed to parse snapshot file name, will skip.
utils.GetLogger().Debugf("Failed to parse latestRedoLogFile from snapshot file: %s, will continue", f.Name())
continue
}
offset, err := strconv.ParseUint(comps[1], 10, 32)
if err != nil {
err = nil
// Failed to parse snapshot file name, will skip.
utils.GetLogger().Debugf("Failed to parse offset from snapshot file: %s, will continue", f.Name())
continue
}
if redoLogFile < latestRedoLogFile || (redoLogFile == latestRedoLogFile && uint32(offset) < latestOffset) {
snapshotToDeleteFilePath := GetPathForTableSnapshotDirPath(l.rootPath, table, shard, redoLogFile, uint32(offset))
utils.GetLogger().With(
"action", "delete_snapshot",
"redoLog", latestRedoLogFile,
"offset", latestOffset).Infof("delete snapshot: %s", snapshotToDeleteFilePath)
err := os.RemoveAll(snapshotToDeleteFilePath)
if err != nil {
return utils.StackError(err, "Failed to delete snapshot file: %s", f.Name())
}
}
}
}
return nil
}
// Archived vector party files.
// ListArchiveBatchVectorPartyFiles return all vp for one batch version/seq
func (l LocalDiskStore) ListArchiveBatchVectorPartyFiles(table string, shard, batchID int,
batchVersion uint32, seqNum uint32) ([]int, error) {
batchIDTimeStr := daysSinceEpochToTimeStr(batchID)
tableArchiveBatchDir := GetPathForTableArchiveBatchDir(l.rootPath, table, shard, batchIDTimeStr, batchVersion, seqNum)
return l.readVectoryPartyFiles(tableArchiveBatchDir)
}
// OpenVectorPartyFileForRead : Opens the vector party file at the specified batchVersion for read.
func (l LocalDiskStore) OpenVectorPartyFileForRead(table string, columnID int, shard, batchID int, batchVersion uint32,
seqNum uint32) (io.ReadCloser, error) {
batchIDTimeStr := daysSinceEpochToTimeStr(batchID)
vectorPartyFilePath := GetPathForTableArchiveBatchColumnFile(l.rootPath, table, shard, batchIDTimeStr, batchVersion,
seqNum, columnID)
f, err := os.OpenFile(vectorPartyFilePath, os.O_RDONLY, 0644)
if os.IsNotExist(err) {
return nil, os.ErrNotExist
} else if err != nil {
return nil, utils.StackError(err, "Failed to open vector party file: %s for read", vectorPartyFilePath)
}
return f, nil
}
// OpenVectorPartyFileForWrite : Creates/truncates the vector party file at the specified batchVersion for write.
func (l LocalDiskStore) OpenVectorPartyFileForWrite(table string, columnID int, shard, batchID int, batchVersion uint32,
seqNum uint32) (utils.WriteSyncCloser, error) {
batchIDTimeStr := daysSinceEpochToTimeStr(batchID)
batchDir := GetPathForTableArchiveBatchDir(l.rootPath, table, shard, batchIDTimeStr, batchVersion, seqNum)
if err := os.MkdirAll(batchDir, 0755); err != nil {
return nil, utils.StackError(err, "Failed to make dirs for path: %s", batchDir)
}
vectorPartyFilePath := GetPathForTableArchiveBatchColumnFile(l.rootPath, table, shard, batchIDTimeStr, batchVersion,
seqNum, columnID)
mode := os.O_CREATE | os.O_WRONLY
f, err := os.OpenFile(vectorPartyFilePath, mode, 0644)
if err != nil {
return nil, utils.StackError(err, "Failed to open vector party file: %s for write", vectorPartyFilePath)
}
return f, nil
}
// DeleteBatchVersions deletes all old batches with the specified batchID that have version lower than or equal to
// the specified batch version. All columns of those batches will be deleted.
func (l LocalDiskStore) DeleteBatchVersions(table string, shard, batchID int, batchVersion uint32, seqNum uint32) error {
batchIDTimeStr := daysSinceEpochToTimeStr(batchID)
archiveBatchRootDir := GetPathForTableArchiveBatchRootDir(l.rootPath, table, shard)
oldBatchDirPaths, _ := filepath.Glob(filepath.Join(archiveBatchRootDir, batchIDTimeStr) + "_*")
for _, oldBatchDirPath := range oldBatchDirPaths {
oldBatchInfoStr := filepath.Base(oldBatchDirPath)
_, oldBatchVersion, oldSeqNum, _ := ParseBatchIDAndVersionName(oldBatchInfoStr)
if oldBatchVersion < batchVersion || (oldBatchVersion == batchVersion && oldSeqNum <= seqNum) {
err := os.RemoveAll(oldBatchDirPath)
if err != nil {
return utils.StackError(err, "Failed to delete batch directory: %s", oldBatchDirPath)
}
}
}
return nil
}
// DeleteBatches : Deletes all batches within [batchIDStart, batchIDEnd)
func (l LocalDiskStore) DeleteBatches(table string, shard, batchIDStart, batchIDEnd int) (int, error) {
batchIDStartTime := daysSinceEpochToTime(batchIDStart)
batchIDEndTime := daysSinceEpochToTime(batchIDEnd)
tableArchiveBatchRootDir := GetPathForTableArchiveBatchRootDir(l.rootPath, table, shard)
tableArchiveBatchDirs, err := ioutil.ReadDir(tableArchiveBatchRootDir)
if err != nil {
if os.IsNotExist(err) {
return 0, nil
}
return 0, utils.StackError(err, "Failed to list archive batches from table archive batch root dir: %s",
tableArchiveBatchRootDir)
}
numBatches := 0
for _, f := range tableArchiveBatchDirs {
batchID, batchVersion, seqNum, _ := ParseBatchIDAndVersionName(f.Name())
batchIDTime, err := time.Parse(timeFormatForBatchID, batchID)
if err != nil {
utils.GetLogger().Debugf("Failed to parse batchID: %s to yyyy-MM-dd format time", batchID)
continue
}
batchIDTime = batchIDTime.UTC()
if !batchIDTime.Before(batchIDStartTime) && batchIDTime.Before(batchIDEndTime) {
archiveBatchDir := GetPathForTableArchiveBatchDir(l.rootPath, table, shard, batchID, batchVersion, seqNum)
err := os.RemoveAll(archiveBatchDir)
if err != nil {
utils.GetLogger().Debugf("Failed to delete archive batch dir: %s", archiveBatchDir)
} else {
numBatches++
}
}
}
return numBatches, nil
}
// DeleteColumn : Deletes all batches of the specified column.
func (l LocalDiskStore) DeleteColumn(table string, columnID int, shard int) error {
tableArchiveBatchRootDir := GetPathForTableArchiveBatchRootDir(l.rootPath, table, shard)
tableArchiveBatchDirs, err := ioutil.ReadDir(tableArchiveBatchRootDir)
if err != nil {
if os.IsNotExist(err) {
return nil
}
return utils.StackError(err, "Failed to list archive batches from table archive batch root dir: %s",
tableArchiveBatchRootDir)
}
for _, f := range tableArchiveBatchDirs {
if f.IsDir() {
if batchID, batchVersion, seqNum, err := ParseBatchIDAndVersionName(f.Name()); err == nil {
vectorPartyFilePath := GetPathForTableArchiveBatchColumnFile(l.rootPath, table, shard, batchID,
batchVersion, seqNum, columnID)
if err = os.Remove(vectorPartyFilePath); err != nil && !os.IsNotExist(err) {
utils.GetLogger().With(
"vectorPartyFilePath", vectorPartyFilePath,
"err", err,
).Warn("Failed to delete a vector party file")
continue
}
}
}
}
return nil
}
func daysSinceEpochToTime(daysSinceEpoch int) time.Time {
secondsSinceEpoch := int64(daysSinceEpoch) * 86400
timeObj := time.Unix(secondsSinceEpoch, 0).UTC()
return timeObj
}
func daysSinceEpochToTimeStr(daysSinceEpoch int) string {
return daysSinceEpochToTime(daysSinceEpoch).Format(timeFormatForBatchID)
}