collector/ckpt/ckpt_manager.go (115 lines of code) (raw):
package ckpt
import (
"errors"
"fmt"
conf "github.com/alibaba/MongoShake/v2/collector/configure"
utils "github.com/alibaba/MongoShake/v2/common"
"sync"
)
const (
CheckpointName = "name"
)
type CheckpointManager struct {
Type string
ctx *CheckpointContext
ctxRecLock sync.Mutex
ctxRec *CheckpointContext // only used to store temporary value that will be lazy load
delegate CheckpointOperation
}
func NewCheckpointManager(name string, startPosition int64) *CheckpointManager {
newManager := &CheckpointManager{}
switch conf.Options.CheckpointStorage {
case utils.VarCheckpointStorageApi:
newManager.delegate = &HttpApiCheckpoint{
CheckpointContext: CheckpointContext{
Name: name,
Timestamp: startPosition,
Version: utils.FcvCheckpoint.CurrentVersion,
OplogDiskQueue: "",
OplogDiskQueueFinishTs: InitCheckpoint,
},
URL: conf.Options.CheckpointStorageCollection,
}
case utils.VarCheckpointStorageDatabase:
newManager.delegate = &MongoCheckpoint{
CheckpointContext: CheckpointContext{
Name: name,
Timestamp: startPosition,
Version: utils.FcvCheckpoint.CurrentVersion,
OplogDiskQueue: "",
OplogDiskQueueFinishTs: InitCheckpoint,
},
DB: conf.Options.CheckpointStorageDb,
URL: conf.Options.CheckpointStorageUrl,
Table: conf.Options.CheckpointStorageCollection,
}
default:
return nil
}
return newManager
}
// get persist checkpoint
func (manager *CheckpointManager) Get() (*CheckpointContext, bool, error) {
var exist bool
manager.ctx, exist = manager.delegate.Get()
if manager.ctx == nil {
return nil, exist, fmt.Errorf("get by checkpoint info from db or api failed, please see err log")
}
// check fcv
if exist && utils.FcvCheckpoint.IsCompatible(manager.ctx.Version) == false {
return nil, exist, fmt.Errorf("current required checkpoint version[%v] > input[%v], please upgrade MongoShake to version >= %v",
utils.FcvCheckpoint.CurrentVersion, manager.ctx.Version,
utils.LowestCheckpointVersion[utils.FcvCheckpoint.CurrentVersion])
}
return manager.ctx, exist, nil
}
// get in memory checkpoint
func (manager *CheckpointManager) GetInMemory() *CheckpointContext {
return manager.ctx
}
// Update checkpoint update memory & persistence(db or file)
func (manager *CheckpointManager) Update(ts int64) error {
if manager.ctx == nil || len(manager.ctx.Name) == 0 {
// must run Get() first
return errors.New("current ckpt context is empty")
}
manager.ctx.Timestamp = ts
manager.ctx.Version = utils.FcvCheckpoint.CurrentVersion
// update OplogDiskQueueFinishTs if set
if manager.ctxRec != nil {
if manager.ctx.OplogDiskQueueFinishTs != manager.ctxRec.OplogDiskQueueFinishTs {
manager.ctx.OplogDiskQueueFinishTs = manager.ctxRec.OplogDiskQueueFinishTs
}
if manager.ctx.OplogDiskQueue != manager.ctxRec.OplogDiskQueue {
manager.ctx.OplogDiskQueue = manager.ctxRec.OplogDiskQueue
}
if manager.ctx.FetchMethod != manager.ctxRec.FetchMethod {
manager.ctx.FetchMethod = manager.ctxRec.FetchMethod
}
}
return manager.delegate.Insert(manager.ctx)
}
// OplogDiskQueueFinishTs and OplogDiskQueue won't immediate effect, will be inserted in the next Update call.
func (manager *CheckpointManager) SetOplogDiskFinishTs(ts int64) {
if manager.ctxRec == nil {
manager.ctxRecLock.Lock()
if manager.ctxRec == nil { // double check
manager.ctxRec = new(CheckpointContext)
}
manager.ctxRecLock.Unlock()
}
manager.ctxRec.OplogDiskQueueFinishTs = ts
}
func (manager *CheckpointManager) SetOplogDiskQueueName(name string) {
if manager.ctxRec == nil {
manager.ctxRecLock.Lock()
if manager.ctxRec == nil { // double check
manager.ctxRec = new(CheckpointContext)
}
manager.ctxRecLock.Unlock()
}
manager.ctxRec.OplogDiskQueue = name
}
func (manager *CheckpointManager) SetFetchMethod(method string) {
if manager.ctxRec == nil {
manager.ctxRecLock.Lock()
if manager.ctxRec == nil { // double check
manager.ctxRec = new(CheckpointContext)
}
manager.ctxRecLock.Unlock()
}
manager.ctxRec.FetchMethod = method
}