memstore/snapshot.go (76 lines of code) (raw):
// Copyright (c) 2017-2018 Uber Technologies, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package memstore
import (
memCom "github.com/uber/aresdb/memstore/common"
"github.com/uber/aresdb/utils"
)
//Snapshot is the process to write the current content of dimension table live store in memory to disk in order to
// 1.Facilitate recovery during server bootstrap.
// 2.Purge stale redo logs.
func (m *memStoreImpl) Snapshot(table string, shardID int, reporter SnapshotJobDetailReporter) error {
snapshotTimer := utils.GetReporter(table, shardID).GetTimer(utils.SnapshotTimingTotal)
start := utils.Now()
jobKey := getIdentifier(table, shardID, memCom.SnapshotJobType)
defer func() {
duration := utils.Now().Sub(start)
snapshotTimer.Record(duration)
reporter(jobKey, func(status *SnapshotJobDetail) {
status.LastDuration = duration
})
utils.GetReporter(table, shardID).
GetCounter(utils.SnapshotCount).Inc(1)
}()
shard, err := m.GetTableShard(table, shardID)
if err != nil {
utils.GetLogger().With("table", table, "shard", shardID, "error", err).Warn("Failed to find shard, is it deleted?")
return nil
}
defer shard.Users.Done()
utils.GetLogger().With(
"job", "snapshot",
"table", table).Infof("Creating snapshot")
snapshotMgr := shard.LiveStore.SnapshotManager
// keep the current redofile and offset
redoFile, batchOffset, numMutations, lastReadRecord := snapshotMgr.StartSnapshot()
reporter(jobKey, func(status *SnapshotJobDetail) {
status.RedologFile = redoFile
status.BatchOffset = batchOffset
status.Stage = SnapshotSnapshot
})
if numMutations > 0 {
if err = m.createSnapshot(shard, redoFile, batchOffset); err != nil {
return err
}
}
// checkpoint snapshot progress
snapshotMgr.Done(redoFile, batchOffset, numMutations, lastReadRecord)
reporter(jobKey, func(status *SnapshotJobDetail) {
status.Stage = SnapshotCleanup
})
shard.cleanOldSnapshotAndLogs(redoFile, batchOffset)
reporter(jobKey, func(status *SnapshotJobDetail) {
status.Stage = SnapshotComplete
})
utils.GetLogger().With(
"job", "snapshot",
"table", table).Infof("Snapshot done")
return nil
}
func (m *memStoreImpl) createSnapshot(shard *TableShard, redoFile int64, batchOffset uint32) error {
// Block column deletion
shard.columnDeletion.Lock()
defer shard.columnDeletion.Unlock()
batchIDs, _ := shard.LiveStore.GetBatchIDs()
for _, batchID := range batchIDs {
batch := shard.LiveStore.GetBatchForRead(batchID)
for colID, vp := range batch.Columns {
if vp == nil {
// column deleted likely
continue
}
utils.GetLogger().With(
"job", "snapshot",
"table", shard.Schema.Schema.Name).Infof("batch: %d, columeID: %d", batchID, colID)
serializer := memCom.NewVectorPartySnapshotSerializer(shard.HostMemoryManager, shard.diskStore, shard.Schema.Schema.Name, shard.ShardID,
colID, int(batchID), 0, 0, redoFile, batchOffset)
if err := serializer.WriteVectorParty(vp); err != nil {
batch.RUnlock()
return err
}
}
batch.RUnlock()
}
return nil
}