memstore/recovery.go (361 lines of code) (raw):

// Copyright (c) 2017-2018 Uber Technologies, Inc. // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. // You may obtain a copy of the License at // // http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, software // distributed under the License is distributed on an "AS IS" BASIS, // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. // See the License for the specific language governing permissions and // limitations under the License. package memstore import ( "github.com/uber/aresdb/cluster/topology" "sync" "math" "sort" memcom "github.com/uber/aresdb/memstore/common" "github.com/uber/aresdb/utils" ) // PlayRedoLog loads data for the table Shard from disk store and recovers the Shard for serving. func (shard *TableShard) PlayRedoLog() { timer := utils.GetReporter(shard.Schema.Schema.Name, shard.ShardID).GetTimer(utils.RecoveryLatency).Start() defer timer.Stop() utils.GetLogger().With("table", shard.Schema.Schema.Name, "shard", shard.ShardID).Info( "Replay redo logs") var redoLogFilePersisted int64 var offsetPersisted uint32 if backfillMgr := shard.LiveStore.BackfillManager; backfillMgr != nil { redoLogFilePersisted, offsetPersisted = backfillMgr.GetLatestRedoFileAndOffset() } else { redoLogFilePersisted, offsetPersisted, _, _ = shard.LiveStore.SnapshotManager.GetLastSnapshotInfo() } utils.GetLogger().With("table", shard.Schema.Schema.Name, "shard", shard.ShardID, "redoLogFile", redoLogFilePersisted, "offset", offsetPersisted).Info("Checkpointed redolog file") go func() { nextUpsertBatchFunc, err := shard.LiveStore.RedoLogManager.Iterator() if err != nil { panic("Fail to start redolog manager") } for { batchInfo := nextUpsertBatchFunc() if batchInfo == nil { utils.GetLogger().With("table", shard.Schema.Schema.Name, "shard", shard.ShardID).Info("Redolog manager stopped") return } var skipBackfillRows bool if batchInfo.Recovery { // check if this batch has already been backfilled and persisted skipBackfillRows = batchInfo.RedoLogFile < redoLogFilePersisted || (batchInfo.RedoLogFile == redoLogFilePersisted && batchInfo.BatchOffset <= offsetPersisted) } if err = shard.saveUpsertBatch(batchInfo.Batch, batchInfo.RedoLogFile, batchInfo.BatchOffset, batchInfo.Recovery, skipBackfillRows); err != nil { if batchInfo.Recovery { utils.GetLogger().With("error", err).Panic("Failed to apply upsert batch during recovery") } else { // for normal ingestion, will log error and keep going utils.GetLogger().With("action", "ingestion", "table", shard.Schema.Schema.Name, "shard", shard.ShardID, "redologFile", batchInfo.RedoLogFile, "offset", batchInfo.BatchOffset, "error", err).Error("Failed to apply upsert batch") utils.GetReporter(shard.Schema.Schema.Name, shard.ShardID).GetCounter(utils.IngestedErrorBatches).Inc(1) } } } }() shard.LiveStore.RedoLogManager.WaitForRecoveryDone() // report redolog size after replay utils.GetReporter(shard.Schema.Schema.Name, shard.ShardID).GetGauge(utils.NumberOfRedologs).Update(float64(shard.LiveStore.RedoLogManager.GetNumFiles())) utils.GetReporter(shard.Schema.Schema.Name, shard.ShardID).GetGauge(utils.SizeOfRedologs).Update(float64(shard.LiveStore.RedoLogManager.GetTotalSize())) // proactively purge redo files if shard.LiveStore.BackfillManager != nil { shard.LiveStore.RedoLogManager. CheckpointRedolog(shard.LiveStore.ArchivingCutoffHighWatermark, redoLogFilePersisted, offsetPersisted) } } func (shard *TableShard) cleanOldSnapshotAndLogs(redoLogFile int64, offset uint32) { tableName := shard.Schema.Schema.Name // snapshot won't care about the cutoff. if err := shard.LiveStore.RedoLogManager.CheckpointRedolog(math.MaxUint32, redoLogFile, offset); err != nil { utils.GetLogger().With( "job", "snapshot_cleanup", "table", tableName).Errorf( "Purge redologs failed, shard: %d, error: %v", shard.ShardID, err) } if shard.options.bootstrapToken.AcquireToken(tableName, uint32(shard.ShardID)) { defer shard.options.bootstrapToken.ReleaseToken(tableName, uint32(shard.ShardID)) // delete old snapshots if err := shard.diskStore.DeleteSnapshot(shard.Schema.Schema.Name, shard.ShardID, redoLogFile, offset); err != nil { utils.GetLogger().With( "job", "snapshot_cleanup", "table", tableName).Errorf( "Delete snapshots failed, shard: %d, error: %v", shard.ShardID, err) } } } // LoadMetaData loads metadata for the table Shard from metastore. func (shard *TableShard) LoadMetaData() error { if shard.Schema.Schema.IsFactTable { cutoff, err := shard.metaStore.GetArchivingCutoff(shard.Schema.Schema.Name, shard.ShardID) if err != nil { return err } shard.ArchiveStore.CurrentVersion = NewArchiveStoreVersion(cutoff, shard) // We set the archiving cutoff to the persisted value (CLW) in meta so recovery will apply // all items in redolog that have event time > CLW. The backfill job will ignore items in // the backfill that have associated CHW (cutoff high watermark) > persisted CHW. shard.LiveStore.ArchivingCutoffHighWatermark = cutoff shard.LiveStore.PrimaryKey.UpdateEventTimeCutoff(cutoff) // retrieve redoLog/offset checkpointed for backfill redoLog, offset, err := shard.metaStore.GetBackfillProgressInfo(shard.Schema.Schema.Name, shard.ShardID) if err != nil { return err } shard.LiveStore.BackfillManager.LastRedoFile = redoLog shard.LiveStore.BackfillManager.LastBatchOffset = offset } else { redoLogFile, offset, batchID, lastRecord, err := shard.metaStore.GetSnapshotProgress(shard.Schema.Schema.Name, shard.ShardID) if err != nil { return err } // retrieve latest snapshot info record := memcom.RecordID{BatchID: batchID, Index: lastRecord} shard.LiveStore.SnapshotManager.SetLastSnapshotInfo(redoLogFile, offset, record) } return nil } // loadSnapshots load snapshots for dimension tables func (m *memStoreImpl) loadSnapshots() { utils.GetLogger().Info("Start loading snapshots for all table shards") var wg sync.WaitGroup for table, tableSchema := range m.TableSchemas { if tableSchema.Schema.IsFactTable { continue } wg.Add(1) go func(tableName string) { tableShards := m.TableShards[tableName] for _, shard := range tableShards { utils.GetLogger().With( "job", "snapshot_load", "table", shard.Schema.Schema.Name, "shard", shard.ShardID). Info("Loading snapshots") if err := shard.LoadSnapshot(); err != nil { utils.GetLogger().With( "job", "snapshot_load", "table", shard.Schema.Schema.Name, "shard", shard.ShardID).Panic(err) } utils.GetLogger().With( "job", "snapshot_load", "table", shard.Schema.Schema.Name, "shard", shard.ShardID). Info("Loading snapshots done") } wg.Done() }(table) } wg.Wait() utils.GetLogger().Info("Finish loading snapshots for all table shards") } // playRedoLogs replay redo logs for all tables in parallel, and then start the data ingestion func (m *memStoreImpl) playRedoLogs() { utils.GetLogger().Info("Start replaying redo logs for all table shards") var wg sync.WaitGroup for table := range m.TableSchemas { wg.Add(1) go func(tableName string) { tableShards := m.TableShards[tableName] // Replay all redologs for _, shard := range tableShards { utils.GetLogger().With( "job", "replay_redo_logs", "table", shard.Schema.Schema.Name, "shard", shard.ShardID). Info("Replaying redo logs") shard.PlayRedoLog() utils.GetLogger().With( "job", "replay_redo_logs", "table", shard.Schema.Schema.Name, "shard", shard.ShardID). Info("Replaying redo logs done") } wg.Done() }(table) } wg.Wait() utils.GetLogger().Info("Finish replaying redo logs for all table shards") } // InitShards loads/recovers data for shards initially owned by the current instance. // It also watches Shard ownership change events and handles them in a separate goroutine. // InitShards is only used in non sharded version which assume totalShardsInCluster to be one func (m *memStoreImpl) InitShards(schedulerOff bool, shardOwner topology.ShardOwner) { for _, schema := range m.TableSchemas { shards := shardOwner.GetOwnedShards() for _, shard := range shards { if err := m.LoadShard(schema, shard, false); err != nil { utils.GetLogger().Panic(err) } } } // tryPreload data according the column retention config and start the go routines // to do eviction and preloading. m.preloadAllFactTables() // Start host memory manager m.HostMemManager.Start() // load snapshot for dimension tables m.loadSnapshots() // start scheduler after we load all the metadata. This ensure we can start backfill job earlier to consume // the backfill queue. if !schedulerOff { // Start scheduler. utils.GetLogger().Info("Starting archiving scheduler") // disable archiving during redolog replay m.GetScheduler().EnableJobType(memcom.ArchivingJobType, false) // this will start scheduler of all jobs except archiving, archiving will be started individually m.GetScheduler().Start() } else { utils.GetLogger().Info("Scheduler is off") } m.playRedoLogs() if !schedulerOff { // re-enable archiving after redolog replay m.GetScheduler().EnableJobType(memcom.ArchivingJobType, true) } // watch Shard ownership change shardOwnershipChangeEvents, done, err := m.metaStore.WatchShardOwnershipEvents() if err != nil { utils.GetLogger().Panic(utils.StackError(err, "Failed to watch Shard ownership change")) } // Shard ownership change handling go func() { for event := range shardOwnershipChangeEvents { if event.ShouldOwn { m.RLock() schema := m.TableSchemas[event.TableName] m.RUnlock() if schema == nil { utils.GetLogger().Panic(utils.StackError(nil, "Trying to load Shard %d of unknown table %s", event.Shard, event.TableName)) } // This assumes that (certain) schema change must wait until the Shard // is fully loaded, which may take a while. err := m.LoadShard(schema, event.Shard, true) if err != nil { utils.GetLogger().Panic(err) } } else { // Unload the Shard. var shard *TableShard // Detach first. m.Lock() shards := m.TableShards[event.TableName] if shards != nil { shard = shards[event.Shard] delete(shards, event.Shard) utils.DeleteTableShardReporter(event.TableName, event.Shard) } m.Unlock() // Destruct. if shard != nil { shard.Destruct() } // Do not delete the file on diskstore. } done <- struct{}{} } close(done) }() } // LoadShard loads/recovers the specified Shard and attaches it to memStoreImpl for serving. If will load the metadata // first and then replay redologs only if replayRedologs is true. // LoadShard is only used in non sharded version, whihch assume totalShardsInCluster to be 1 func (m *memStoreImpl) LoadShard(schema *memcom.TableSchema, shard int, replayRedologs bool) error { tableShard := NewTableShard(schema, m.metaStore, m.diskStore, m.HostMemManager, shard, 1, m.options) err := tableShard.LoadMetaData() if err != nil { utils.GetLogger().Panic(err) } if replayRedologs { tableShard.PlayRedoLog() } m.Lock() defer m.Unlock() shardMap := m.TableShards[schema.Schema.Name] if shardMap == nil { shardMap = make(map[int]*TableShard) m.TableShards[schema.Schema.Name] = shardMap } if shardMap[shard] != nil { return utils.StackError(nil, "Shard %d of Table %s has already been loaded", shard, schema.Schema.Name) } shardMap[shard] = tableShard // Add reporter for current table and Shard. utils.AddTableShardReporter(schema.Schema.Name, shard) return nil } // LoadSnapshot load shard data from snapshot files func (shard *TableShard) LoadSnapshot() error { loadTimer := utils.GetReporter(shard.Schema.Schema.Name, shard.ShardID).GetTimer(utils.SnapshotTimingLoad) start := utils.Now() defer func() { duration := utils.Now().Sub(start) loadTimer.Record(duration) }() redoLogFile, offset, _, lastReadRecord := shard.LiveStore.SnapshotManager.GetLastSnapshotInfo() if redoLogFile <= 0 { // no snapshot created yet return nil } tableName := shard.Schema.Schema.Name shardID := shard.ShardID utils.GetLogger().With( "job", "snapshot_load", "table", tableName, "shard", shardID).Info("Load data from snapshot") var batchIDs []int var err error if batchIDs, err = shard.diskStore.ListSnapshotBatches(tableName, shardID, redoLogFile, offset); err != nil { return err } else if len(batchIDs) == 0 { return utils.StackError(nil, "No snapshot file/directory found") } shard.LiveStore.WriterLock.Lock() defer shard.LiveStore.WriterLock.Unlock() for _, id := range batchIDs { batchID := int32(id) // find all columns in snapshot dir batchPos, err := shard.loadTableShardSnapshot(tableName, shardID, batchID, redoLogFile, offset) if err != nil { return err } if batchID == lastReadRecord.BatchID { batchPos = lastReadRecord.Index } shard.rebuildIndexForLiveStore(batchID, batchPos) } //reset back the read/write record position shard.LiveStore.Lock() shard.LiveStore.LastReadRecord = lastReadRecord shard.LiveStore.Unlock() shard.LiveStore.NextWriteRecord = lastReadRecord return nil } func (shard *TableShard) loadTableShardSnapshot( tableName string, shardID int, batchID int32, redoLogFile int64, offset uint32) (uint32, error) { shard.Schema.RLock() dataTypes := shard.Schema.ValueTypeByColumn defaultValues := shard.Schema.DefaultValues columns := shard.Schema.Schema.Columns shard.Schema.RUnlock() var err error var cols []int // find all columns in snapshot dir if cols, err = shard.diskStore.ListSnapshotVectorPartyFiles(tableName, shardID, redoLogFile, offset, int(batchID)); err != nil { return 0, err } var vp memcom.LiveVectorParty batch := shard.LiveStore.getOrCreateBatch(int32(batchID)) defer batch.Unlock() for colID, column := range columns { utils.GetLogger().With( "job", "snapshot_load", "table", shard.Schema.Schema.Name, "shard", shardID, "batch", batchID, "column", colID).Info("Load snapshot column") index := sort.SearchInts(cols, colID) existing := index >= 0 && index < len(cols) && cols[index] == colID if column.Deleted || !existing { vp = nil } else { // found the column in snapshot, read from snapshot file vp = NewLiveVectorParty(batch.Capacity, dataTypes[colID], *defaultValues[colID], shard.HostMemoryManager) serializer := memcom.NewVectorPartySnapshotSerializer(shard.HostMemoryManager, shard.diskStore, shard.Schema.Schema.Name, shard.ShardID, colID, int(batchID), 0, 0, redoLogFile, offset) if err := serializer.ReadVectorParty(vp); err != nil { return 0, err } } batch.Columns[colID] = vp } return uint32(batch.Capacity - 1), nil } func (shard *TableShard) rebuildIndexForLiveStore(batchID int32, lastRecord uint32) error { buildIndexTimer := utils.GetReporter(shard.Schema.Schema.Name, shard.ShardID).GetTimer(utils.SnapshotTimingBuildIndex) start := utils.Now() defer func() { duration := utils.Now().Sub(start) buildIndexTimer.Record(duration) }() utils.GetLogger().With( "job", "snapshot_load", "table", shard.Schema.Schema.Name).Info("Rebuilding index") batch := shard.LiveStore.Batches[batchID] primaryKeyBytes := shard.Schema.PrimaryKeyBytes primaryKeyColumns := shard.Schema.GetPrimaryKeyColumns() key := make([]byte, primaryKeyBytes) var err error var row uint32 for row = 0; row <= lastRecord; row++ { // truncate key before every read key = key[:0] if key, err = memcom.AppendPrimaryKeyBytes(key, memcom.NewPrimaryKeyDataValueIterator(batch, int(row), primaryKeyColumns)); err != nil { return err } recordID := memcom.RecordID{ BatchID: batchID, Index: uint32(row), } found, _, err := shard.LiveStore.PrimaryKey.FindOrInsert(key, recordID, 0) if err != nil { return err } else if found { return utils.StackError(nil, "Duplicate primary key found during rebuild index") } } return nil }