redolog/file_redolog_manager.go (291 lines of code) (raw):

// Copyright (c) 2017-2018 Uber Technologies, Inc. // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. // You may obtain a copy of the License at // // http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, software // distributed under the License is distributed on an "AS IS" BASIS, // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. // See the License for the specific language governing permissions and // limitations under the License. package redolog import ( "encoding/json" "github.com/uber/aresdb/diskstore" "github.com/uber/aresdb/memstore/common" "github.com/uber/aresdb/utils" "io" "sync" ) // UpsertHeader is the magic header written into the beginning of each redo log file. const UpsertHeader uint32 = 0xADDAFEED // fileRedologManager manages the redo log file append, rotation, purge. It is used by ingestion, // recovery and archiving. Accessor must hold the TableShard.WriterLock to access it. type FileRedoLogManager struct { // The lock is to protect MaxEventTimePerFile. sync.RWMutex `json:"-"` // The time interval of redo file rotations. RotationInterval int64 `json:"rotationInterval"` // The limit of redo file size to trigger rotations. MaxRedoLogSize int64 `json:"maxRedoLogSize"` // Current redo log size CurrentRedoLogSize uint32 `json:"currentRedoLogSize"` // size of all redologs TotalRedoLogSize uint `json:"totalRedologSize"` // The map with redo log creation time as the key and max event time as the value. Readers // need to hold the reader lock in accessing the field. MaxEventTimePerFile map[int64]uint32 `json:"maxEventTimePerFile"` // redo log creation time -> batch count mapping. // Readers need to hold the reader lock in accessing the field. BatchCountPerFile map[int64]uint32 `json:"batchCountPerFile"` // SizePerFile SizePerFile map[int64]uint32 `json:"sizePerFile"` // Current log file points to the current redo log file used for appending new upsert batches. currentLogFile utils.WriteSyncCloser // Current file creation time in milliseconds. CurrentFileCreationTime int64 `json:"currentFileCreationTime"` // Pointer to the disk store for redo log access. diskStore diskstore.DiskStore // Name of the table. tableName string // The shard id of the table. shard int // used for external blocking check if recovery done recoveryChan chan bool recoveryDone bool // batch recovered counts batchRecovered int } // newFileRedoLogManager creates a new fileRedologManager instance. func newFileRedoLogManager(rotationInterval int64, maxRedoLogSize int64, diskStore diskstore.DiskStore, tableName string, shard int) *FileRedoLogManager { return &FileRedoLogManager{ RotationInterval: rotationInterval, MaxEventTimePerFile: make(map[int64]uint32), BatchCountPerFile: make(map[int64]uint32), SizePerFile: make(map[int64]uint32), diskStore: diskStore, tableName: tableName, shard: shard, MaxRedoLogSize: maxRedoLogSize, CurrentRedoLogSize: 0, recoveryChan: make(chan bool, 1), } } // openFileForWrite handles redo log file opening and rotation (if needed). It guarantees the // validity of the currentLogFile upon return. func (r *FileRedoLogManager) openFileForWrite(upsertBatchSize uint32) { dataTime := utils.Now().Unix() // If current file is still valid we just return the writer back. if r.currentLogFile != nil && dataTime < r.CurrentFileCreationTime+r.RotationInterval && int64(r.CurrentRedoLogSize+upsertBatchSize+4) < r.MaxRedoLogSize { return } var err error if r.currentLogFile != nil { if err = r.currentLogFile.Close(); err != nil { utils.GetLogger().Panic("Failed to close current redo log file") } } if r.currentLogFile, err = r.diskStore.OpenLogFileForAppend(r.tableName, r.shard, dataTime); err != nil { utils.GetLogger().With( "table", r.tableName, "shard", r.shard, "error", err.Error()).Panic("Failed to open new redo log file") } writer := utils.NewStreamDataWriter(r.currentLogFile) if err = writer.WriteUint32(UpsertHeader); err != nil { utils.GetLogger().Panic("Failed to write magic header to the new redo log") } // sync file after writing upsert header if err = r.currentLogFile.Sync(); err != nil { utils.GetLogger().Panic("Failed to sync redolog") } r.CurrentFileCreationTime = dataTime utils.GetReporter(r.tableName, r.shard).GetGauge(utils.CurrentRedologCreationTime).Update(float64(r.CurrentFileCreationTime)) r.MaxEventTimePerFile[r.CurrentFileCreationTime] = 0 r.SizePerFile[r.CurrentFileCreationTime] = 0 utils.GetReporter(r.tableName, r.shard).GetGauge(utils.NumberOfRedologs).Update(float64(len(r.SizePerFile))) r.CurrentRedoLogSize = 4 } // IsAppendEnabled returns whether appending is enabled func (r *FileRedoLogManager) IsAppendEnabled() bool { return true } // AppendToRedoLog saves an upsert batch into disk before applying it. Any errors from diskStore // will trigger system panic. func (r *FileRedoLogManager) AppendToRedoLog(upsertBatch *common.UpsertBatch) (int64, uint32) { r.openFileForWrite(uint32(len(upsertBatch.GetBuffer()))) buffer := upsertBatch.GetBuffer() writer := utils.NewStreamDataWriter(r.currentLogFile) // Write buffer size. if err := writer.WriteUint32(uint32(len(buffer))); err != nil { utils.GetLogger().With("error", err).Panic("Failed to write buffer size into the redo log") } if _, err := r.currentLogFile.Write(buffer); err != nil { utils.GetLogger().With("error", err).Panic("Failed to write upsert buffer into the redo log") } // sync after upsert batch appended to redolog if err := r.currentLogFile.Sync(); err != nil { utils.GetLogger().With("error", err).Panic("Failed to sync write to redo log") } // update current redo log size r.CurrentRedoLogSize += uint32(len(upsertBatch.GetBuffer())) + 4 r.SizePerFile[r.CurrentFileCreationTime] += uint32(len(upsertBatch.GetBuffer())) + 4 r.TotalRedoLogSize += uint(len(upsertBatch.GetBuffer())) + 4 utils.GetReporter(r.tableName, r.shard).GetGauge(utils.CurrentRedologSize).Update(float64(r.CurrentRedoLogSize)) utils.GetReporter(r.tableName, r.shard).GetGauge(utils.SizeOfRedologs).Update(float64(r.TotalRedoLogSize)) // Update offset of the last batch for the current redolog return r.CurrentFileCreationTime, r.updateBatchCount(r.CurrentFileCreationTime) - 1 } // UpdateMaxEventTime updates the max event time of the current redo log file. // redoFile is the key to the corresponding redo file that needs to have the maxEventTime updated. // redoFile == 0 is used in serving ingestion requests where the current file's max event time is // updated. redoFile != 0 is used in recovery where the redo log file loaded from disk needs to // get its max event time calculated. func (r *FileRedoLogManager) UpdateMaxEventTime(eventTime uint32, redoFile int64) { r.Lock() defer r.Unlock() if _, ok := r.MaxEventTimePerFile[redoFile]; ok && eventTime <= r.MaxEventTimePerFile[redoFile] { return } r.MaxEventTimePerFile[redoFile] = eventTime } func (r *FileRedoLogManager) closeRedoLogFile(creationTime int64, offset uint32, currentFile *io.ReadCloser, currentIndex *int, needToTruncate bool) { // End of file encountered. Move to next file. if err := (*currentFile).Close(); err != nil { utils.GetLogger().With( "table", r.tableName, "shard", r.shard, "err", err, "file", creationTime).Panic("Failed to close redo log file") } *currentFile = nil *currentIndex++ if needToTruncate { utils.GetLogger().Error("Corrupted file found, truncating it to resume processing") // truncate current file and move to next file. if err := r.diskStore.TruncateLogFile(r.tableName, r.shard, creationTime, int64(offset)); err != nil { utils.GetLogger().With( "table", r.tableName, "shard", r.shard, "err", err, "file", creationTime).Panic("Failed to truncate redo log file") } utils.GetReporter(r.tableName, r.shard).GetCounter(utils.RedoLogFileCorrupt).Inc(1) } } // Iterator returns a functor that can be used to iterate over redo logs on disk and returns // one UpsertBatch at each call. It returns nil to indicate the end of the upsert batch stream. // // Any failure in file reading and upsert batch creation will trigger system panic. func (r *FileRedoLogManager) Iterator() (NextUpsertFunc, error) { files, err := r.diskStore.ListLogFiles(r.tableName, r.shard) if err != nil { utils.GetLogger().Panic("Failed to list redo log files", err) } utils.GetLogger().With( "table", r.tableName, "shard", r.shard, "action", "recover", ).Infof("Start replaying local redolog files") currentIndex := 0 var currentReader utils.StreamDataReader var currentFile io.ReadCloser var offset uint32 return func() *NextUpsertBatchInfo { for { // Open the next redo file. if currentFile == nil { // End of file list, done. if currentIndex >= len(files) { r.setRecoveryDone() return nil } key := files[currentIndex] utils.GetLogger().With( "table", r.tableName, "shard", r.shard, ).Infof("Start replaying redo log file %d [%d/%d]", key, currentIndex+1, len(files)) currentFile, err = r.diskStore.OpenLogFileForReplay(r.tableName, r.shard, key) if err != nil { utils.GetLogger().Panicf("Failed to open redo log file %v for replay", key) } currentReader = utils.NewStreamDataReader(currentFile) // Read magic header. If magic number mismatches, this means the whole redolog file is corrupted. // We should immediately crash the server and let engineer to handle this. var header uint32 if header, err = currentReader.ReadUint32(); err != nil { utils.GetLogger().Panicf("Failed to read magic header for redo log file %v", key) } if header != UpsertHeader { utils.GetLogger().Panicf("Invalid header %#x for redo log file %v", header, key) } offset = 4 } // All later errors are recoverable and should be solved by truncate the redo log file. // Try to read the next batch in the file. size, err := currentReader.ReadUint32() if err == io.EOF { r.closeRedoLogFile(files[currentIndex], offset, &currentFile, &currentIndex, false) } else if err != nil { utils.GetLogger().Errorf("Failed to read size info of the next upsert batch %v", err) r.closeRedoLogFile(files[currentIndex], offset, &currentFile, &currentIndex, true) } else { offset += 4 // Found an upsert batch to read. buffer := make([]byte, size) if err := currentReader.Read(buffer); err != nil { utils.GetLogger().Errorf( "Failed to read upsert batch of size %v from file %v at offset %v for table %v shard %v", size, files[currentIndex], offset, r.tableName, r.shard) r.closeRedoLogFile(files[currentIndex], offset-4, &currentFile, &currentIndex, true) } else { offset += size upsertBatch, err := common.NewUpsertBatch(buffer) if err != nil { utils.GetLogger().Errorf( "Failed to create upsert batch from buffer of size %v from file %v at offset %v for table %v shard %v", size, files[currentIndex], offset, r.tableName, r.shard) r.closeRedoLogFile(files[currentIndex], offset-4-size, &currentFile, &currentIndex, true) } else { // update total redolog size r.TotalRedoLogSize += uint(size + 4) // increment size per file r.SizePerFile[files[currentIndex]] += size + 4 r.batchRecovered++ // update lastBatchOffset for the current redo log file return &NextUpsertBatchInfo{ Batch: upsertBatch, RedoLogFile: files[currentIndex], BatchOffset: r.updateBatchCount(files[currentIndex]) - 1, Recovery: true, } } } } } }, nil } func (r *FileRedoLogManager) setRecoveryDone() { r.Lock() defer r.Unlock() r.recoveryChan <- true r.recoveryDone = true utils.GetLogger().With("action", "recover", "table", r.tableName, "shard", r.shard, "batchRecovered", r.batchRecovered).Info("Finished recovery from local redolog files") } // updateBatchCount saves/updates batch counts for the given redolog func (r *FileRedoLogManager) updateBatchCount(redoFile int64) uint32 { r.RLock() defer r.RUnlock() if _, ok := r.BatchCountPerFile[redoFile]; !ok { r.BatchCountPerFile[redoFile] = 1 } else { r.BatchCountPerFile[redoFile]++ } return r.BatchCountPerFile[redoFile] } // getRedoLogFilesToPurge returns all redo log files whose max event time is less than cutoff and thus // is eligible for purging. Readers need to hold the reader lock to access this function. // At the same, make sure all records should've backfilled successfully func (r *FileRedoLogManager) getRedoLogFilesToPurge(cutoff uint32, redoFileCheckpointed int64, batchOffset uint32) []int64 { r.RLock() var creationTimes []int64 for creationTime, maxEventTime := range r.MaxEventTimePerFile { // exclude current redo file since it's used by ingestion if (creationTime < r.CurrentFileCreationTime || r.CurrentFileCreationTime == 0) && maxEventTime < cutoff { if creationTime < redoFileCheckpointed || (creationTime == redoFileCheckpointed && r.BatchCountPerFile[redoFileCheckpointed] == batchOffset+1) { creationTimes = append(creationTimes, creationTime) } } } r.RUnlock() return creationTimes } // evictRedoLogData evict data belongs to redologs already purged from disk func (r *FileRedoLogManager) evictRedoLogData(creationTime int64) { r.Lock() delete(r.MaxEventTimePerFile, creationTime) delete(r.BatchCountPerFile, creationTime) r.TotalRedoLogSize -= uint(r.SizePerFile[creationTime]) delete(r.SizePerFile, creationTime) utils.GetReporter(r.tableName, r.shard).GetGauge(utils.NumberOfRedologs).Update(float64(len(r.SizePerFile))) utils.GetReporter(r.tableName, r.shard).GetGauge(utils.SizeOfRedologs).Update(float64(r.TotalRedoLogSize)) r.Unlock() } // CheckpointRedolog purges disk files and in memory data of redologs that are eligible to be purged. func (r *FileRedoLogManager) CheckpointRedolog(cutoff uint32, redoFileCheckpointed int64, batchOffset uint32) error { creationTimes := r.getRedoLogFilesToPurge(cutoff, redoFileCheckpointed, batchOffset) utils.GetLogger().With("action", "purgeRedoLog", "table", r.tableName, "shard", r.shard, "cutoff", cutoff, "redologfile", redoFileCheckpointed, "batchoffset", batchOffset, "files", len(creationTimes)).Info("CheckpointRedolog") for _, creationTime := range creationTimes { if err := r.diskStore.DeleteLogFile( r.tableName, r.shard, creationTime); err != nil { return err } r.evictRedoLogData(creationTime) } return nil } func (r *FileRedoLogManager) WaitForRecoveryDone() { <-r.recoveryChan } // CheckpointRedolog purges disk files and in memory data of redologs that are eligible to be purged. func (r *FileRedoLogManager) GetTotalSize() int { r.RLock() defer r.RUnlock() return int(r.TotalRedoLogSize) } func (r *FileRedoLogManager) GetNumFiles() int { r.RLock() defer r.RUnlock() return len(r.SizePerFile) } func (r *FileRedoLogManager) GetBatchReceived() int { return 0 } func (r *FileRedoLogManager) GetBatchRecovered() int { return r.batchRecovered } // Close closes the current log file. func (r *FileRedoLogManager) Close() { if r.currentLogFile != nil { r.currentLogFile.Close() } } // MarshalJSON marshals a fileRedologManager into json. func (r *FileRedoLogManager) MarshalJSON() ([]byte, error) { // Avoid json.Marshal loop calls. type alias FileRedoLogManager r.RLock() defer r.RUnlock() return json.Marshal((*alias)(r)) }