redolog/file_redolog_manager.go (291 lines of code) (raw):
// Copyright (c) 2017-2018 Uber Technologies, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package redolog
import (
"encoding/json"
"github.com/uber/aresdb/diskstore"
"github.com/uber/aresdb/memstore/common"
"github.com/uber/aresdb/utils"
"io"
"sync"
)
// UpsertHeader is the magic header written into the beginning of each redo log file.
// It is verified during replay; a mismatched header indicates a corrupted file and
// crashes the server.
const UpsertHeader uint32 = 0xADDAFEED
// FileRedoLogManager manages redo log file append, rotation, and purge. It is used by
// ingestion, recovery and archiving. Accessor must hold the TableShard.WriterLock to access it.
type FileRedoLogManager struct {
	// The lock is to protect MaxEventTimePerFile.
	sync.RWMutex `json:"-"`

	// The time interval of redo file rotations, in seconds (compared against Unix timestamps).
	RotationInterval int64 `json:"rotationInterval"`

	// The limit of redo file size to trigger rotations.
	MaxRedoLogSize int64 `json:"maxRedoLogSize"`

	// Current redo log size in bytes, including the 4-byte magic header.
	CurrentRedoLogSize uint32 `json:"currentRedoLogSize"`

	// Size in bytes of all redologs (batch payloads plus size prefixes).
	TotalRedoLogSize uint `json:"totalRedologSize"`

	// The map with redo log creation time as the key and max event time as the value. Readers
	// need to hold the reader lock in accessing the field.
	MaxEventTimePerFile map[int64]uint32 `json:"maxEventTimePerFile"`

	// redo log creation time -> batch count mapping.
	// Readers need to hold the reader lock in accessing the field.
	BatchCountPerFile map[int64]uint32 `json:"batchCountPerFile"`

	// redo log creation time -> file size in bytes (excluding the magic header).
	SizePerFile map[int64]uint32 `json:"sizePerFile"`

	// Current log file points to the current redo log file used for appending new upsert batches.
	currentLogFile utils.WriteSyncCloser

	// Current file creation time in milliseconds.
	CurrentFileCreationTime int64 `json:"currentFileCreationTime"`

	// Pointer to the disk store for redo log access.
	diskStore diskstore.DiskStore

	// Name of the table.
	tableName string

	// The shard id of the table.
	shard int

	// Used for external blocking check if recovery done; buffered (size 1) so the
	// recovery goroutine can signal completion without a waiting reader.
	recoveryChan chan bool
	recoveryDone bool

	// Number of upsert batches replayed from redo logs during recovery.
	batchRecovered int
}
// newFileRedoLogManager creates a new FileRedoLogManager instance for the given
// table shard, backed by diskStore, rotating files every rotationInterval seconds
// or once a file reaches maxRedoLogSize bytes.
func newFileRedoLogManager(rotationInterval int64, maxRedoLogSize int64, diskStore diskstore.DiskStore, tableName string, shard int) *FileRedoLogManager {
	mgr := &FileRedoLogManager{
		diskStore:        diskStore,
		tableName:        tableName,
		shard:            shard,
		RotationInterval: rotationInterval,
		MaxRedoLogSize:   maxRedoLogSize,
	}
	mgr.MaxEventTimePerFile = make(map[int64]uint32)
	mgr.BatchCountPerFile = make(map[int64]uint32)
	mgr.SizePerFile = make(map[int64]uint32)
	// Buffered so recovery completion can be signaled without a blocked receiver.
	mgr.recoveryChan = make(chan bool, 1)
	return mgr
}
// openFileForWrite handles redo log file opening and rotation (if needed). It guarantees the
// validity of the currentLogFile upon return. A new file is created when there is no current
// file, when RotationInterval seconds have passed since the current file's creation, or when
// appending upsertBatchSize bytes (plus the 4-byte size prefix) would reach MaxRedoLogSize.
// Any disk error triggers a panic.
func (r *FileRedoLogManager) openFileForWrite(upsertBatchSize uint32) {
	dataTime := utils.Now().Unix()

	// If current file is still valid we just return the writer back.
	if r.currentLogFile != nil && dataTime < r.CurrentFileCreationTime+r.RotationInterval &&
		int64(r.CurrentRedoLogSize+upsertBatchSize+4) < r.MaxRedoLogSize {
		return
	}

	var err error
	if r.currentLogFile != nil {
		if err = r.currentLogFile.Close(); err != nil {
			utils.GetLogger().Panic("Failed to close current redo log file")
		}
	}

	// The new file is keyed by its creation time (Unix seconds).
	if r.currentLogFile, err = r.diskStore.OpenLogFileForAppend(r.tableName, r.shard, dataTime); err != nil {
		utils.GetLogger().With(
			"table", r.tableName,
			"shard", r.shard,
			"error", err.Error()).Panic("Failed to open new redo log file")
	}

	writer := utils.NewStreamDataWriter(r.currentLogFile)
	if err = writer.WriteUint32(UpsertHeader); err != nil {
		utils.GetLogger().Panic("Failed to write magic header to the new redo log")
	}

	// sync file after writing upsert header
	if err = r.currentLogFile.Sync(); err != nil {
		utils.GetLogger().Panic("Failed to sync redolog")
	}

	r.CurrentFileCreationTime = dataTime
	utils.GetReporter(r.tableName, r.shard).GetGauge(utils.CurrentRedologCreationTime).Update(float64(r.CurrentFileCreationTime))

	// Reset per-file bookkeeping for the new file; SizePerFile excludes the magic header.
	r.MaxEventTimePerFile[r.CurrentFileCreationTime] = 0
	r.SizePerFile[r.CurrentFileCreationTime] = 0
	utils.GetReporter(r.tableName, r.shard).GetGauge(utils.NumberOfRedologs).Update(float64(len(r.SizePerFile)))

	// The new file already contains the 4-byte magic header.
	r.CurrentRedoLogSize = 4
}
// IsAppendEnabled returns whether appending is enabled.
// The file-based redo log manager always accepts appends.
func (r *FileRedoLogManager) IsAppendEnabled() bool {
	return true
}
// AppendToRedoLog saves an upsert batch into disk before applying it. Any errors from diskStore
// will trigger system panic. It returns the creation time of the redo log file the batch was
// written to, and the zero-based index of the batch within that file.
func (r *FileRedoLogManager) AppendToRedoLog(upsertBatch *common.UpsertBatch) (int64, uint32) {
	buffer := upsertBatch.GetBuffer()
	// Each batch occupies its payload bytes plus a 4-byte size prefix on disk.
	batchBytes := uint32(len(buffer)) + 4

	// Rotate to a new file first if this batch would not fit in the current one.
	r.openFileForWrite(uint32(len(buffer)))

	writer := utils.NewStreamDataWriter(r.currentLogFile)
	// Write buffer size.
	if err := writer.WriteUint32(uint32(len(buffer))); err != nil {
		utils.GetLogger().With("error", err).Panic("Failed to write buffer size into the redo log")
	}

	if _, err := r.currentLogFile.Write(buffer); err != nil {
		utils.GetLogger().With("error", err).Panic("Failed to write upsert buffer into the redo log")
	}

	// sync after upsert batch appended to redolog
	if err := r.currentLogFile.Sync(); err != nil {
		utils.GetLogger().With("error", err).Panic("Failed to sync write to redo log")
	}

	// update current redo log size
	r.CurrentRedoLogSize += batchBytes
	r.SizePerFile[r.CurrentFileCreationTime] += batchBytes
	r.TotalRedoLogSize += uint(batchBytes)
	utils.GetReporter(r.tableName, r.shard).GetGauge(utils.CurrentRedologSize).Update(float64(r.CurrentRedoLogSize))
	utils.GetReporter(r.tableName, r.shard).GetGauge(utils.SizeOfRedologs).Update(float64(r.TotalRedoLogSize))

	// Update offset of the last batch for the current redolog
	return r.CurrentFileCreationTime, r.updateBatchCount(r.CurrentFileCreationTime) - 1
}
// UpdateMaxEventTime updates the max event time of the given redo log file, keeping the
// larger of the recorded and supplied values (or recording eventTime if the file has no
// entry yet). redoFile is the key to the corresponding redo file that needs to have the
// maxEventTime updated. redoFile == 0 is used in serving ingestion requests where the
// current file's max event time is updated. redoFile != 0 is used in recovery where the
// redo log file loaded from disk needs to get its max event time calculated.
func (r *FileRedoLogManager) UpdateMaxEventTime(eventTime uint32, redoFile int64) {
	r.Lock()
	defer r.Unlock()

	recorded, exists := r.MaxEventTimePerFile[redoFile]
	if !exists || eventTime > recorded {
		r.MaxEventTimePerFile[redoFile] = eventTime
	}
}
// closeRedoLogFile closes the redo log file currently being replayed and advances the
// iteration state: *currentFile is reset to nil and *currentIndex is incremented in place.
// creationTime identifies the file being closed. When needToTruncate is set (a corrupted
// batch was detected), the file is truncated to offset bytes so replay can resume cleanly
// past the bad data, and the corruption counter is bumped. Close/truncate failures panic.
func (r *FileRedoLogManager) closeRedoLogFile(creationTime int64, offset uint32, currentFile *io.ReadCloser,
	currentIndex *int, needToTruncate bool) {
	// End of file encountered. Move to next file.
	if err := (*currentFile).Close(); err != nil {
		utils.GetLogger().With(
			"table", r.tableName,
			"shard", r.shard,
			"err", err,
			"file", creationTime).Panic("Failed to close redo log file")
	}
	*currentFile = nil
	*currentIndex++

	if needToTruncate {
		utils.GetLogger().Error("Corrupted file found, truncating it to resume processing")
		// truncate current file and move to next file.
		if err := r.diskStore.TruncateLogFile(r.tableName, r.shard, creationTime, int64(offset)); err != nil {
			utils.GetLogger().With(
				"table", r.tableName,
				"shard", r.shard,
				"err", err,
				"file", creationTime).Panic("Failed to truncate redo log file")
		}
		utils.GetReporter(r.tableName, r.shard).GetCounter(utils.RedoLogFileCorrupt).Inc(1)
	}
}
// Iterator returns a functor that can be used to iterate over redo logs on disk and returns
// one UpsertBatch at each call. It returns nil to indicate the end of the upsert batch stream.
//
// Any failure in file reading and upsert batch creation will trigger system panic.
func (r *FileRedoLogManager) Iterator() (NextUpsertFunc, error) {
files, err := r.diskStore.ListLogFiles(r.tableName, r.shard)
if err != nil {
utils.GetLogger().Panic("Failed to list redo log files", err)
}
utils.GetLogger().With(
"table", r.tableName,
"shard", r.shard, "action", "recover",
).Infof("Start replaying local redolog files")
currentIndex := 0
var currentReader utils.StreamDataReader
var currentFile io.ReadCloser
var offset uint32
return func() *NextUpsertBatchInfo {
for {
// Open the next redo file.
if currentFile == nil {
// End of file list, done.
if currentIndex >= len(files) {
r.setRecoveryDone()
return nil
}
key := files[currentIndex]
utils.GetLogger().With(
"table", r.tableName,
"shard", r.shard,
).Infof("Start replaying redo log file %d [%d/%d]", key, currentIndex+1, len(files))
currentFile, err = r.diskStore.OpenLogFileForReplay(r.tableName, r.shard, key)
if err != nil {
utils.GetLogger().Panicf("Failed to open redo log file %v for replay", key)
}
currentReader = utils.NewStreamDataReader(currentFile)
// Read magic header. If magic number mismatches, this means the whole redolog file is corrupted.
// We should immediately crash the server and let engineer to handle this.
var header uint32
if header, err = currentReader.ReadUint32(); err != nil {
utils.GetLogger().Panicf("Failed to read magic header for redo log file %v", key)
}
if header != UpsertHeader {
utils.GetLogger().Panicf("Invalid header %#x for redo log file %v", header, key)
}
offset = 4
}
// All later errors are recoverable and should be solved by truncate the redo log file.
// Try to read the next batch in the file.
size, err := currentReader.ReadUint32()
if err == io.EOF {
r.closeRedoLogFile(files[currentIndex], offset, ¤tFile, ¤tIndex, false)
} else if err != nil {
utils.GetLogger().Errorf("Failed to read size info of the next upsert batch %v",
err)
r.closeRedoLogFile(files[currentIndex], offset, ¤tFile, ¤tIndex, true)
} else {
offset += 4
// Found an upsert batch to read.
buffer := make([]byte, size)
if err := currentReader.Read(buffer); err != nil {
utils.GetLogger().Errorf(
"Failed to read upsert batch of size %v from file %v at offset %v for table %v shard %v",
size, files[currentIndex], offset, r.tableName, r.shard)
r.closeRedoLogFile(files[currentIndex], offset-4, ¤tFile, ¤tIndex, true)
} else {
offset += size
upsertBatch, err := common.NewUpsertBatch(buffer)
if err != nil {
utils.GetLogger().Errorf(
"Failed to create upsert batch from buffer of size %v from file %v at offset %v for table %v shard %v",
size, files[currentIndex], offset, r.tableName, r.shard)
r.closeRedoLogFile(files[currentIndex], offset-4-size, ¤tFile, ¤tIndex, true)
} else {
// update total redolog size
r.TotalRedoLogSize += uint(size + 4)
// increment size per file
r.SizePerFile[files[currentIndex]] += size + 4
r.batchRecovered++
// update lastBatchOffset for the current redo log file
return &NextUpsertBatchInfo{
Batch: upsertBatch,
RedoLogFile: files[currentIndex],
BatchOffset: r.updateBatchCount(files[currentIndex]) - 1,
Recovery: true,
}
}
}
}
}
}, nil
}
// setRecoveryDone marks recovery as finished: it signals recoveryChan (buffered, size 1,
// so the send does not block even with no waiting receiver) and sets the recoveryDone flag,
// both under the writer lock.
func (r *FileRedoLogManager) setRecoveryDone() {
	r.Lock()
	defer r.Unlock()

	r.recoveryChan <- true
	r.recoveryDone = true
	utils.GetLogger().With("action", "recover", "table", r.tableName, "shard", r.shard,
		"batchRecovered", r.batchRecovered).Info("Finished recovery from local redolog files")
}
// updateBatchCount saves/updates batch counts for the given redolog and returns the new count.
//
// Fixed: the original took only the read lock (RLock) while writing to BatchCountPerFile —
// a concurrent map write / data race under Go's memory model. Map mutation requires the
// writer lock.
func (r *FileRedoLogManager) updateBatchCount(redoFile int64) uint32 {
	r.Lock()
	defer r.Unlock()

	// Incrementing a missing key starts from the zero value, so no existence check is needed.
	r.BatchCountPerFile[redoFile]++
	return r.BatchCountPerFile[redoFile]
}
// getRedoLogFilesToPurge returns the creation times of all redo log files whose max event
// time is below cutoff and that are fully checkpointed (either strictly older than
// redoFileCheckpointed, or equal to it with all batches up to batchOffset persisted),
// making them eligible for purging. The file currently used by ingestion is always excluded.
func (r *FileRedoLogManager) getRedoLogFilesToPurge(cutoff uint32, redoFileCheckpointed int64, batchOffset uint32) []int64 {
	r.RLock()
	defer r.RUnlock()

	var eligible []int64
	for creationTime, maxEventTime := range r.MaxEventTimePerFile {
		// Skip the current redo file since it's used by ingestion, and any file whose
		// events are not all older than the cutoff.
		inUseByIngestion := r.CurrentFileCreationTime != 0 && creationTime >= r.CurrentFileCreationTime
		if inUseByIngestion || maxEventTime >= cutoff {
			continue
		}
		fullyCheckpointed := creationTime < redoFileCheckpointed ||
			(creationTime == redoFileCheckpointed && r.BatchCountPerFile[redoFileCheckpointed] == batchOffset+1)
		if fullyCheckpointed {
			eligible = append(eligible, creationTime)
		}
	}
	return eligible
}
// evictRedoLogData removes the in-memory bookkeeping for a redo log file that has already
// been purged from disk, and refreshes the redolog count and size gauges.
func (r *FileRedoLogManager) evictRedoLogData(creationTime int64) {
	r.Lock()
	defer r.Unlock()

	// Subtract this file's contribution before dropping its size entry.
	r.TotalRedoLogSize -= uint(r.SizePerFile[creationTime])
	delete(r.MaxEventTimePerFile, creationTime)
	delete(r.BatchCountPerFile, creationTime)
	delete(r.SizePerFile, creationTime)

	reporter := utils.GetReporter(r.tableName, r.shard)
	reporter.GetGauge(utils.NumberOfRedologs).Update(float64(len(r.SizePerFile)))
	reporter.GetGauge(utils.SizeOfRedologs).Update(float64(r.TotalRedoLogSize))
}
// CheckpointRedolog purges disk files and in-memory data of redologs that are eligible to
// be purged. It returns the first error encountered while deleting a file from disk;
// files already deleted before the error remain evicted from memory.
func (r *FileRedoLogManager) CheckpointRedolog(cutoff uint32, redoFileCheckpointed int64, batchOffset uint32) error {
	purgeable := r.getRedoLogFilesToPurge(cutoff, redoFileCheckpointed, batchOffset)

	utils.GetLogger().With("action", "purgeRedoLog", "table", r.tableName, "shard", r.shard, "cutoff", cutoff,
		"redologfile", redoFileCheckpointed, "batchoffset", batchOffset, "files", len(purgeable)).Info("CheckpointRedolog")

	for _, creationTime := range purgeable {
		err := r.diskStore.DeleteLogFile(r.tableName, r.shard, creationTime)
		if err != nil {
			return err
		}
		// Only drop in-memory state once the file is gone from disk.
		r.evictRedoLogData(creationTime)
	}
	return nil
}
// WaitForRecoveryDone blocks until recovery (redo log replay) has finished,
// i.e. until setRecoveryDone signals recoveryChan.
func (r *FileRedoLogManager) WaitForRecoveryDone() {
	<-r.recoveryChan
}
// GetTotalSize returns the total size in bytes of all redo logs tracked by this manager.
// (The original comment here was a copy-paste of CheckpointRedolog's.)
func (r *FileRedoLogManager) GetTotalSize() int {
	r.RLock()
	defer r.RUnlock()
	return int(r.TotalRedoLogSize)
}
// GetNumFiles returns the number of redo log files currently tracked by this manager.
func (r *FileRedoLogManager) GetNumFiles() int {
	r.RLock()
	defer r.RUnlock()
	return len(r.SizePerFile)
}
// GetBatchReceived returns the number of batches received.
// The file-based manager does not track received batches, so this is always 0.
func (r *FileRedoLogManager) GetBatchReceived() int {
	return 0
}
// GetBatchRecovered returns the number of upsert batches replayed from redo logs
// during recovery.
// NOTE(review): batchRecovered is read here without the lock while the Iterator closure
// increments it; presumably callers only read this after recovery completes — confirm.
func (r *FileRedoLogManager) GetBatchRecovered() int {
	return r.batchRecovered
}
// Close closes the current log file, if one is open. Any close error is ignored.
func (r *FileRedoLogManager) Close() {
	if r.currentLogFile != nil {
		r.currentLogFile.Close()
	}
}
// MarshalJSON marshals a FileRedoLogManager into json under the reader lock.
func (r *FileRedoLogManager) MarshalJSON() ([]byte, error) {
	// Marshal via a method-less alias type; marshaling *FileRedoLogManager directly
	// would recursively invoke this MarshalJSON and loop forever.
	type alias FileRedoLogManager
	r.RLock()
	defer r.RUnlock()
	return json.Marshal((*alias)(r))
}