collector/syncer.go (514 lines of code)
package collector
import (
"fmt"
"go.mongodb.org/mongo-driver/bson/primitive"
"time"
"github.com/alibaba/MongoShake/v2/collector/ckpt"
conf "github.com/alibaba/MongoShake/v2/collector/configure"
"github.com/alibaba/MongoShake/v2/collector/filter"
sourceReader "github.com/alibaba/MongoShake/v2/collector/reader"
utils "github.com/alibaba/MongoShake/v2/common"
"github.com/alibaba/MongoShake/v2/oplog"
"github.com/alibaba/MongoShake/v2/quorum"
"strings"
nimo "github.com/gugemichael/nimo4go"
LOG "github.com/vinllen/log4go"
"go.mongodb.org/mongo-driver/bson"
)
const (
// FetcherBufferCapacity = 256
// AdaptiveBatchingMaxSize = 16384 // 16k
// bson deserialize workload is CPU-intensive task
PipelineQueueMaxNr = 8
PipelineQueueMiddleNr = 4
PipelineQueueMinNr = 1
PipelineQueueLen = 64 * 2
DurationTime = 6000 // unit: ms.
DDLCheckpointInterval = 300 // unit: ms.
FilterCheckpointGap = 180 // unit: seconds. if no checkpoint update within this gap, flush the checkpoint mandatorily
FilterCheckpointCheckInterval = 180 // unit: seconds.
CheckCheckpointUpdateTimes = 10 // maximum number of checkpoint-update checks
)
type OplogHandler interface {
// invocation on every oplog consumed
Handle(log *oplog.PartialLog)
}
// OplogSyncer polls oplogs from the source MongoDB.
type OplogSyncer struct {
OplogHandler
// source mongodb replica set name
Replset string
// oplog start position of source mongodb
startPosition interface{}
// full sync finish position, used to check DDL between full sync and incr sync
fullSyncFinishPosition primitive.Timestamp
ckptManager *ckpt.CheckpointManager
// oplog hash strategy
hasher oplog.Hasher
// pending queue, used by raw log parsing. we buffer the
// target raw oplogs and push them to the pending queue
// once the buffer is full, then transfer them to the logs queue
// buffer []*bson.Raw // move to persister
PendingQueue []chan [][]byte
logsQueue []chan []*oplog.GenericOplog
LastFetchTs primitive.Timestamp // timestamp of the last fetched oplog
// nextQueuePosition uint64 // move to persister
// source mongo oplog/event reader
reader sourceReader.Reader
// journal log that records all oplogs
journal *utils.Journal
// oplogs dispatcher
batcher *Batcher
// data persist handler
persister *Persister
// qos
qos *utils.Qos
// timers for inner event
startTime time.Time
ckptTime time.Time
replMetric *utils.ReplicationMetric
// can be closed
CanClose bool
SyncGroup []*OplogSyncer
shutdownWorking bool // whether the shutdown routine has started
}
/*
* Syncer fetches oplogs from the source MongoDB and then sends them to different workers, which can be
* seen as network senders. Several syncers coexist to improve fetching performance.
* The data flow in a syncer is:
* source mongodb --> reader --> persister --> pending queue(raw data) --> logs queue(parsed data) --> worker
* The pending queue and logs queue are split to improve performance.
*/
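// NewOplogSyncer creates an OplogSyncer for a single replica set. It returns nil if the
// source reader cannot be created.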
func NewOplogSyncer(
replset string,
startPosition interface{},
fullSyncFinishPosition int64,
mongoUrl string,
gids []string) *OplogSyncer {
reader, err := sourceReader.CreateReader(conf.Options.IncrSyncMongoFetchMethod, mongoUrl, replset)
if err != nil {
LOG.Critical("create reader with url[%v] replset[%v] failed[%v]", mongoUrl, replset, err)
return nil
}
syncer := &OplogSyncer{
Replset: replset,
startPosition: startPosition,
fullSyncFinishPosition: utils.Int64ToTimestamp(fullSyncFinishPosition),
journal: utils.NewJournal(utils.JournalFileName(
fmt.Sprintf("%s.%s", conf.Options.Id, replset))),
reader: reader,
qos: utils.StartQoS(0, 1, &utils.IncrSentinelOptions.TPS), // default is 0 which means do not limit
}
// concurrent level hasher
switch conf.Options.IncrSyncShardKey {
case oplog.ShardByNamespace:
syncer.hasher = &oplog.TableHasher{}
case oplog.ShardByID:
syncer.hasher = &oplog.PrimaryKeyHasher{}
}
if len(conf.Options.IncrSyncShardByObjectIdWhiteList) != 0 {
syncer.hasher = oplog.NewWhiteListObjectIdHasher(conf.Options.IncrSyncShardByObjectIdWhiteList)
}
filterList := filter.OplogFilterChain{new(filter.AutologousFilter), new(filter.NoopFilter), filter.NewGidFilter(gids)}
// namespace filter, heavy operation
if len(conf.Options.FilterNamespaceWhite) != 0 || len(conf.Options.FilterNamespaceBlack) != 0 {
namespaceFilter := filter.NewNamespaceFilter(conf.Options.FilterNamespaceWhite,
conf.Options.FilterNamespaceBlack)
filterList = append(filterList, namespaceFilter)
}
// oplog filters: drop the oplog if any filter in the list returns true.
// the order of the filters is not significant.
// workerGroup is assigned later by syncer.Bind()
syncer.batcher = NewBatcher(syncer, filterList, syncer, []*Worker{})
// init persist
syncer.persister = NewPersister(replset, syncer)
return syncer
}
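// Init initializes the replication metrics for this syncer and registers its REST API endpoints.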
func (sync *OplogSyncer) Init() {
var options uint64 = utils.METRIC_CKPT_TIMES | utils.METRIC_LSN | utils.METRIC_SUCCESS |
utils.METRIC_TPS | utils.METRIC_FILTER
if conf.Options.Tunnel != utils.VarTunnelDirect {
options |= utils.METRIC_RETRANSIMISSION
options |= utils.METRIC_TUNNEL_TRAFFIC
options |= utils.METRIC_WORKER
}
sync.replMetric = utils.NewMetric(sync.Replset, utils.TypeIncr, options)
sync.replMetric.ReplStatus.Update(utils.WorkGood)
sync.RestAPI()
sync.persister.RestAPI()
}
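// Fini shuts down the syncer's batcher.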
func (sync *OplogSyncer) Fini() {
sync.batcher.Fini()
}
func (sync *OplogSyncer) String() string {
return fmt.Sprintf("Syncer[%s]", sync.Replset)
}
// bind a worker to this syncer's batcher
func (sync *OplogSyncer) Bind(w *Worker) {
sync.batcher.workerGroup = append(sync.batcher.workerGroup, w)
}
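// StartDiskApply switches the persister's fetch stage to FetchStageStoreDiskApply, so oplogs
// persisted to disk can start being applied.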
func (sync *OplogSyncer) StartDiskApply() {
sync.persister.SetFetchStage(utils.FetchStageStoreDiskApply)
}
// start polling oplogs
func (sync *OplogSyncer) Start() {
LOG.Info("%s poll oplog syncer start. ckpt_interval[%dms], gid[%s], shard_key[%s]",
sync, conf.Options.CheckpointInterval, conf.Options.IncrSyncOplogGIDS, conf.Options.IncrSyncShardKey)
sync.startTime = time.Now()
// start persister
sync.persister.Start()
// TODO, need handle PBRT
// checkpoint handling:
//
// 1. create checkpoint manager
// 2. load existing ckpt from remote storage
// 3. start checkpoint persist routine
sync.newCheckpointManager(sync.Replset, sync.startPosition)
if _, ok := sync.startPosition.(int64); !ok {
// set resumeToken for aliyun_serverless
sync.reader.SetQueryTimestampOnEmpty(sync.startPosition)
}
// load checkpoint and set stage
if err := sync.loadCheckpoint(); err != nil {
LOG.Crash(err)
}
// start deserializer: parse data from pending queue, and then push into logs queue.
sync.startDeserializer()
// start batcher: pull oplogs from the logs queue and batch them together before handing them to the workers.
sync.startBatcher()
// forever fetching oplog from mongodb into oplog_reader
for {
sync.poll()
// an error or exception occurred
LOG.Warn("%s polling yield. master:%t, yield:%dms", sync, quorum.IsMaster(), DurationTime)
utils.YieldInMs(DurationTime)
}
}
// fetch oplogs from the logs queue, batch them together and then send them to different workers.
func (sync *OplogSyncer) startBatcher() {
var batcher = sync.batcher
filterCheckTs := time.Now()
filterFlag := false // marks whether the previous log was filtered
nimo.GoRoutineInLoop(func() {
/*
* check whether self is the master
*/
if !quorum.IsMaster() {
utils.YieldInMs(DurationTime)
return
}
// batch as many oplogs as possible from the logs queues. the batcher merges
// oplogs from the different logs queues one by one; the max number
// of oplogs in a batch is limited by AdaptiveBatchingMaxSize
batchedOplog, barrier, allEmpty, exit := batcher.BatchMore()
// it's better to handle filter in BatchMore function, but I don't want to touch this file anymore
if conf.Options.FilterOplogGids {
if err := sync.filterOplogGid(batchedOplog); err != nil {
LOG.Crash("%v", err)
}
}
var newestTs int64
if exit {
LOG.Info("%s find exit signal", sync)
// should exit now, make sure the checkpoint is updated before that
lastLog, lastFilterLog := batcher.getLastOplog()
newestTs = 1 // default is 1
if lastLog != nil && utils.TimeStampToInt64(lastLog.Timestamp) > newestTs {
newestTs = utils.TimeStampToInt64(lastLog.Timestamp)
} else if newestTs == 1 && lastFilterLog != nil {
// only set to the lastFilterLog timestamp if all preceding oplogs were filtered.
newestTs = utils.TimeStampToInt64(lastFilterLog.Timestamp)
}
if lastLog != nil && !allEmpty {
// push to worker
if worked := batcher.dispatchBatches(batchedOplog); worked {
sync.replMetric.SetLSN(newestTs)
// update latest fetched timestamp in memory
sync.reader.UpdateQueryTimestamp(newestTs)
}
}
// flush checkpoint value
sync.checkpoint(true, 0)
sync.checkCheckpointUpdate(true, newestTs) // check the checkpoint update if needed
sync.CanClose = true
LOG.Info("%s blocking and waiting exits, checkpoint: %v", sync, utils.ExtractTimestampForLog(newestTs))
select {} // block forever, wait outer routine exits
} else if log, filterLog := batcher.getLastOplog(); log != nil && !allEmpty {
// if all filtered, still update checkpoint
newestTs = utils.TimeStampToInt64(log.Timestamp)
// push to worker
if worked := batcher.dispatchBatches(batchedOplog); worked {
sync.replMetric.SetLSN(newestTs)
// update latest fetched timestamp in memory
sync.reader.UpdateQueryTimestamp(newestTs)
}
filterFlag = false
// flush checkpoint value
sync.checkpoint(barrier, 0)
sync.checkCheckpointUpdate(barrier, newestTs) // check the checkpoint update if needed
} else {
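// all oplogs in this batch were filtered or the batch was empty: decide whether the
// checkpoint should still be advanced based on the newest filtered oplog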
// if log is nil, check whether filterLog is empty
if filterLog == nil {
// no need to update
LOG.Debug("%s filterLog is nil", sync)
return
} else if utils.TimeStampToInt64(filterLog.Timestamp) <= sync.ckptManager.GetInMemory().Timestamp {
// no need to update
LOG.Debug("%s filterLogTs[%v] is small than ckptTs[%v], skip this filterLogTs", sync,
filterLog.Timestamp, utils.ExtractTimestampForLog(sync.ckptManager.GetInMemory().Timestamp))
return
} else {
now := time.Now()
// on the first all-filtered batch, record the check time and wait for the next round
if !filterFlag {
filterFlag = true
filterCheckTs = now
return
}
// proceed only if all received oplogs have been filtered for {FilterCheckpointCheckInterval} seconds.
if !now.After(filterCheckTs.Add(FilterCheckpointCheckInterval * time.Second)) {
return
}
checkpointTs := utils.ExtractMongoTimestamp(sync.ckptManager.GetInMemory().Timestamp)
filterNewestTs := utils.ExtractMongoTimestamp(filterLog.Timestamp)
if filterNewestTs-FilterCheckpointGap > checkpointTs {
// if the checkpoint has not been updated for {FilterCheckpointGap} seconds, update
// it mandatorily.
newestTs = utils.TimeStampToInt64(filterLog.Timestamp)
LOG.Info("%s try to update checkpoint mandatory from %v to %v", sync,
utils.ExtractTimestampForLog(sync.ckptManager.GetInMemory().Timestamp),
filterLog.Timestamp)
} else {
LOG.Debug("%s filterLogTs[%v] not bigger than checkpoint[%v]",
sync, filterLog.Timestamp,
utils.ExtractTimestampForLog(sync.ckptManager.GetInMemory().Timestamp))
return
}
}
filterFlag = false
if log != nil {
newestTsLog := utils.ExtractTimestampForLog(newestTs)
if newestTs < utils.TimeStampToInt64(log.Timestamp) {
LOG.Error("%s filter newestTs[%v] smaller than previous timestamp[%v]",
sync, newestTsLog, log.Timestamp)
}
LOG.Info("%s waiting last checkpoint[%v] updated", sync, newestTsLog)
// check last checkpoint updated
status := sync.checkCheckpointUpdate(true, utils.TimeStampToInt64(log.Timestamp))
LOG.Info("%s last checkpoint[%v] updated [%v]", sync, newestTsLog, status)
} else {
LOG.Info("%s last log is empty, skip waiting checkpoint updated", sync)
}
// update latest fetched timestamp in memory
sync.reader.UpdateQueryTimestamp(newestTs)
// flush checkpoint by the newest filter oplog value
sync.checkpoint(false, newestTs)
return
}
})
}
// when barrier is true, wait for the checkpoint to reach newestTs, which means the oplog has been written to the destination db; max wait time is 3 seconds
func (sync *OplogSyncer) checkCheckpointUpdate(barrier bool, newestTs int64) bool {
// if barrier == true, we should check whether the checkpoint is updated to `newestTs`.
if barrier && newestTs > 0 {
LOG.Info("%s find barrier", sync)
var checkpointTs int64
for i := 0; i < CheckCheckpointUpdateTimes; i++ {
// checkpointTs := sync.ckptManager.GetInMemory().Timestamp
checkpoint, _, err := sync.ckptManager.Get()
if err != nil {
LOG.Error("%s get remote checkpoint failed: %v", sync, err)
utils.YieldInMs(DDLCheckpointInterval * 3)
continue
}
checkpointTs = checkpoint.Timestamp
LOG.Info("%s compare remote checkpoint[%v] to local newestTs[%v]", sync,
utils.ExtractTimestampForLog(checkpointTs), utils.ExtractTimestampForLog(newestTs))
if checkpointTs >= newestTs {
LOG.Info("%s barrier checkpoint has updated to newest[%v]", sync, utils.ExtractTimestampForLog(newestTs))
return true
}
utils.YieldInMs(DDLCheckpointInterval)
// re-flush
sync.checkpoint(true, 0)
}
/*
* if the code reaches here, it means the checkpoint has not been updated (usually because of DDL).
* that's ok because the checkpoint can still move forward next time.
* However, if MongoShake crashes here and restarts, there may be a conflict when the
* oplog is a DDL that has already been applied but whose checkpoint was not updated.
*/
LOG.Warn("check checkpoint[%v] update[%v] failed, but do worry",
utils.ExtractTimestampForLog(checkpointTs), utils.ExtractTimestampForLog(newestTs))
}
return false
}
/********************************deserializer begin**********************************/
// deserializer: pending_queue -> logs_queue
// how many pending queues we create
func calculatePendingQueueConcurrency() int {
// a single {pending|logs} queue when there are multiple source shards.
// more threads are needed when the fetch method is change stream, no matter replica set or sharded cluster.
if conf.Options.IncrSyncMongoFetchMethod == utils.VarIncrSyncMongoFetchMethodChangeStream {
return PipelineQueueMaxNr
}
if conf.Options.IsShardCluster() {
return PipelineQueueMiddleNr
}
return PipelineQueueMaxNr
}
// deserializer: fetch oplogs from the pending queue, parse them, and then add them to the logs queue.
func (sync *OplogSyncer) startDeserializer() {
parallel := calculatePendingQueueConcurrency()
sync.PendingQueue = make([]chan [][]byte, parallel, parallel)
sync.logsQueue = make([]chan []*oplog.GenericOplog, parallel, parallel)
for index := 0; index != len(sync.PendingQueue); index++ {
sync.PendingQueue[index] = make(chan [][]byte, PipelineQueueLen)
sync.logsQueue[index] = make(chan []*oplog.GenericOplog, PipelineQueueLen)
go sync.deserializer(index)
}
}
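// deserializer is a single worker goroutine: it pops raw batches from its pending queue,
// parses each entry into a PartialLog, and pushes the result into the matching logs queue.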
func (sync *OplogSyncer) deserializer(index int) {
// parser is used to parse the raw []byte
var parser func(input []byte) (*oplog.PartialLog, error)
if conf.Options.IncrSyncMongoFetchMethod == utils.VarIncrSyncMongoFetchMethodChangeStream {
// parse []byte (change stream event format) -> oplog
parser = func(input []byte) (*oplog.PartialLog, error) {
return oplog.ConvertEvent2Oplog(input, conf.Options.IncrSyncChangeStreamWatchFullDocument)
}
} else {
// parse []byte (oplog format) -> oplog
parser = func(input []byte) (*oplog.PartialLog, error) {
log := oplog.ParsedLog{}
err := bson.Unmarshal(input, &log)
return &oplog.PartialLog{
ParsedLog: log,
}, err
}
}
// combiner is used to combine data and send to downstream
var combiner func(raw []byte, log *oplog.PartialLog) *oplog.GenericOplog
// change stream && !direct && !(kafka & json)
if conf.Options.IncrSyncMongoFetchMethod == utils.VarIncrSyncMongoFetchMethodChangeStream &&
conf.Options.Tunnel != utils.VarTunnelDirect &&
!(conf.Options.Tunnel == utils.VarTunnelKafka &&
conf.Options.TunnelMessage == utils.VarTunnelMessageJson) {
// very time consuming!
combiner = func(raw []byte, log *oplog.PartialLog) *oplog.GenericOplog {
if out, err := bson.Marshal(log.ParsedLog); err != nil {
LOG.Crashf("%s deserializer marshal[%v] failed: %v", sync, log.ParsedLog, err)
return nil
} else {
return &oplog.GenericOplog{Raw: out, Parsed: log}
}
}
} else {
combiner = func(raw []byte, log *oplog.PartialLog) *oplog.GenericOplog {
return &oplog.GenericOplog{Raw: raw, Parsed: log}
}
}
// run
for {
batchRawLogs := <-sync.PendingQueue[index]
nimo.AssertTrue(len(batchRawLogs) != 0, "pending queue batch logs has zero length")
var deserializeLogs = make([]*oplog.GenericOplog, 0, len(batchRawLogs))
for _, rawLog := range batchRawLogs {
log, err := parser(rawLog)
if err != nil {
LOG.Crashf("%s deserializer parse data failed[%v]", sync, err)
}
log.RawSize = len(rawLog)
deserializeLogs = append(deserializeLogs, combiner(rawLog, log))
}
// set the fetch timestamp
if len(deserializeLogs) > 0 {
sync.LastFetchTs = deserializeLogs[0].Parsed.Timestamp
}
sync.logsQueue[index] <- deserializeLogs
}
}
/********************************deserializer end**********************************/
// only the master can poll oplogs (several mongo-shake instances may be started).
func (sync *OplogSyncer) poll() {
// we should reload the checkpoint in case another collector
// has fetched oplogs while master quorum leader elections
// happen frequently, so we simply reload.
checkpoint, _, err := sync.ckptManager.Get()
if err != nil {
// we don't continue working when fetching the checkpoint fails, because we must
// confirm the existing checkpoint value or know for sure that it doesn't exist
LOG.Critical("%s Acquire the existing checkpoint from remote[%s %s.%s] failed !", sync,
conf.Options.CheckpointStorage, conf.Options.CheckpointStorageDb,
conf.Options.CheckpointStorageCollection)
return
}
sync.reader.SetQueryTimestampOnEmpty(checkpoint.Timestamp)
sync.reader.StartFetcher() // start the reader's fetcher if it does not exist yet
for quorum.IsMaster() {
// limit the qps if enabled
if sync.qos.Limit > 0 {
sync.qos.FetchBucket()
}
// check shutdown
sync.checkShutdown()
// only get one
sync.next()
}
}
// fetch oplog from reader.
func (sync *OplogSyncer) next() bool {
var log []byte
var err error
if log, err = sync.reader.Next(); log != nil {
payload := int64(len(log))
sync.replMetric.AddGet(1)
sync.replMetric.SetOplogMax(payload)
sync.replMetric.SetOplogAvg(payload)
sync.replMetric.ReplStatus.Clear(utils.FetchBad)
} else if err == sourceReader.CollectionCappedError {
LOG.Error("%s oplog collection capped error, users should fix it manually", sync)
utils.YieldInMs(DurationTime)
return false
} else if err != nil && err != sourceReader.TimeoutError {
LOG.Error("%s %s internal error: %v", sync, sync.reader.Name(), err)
// a nil error or a timeout only means no data was fetched this round,
// so we disregard those cases here
if sync.isCrashError(err.Error()) {
LOG.Crashf("%s I can't handle this error, please solve it manually!", sync)
}
sync.replMetric.ReplStatus.Update(utils.FetchBad)
utils.YieldInMs(DurationTime)
// alarm
}
// buffer the oplog or trigger a flush: a nil log
// means that we need to flush the buffer right now.
// inject it into the persist handler
sync.persister.Inject(log)
return true
}
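// checkShutdown starts (at most once) a background routine that records the exit point and
// waits for every syncer in the group to set CanClose before exiting the process.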
func (sync *OplogSyncer) checkShutdown() {
// single run, no need to add a lock or CAS
if (!utils.IncrSentinelOptions.Shutdown && utils.IncrSentinelOptions.ExitPoint <= 0) ||
sync.SyncGroup == nil || sync.shutdownWorking {
return
}
sync.shutdownWorking = true
nimo.GoRoutine(func() {
if utils.IncrSentinelOptions.Shutdown {
utils.IncrSentinelOptions.ExitPoint = utils.TimeStampToInt64(sync.LastFetchTs)
}
LOG.Info("%s check shutdown, set exit-point[%v]", sync, utils.IncrSentinelOptions.ExitPoint)
for range time.NewTicker(500 * time.Millisecond).C {
exitCount := 0
for _, syncer := range sync.SyncGroup {
if syncer.CanClose {
exitCount++
} else {
LOG.Info("%s syncer[%v] wait close, last fetch oplog timestamp[%v], exit-point[%v]",
sync, syncer.Replset, utils.ExtractMongoTimestamp(syncer.LastFetchTs),
utils.IncrSentinelOptions.ExitPoint)
}
}
if exitCount == len(sync.SyncGroup) {
break
}
}
LOG.Crashf("%s all syncer shutdown, try exit, don't be panic", sync)
})
}
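// isCrashError reports whether the reader error is unrecoverable, i.e. an invalid change
// stream start position when fetching via change stream.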
func (sync *OplogSyncer) isCrashError(errMsg string) bool {
if conf.Options.IncrSyncMongoFetchMethod == utils.VarIncrSyncMongoFetchMethodChangeStream &&
strings.Contains(errMsg, sourceReader.ErrInvalidStartPosition) {
return true
}
return false
}
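// filterOplogGid clears the gid field of every batched oplog and re-marshals its raw bytes.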
func (sync *OplogSyncer) filterOplogGid(batchedOplog [][]*oplog.GenericOplog) error {
var err error
for _, batchGroup := range batchedOplog {
for _, log := range batchGroup {
if len(log.Parsed.Gid) > 0 {
log.Parsed.Gid = ""
log.Raw, err = bson.Marshal(&log.Parsed.ParsedLog)
if err != nil {
return fmt.Errorf("marshal gid filtered oplog[%v] failed: %v", log.Parsed, err)
}
}
}
}
return nil
}
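// Handle implements OplogHandler and is invoked for every oplog consumed: it writes the
// oplog to the audit journal.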
func (sync *OplogSyncer) Handle(log *oplog.PartialLog) {
// 1. record an audit log if needed
sync.journal.WriteRecord(log)
}
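// RestAPI registers the /repl and /queue HTTP endpoints that expose replication metrics and
// inner queue usage for this syncer.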
func (sync *OplogSyncer) RestAPI() {
type Time struct {
TimestampUnix int64 `json:"unix"`
TimestampTime string `json:"time"`
}
type MongoTime struct {
Time
TimestampMongo string `json:"ts"`
}
type Info struct {
Who string `json:"who"`
Tag string `json:"tag"`
ReplicaSet string `json:"replset"`
Logs uint64 `json:"logs_get"`
LogsRepl uint64 `json:"logs_repl"`
LogsSuccess uint64 `json:"logs_success"`
Tps uint64 `json:"tps"`
Lsn *MongoTime `json:"lsn"`
LsnAck *MongoTime `json:"lsn_ack"`
LsnCkpt *MongoTime `json:"lsn_ckpt"`
Now *Time `json:"now"`
OplogAvg string `json:"log_size_avg"`
OplogMax string `json:"log_size_max"`
}
// total replication info
utils.IncrSyncHttpApi.RegisterAPI("/repl", nimo.HttpGet, func([]byte) interface{} {
return &Info{
Who: conf.Options.Id,
Tag: utils.BRANCH,
ReplicaSet: sync.Replset,
Logs: sync.replMetric.Get(),
LogsRepl: sync.replMetric.Apply(),
LogsSuccess: sync.replMetric.Success(),
Tps: sync.replMetric.Tps(),
Lsn: &MongoTime{
TimestampMongo: utils.Int64ToString(sync.replMetric.LSN),
Time: Time{
TimestampUnix: utils.ExtractMongoTimestamp(sync.replMetric.LSN),
TimestampTime: utils.TimestampToString(utils.ExtractMongoTimestamp(sync.replMetric.LSN)),
}},
LsnCkpt: &MongoTime{
TimestampMongo: utils.Int64ToString(sync.replMetric.LSNCheckpoint),
Time: Time{
TimestampUnix: utils.ExtractMongoTimestamp(sync.replMetric.LSNCheckpoint),
TimestampTime: utils.TimestampToString(utils.ExtractMongoTimestamp(sync.replMetric.LSNCheckpoint)),
}},
LsnAck: &MongoTime{
TimestampMongo: utils.Int64ToString(sync.replMetric.LSNAck),
Time: Time{
TimestampUnix: utils.ExtractMongoTimestamp(sync.replMetric.LSNAck),
TimestampTime: utils.TimestampToString(utils.ExtractMongoTimestamp(sync.replMetric.LSNAck)),
}},
Now: &Time{
TimestampUnix: time.Now().Unix(),
TimestampTime: utils.TimestampToString(time.Now().Unix()),
},
OplogAvg: utils.GetMetricWithSize(sync.replMetric.OplogAvgSize),
OplogMax: utils.GetMetricWithSize(sync.replMetric.OplogMaxSize),
}
})
// queue size info
type InnerQueue struct {
Id uint `json:"queue_id"`
PendingQueue uint64 `json:"pending_queue_used"`
LogsQueue uint64 `json:"logs_queue_used"`
}
type Queue struct {
SyncerId string `json:"syncer_replica_set_name"`
LogsQueuePerSize int `json:"logs_queue_size"`
PendingQueuePerSize int `json:"pending_queue_size"`
InnerQueue []InnerQueue `json:"syncer_inner_queue"`
PersisterBufferUsed int `json:"persister_buffer_used"`
}
utils.IncrSyncHttpApi.RegisterAPI("/queue", nimo.HttpGet, func([]byte) interface{} {
queue := make([]InnerQueue, calculatePendingQueueConcurrency())
for i := 0; i < len(queue); i++ {
queue[i] = InnerQueue{
Id: uint(i),
PendingQueue: uint64(len(sync.PendingQueue[i])),
LogsQueue: uint64(len(sync.logsQueue[i])),
}
}
return &Queue{
SyncerId: sync.Replset,
LogsQueuePerSize: cap(sync.logsQueue[0]),
PendingQueuePerSize: cap(sync.PendingQueue[0]),
InnerQueue: queue,
PersisterBufferUsed: len(sync.persister.Buffer),
}
})
}