collector/worker.go

package collector

import (
	"fmt"
	"sort"
	"sync/atomic"
	"time"

	conf "github.com/alibaba/MongoShake/v2/collector/configure"
	utils "github.com/alibaba/MongoShake/v2/common"
	"github.com/alibaba/MongoShake/v2/oplog"
	"github.com/alibaba/MongoShake/v2/tunnel"

	nimo "github.com/gugemichael/nimo4go"
	LOG "github.com/vinllen/log4go"
)

const MaxUnAckListLength = 128 * 256

type Worker struct {
	// parent syncer
	syncer *OplogSyncer

	// worker sequence id
	id uint32
	// job queue from OplogTailer
	queue chan []*oplog.GenericOplog
	// worker tunnel controller (includes tunnel and modules)
	writeController *WriteController

	// buffered oplogs, covering both the unack buffer and the send buffer
	listUnACK []*oplog.GenericOplog

	// ack offset (used for checkpoint)
	ack, unack int64
	count      uint64
	// whether all oplogs are acked right now
	allAcked bool

	// retransmission required by the tunnel controller
	retransmit bool

	// event listener
	eventListener *TransferEventListener
}

type TransferEventListener struct {
	whenTransferBatchSuccess func(worker *Worker, buffer []*oplog.GenericOplog)
	whenTransferRetry        func(worker *Worker, buffer []*oplog.GenericOplog)
}

func NewWorker(syncer *OplogSyncer, id uint32) *Worker {
	return &Worker{
		syncer: syncer,
		id:     id,
		queue:  make(chan []*oplog.GenericOplog, conf.Options.IncrSyncWorkerBatchQueueSize),
	}
}

func (worker *Worker) String() string {
	return fmt.Sprintf("Collector-worker-%d", worker.id)
}

func (worker *Worker) Init() bool {
	worker.RestAPI()
	worker.writeController = NewWriteController(worker)
	if worker.writeController == nil {
		return false
	}
	return true
}

func (worker *Worker) SetInitSyncFinishTs(fullSyncFinishPosition int64) {
	worker.writeController.SetInitSyncFinishTs(fullSyncFinishPosition)
}

func (worker *Worker) IsAllAcked() bool {
	return worker.allAcked
}

func (worker *Worker) AllAcked(allAcked bool) {
	worker.allAcked = allAcked
}

func (worker *Worker) Offer(batch []*oplog.GenericOplog) {
	if batch != nil {
		atomic.StoreInt64(&worker.unack, utils.TimeStampToInt64(batch[len(batch)-1].Parsed.Timestamp))
	}
	worker.queue <- batch
}

func (worker *Worker) shouldDelay() bool {
	// The unack buffer is too big: a mass of oplogs has accumulated that was
	// already sent but not yet acked. No more oplogs may be pushed!
	if len(worker.listUnACK) > MaxUnAckListLength {
		// try to transfer the remaining oplogs to free some space
		return true
	}
	return false
}

func (worker *Worker) shouldStall() bool {
	// Suspend while the system pause flag is set. This is always operated by
	// an outside manual system. We need to stop here until it is turned off.
	return utils.IncrSentinelOptions.Pause
}

func (worker *Worker) findFirstAvailableBatch() []*oplog.GenericOplog {
	var batch []*oplog.GenericOplog
	for {
		select {
		case batch = <-worker.queue:
		case <-time.After(DDLCheckpointInterval * time.Millisecond):
			// timeout: let the caller send a probe message
			return nil
		}
		// If we got an empty (nil) batch, check whether the queue still has
		// more oplogs. That merges continuous nil batches into a single one.
		if batch != nil || len(worker.queue) == 0 {
			return batch
		}
	}
}

func (worker *Worker) StartWorker() {
	LOG.Info("%s start working with jobs batch queue. buffer capacity %d", worker, cap(worker.queue))

	var batch []*oplog.GenericOplog
	for {
		switch {
		case worker.shouldDelay():
			// We guess lots of oplogs are pending in the jobs queue.
			// We need to wait for a moment.
			worker.probe()
			worker.purgeACK()
			fallthrough
		case worker.shouldStall():
			utils.DelayFor(10)
		default:
			if batch = worker.findFirstAvailableBatch(); batch == nil {
				worker.probe()
			} else {
				worker.transfer(batch)
				worker.syncer.replMetric.AddConsume(uint64(len(batch)))
			}
			utils.DEBUG_LOG("%s poll queued batch oplogs. total[%d]", worker, len(batch))
		}
	}
}

/*
 * [ Before transfer ]
 *
 *     batch     |9,10,11|
 *     listUnACK |1,2,3,4,5,6,7,8|
 *
 * [ After transfer ]
 *
 *     batch     | (empty) |
 *     listUnACK |1,2,3,4,5,6,7,8,9,10,11|
 *
 * [ Purge listUnACK (ack == 7) ]
 *
 *     listUnACK |8,9,10,11|
 */
func (worker *Worker) transfer(batch []*oplog.GenericOplog) {
	nimo.AssertTrue(batch != nil, "batch oplogs should not be empty")

	var logs []*oplog.GenericOplog
	var tag uint32

	done := false
	// keep transferring until the current batch is sent (done == true)
	for !done {
		if worker.retransmit {
			// TODO: send all unacked logs? that could be very big
			logs = worker.listUnACK
			tag = tunnel.MsgRetransmission
			worker.syncer.replMetric.AddRetransmission(1)
		} else {
			logs = batch
			tag = tunnel.MsgNormal
		}

		replyAndAcked := worker.writeController.Send(logs, tag)
		LOG.Info("%s transfer retransmit:%t send [%d] logs. reply_acked [%v], list_unack [%d]",
			worker, worker.retransmit, len(logs), utils.ExtractTimestampForLog(replyAndAcked), len(worker.listUnACK))

		switch {
		case replyAndAcked >= 0:
			if !worker.retransmit {
				worker.count += uint64(len(logs))
				worker.syncer.replMetric.SetLSNACK(replyAndAcked)
				worker.syncer.replMetric.AddApply(uint64(len(logs)))
				worker.retain(batch)
				// update ack
				atomic.StoreInt64(&worker.ack, replyAndAcked)
				done = true
			}
			// remove all unacked entries that are not newer than worker.ack
			worker.purgeACK()
			// reset
			worker.retransmit = false
			// notify the success listener
			worker.syncer.replMetric.ReplStatus.Clear(utils.TunnelSendBad)
		case replyAndAcked == tunnel.ReplyRetransmission:
			LOG.Info("%s received ReplyRetransmission reply, now status %t", worker, worker.retransmit)
			// keep trying with retransmission until we receive a
			// non-retransmission message
			worker.retransmit = true
		default:
			LOG.Warn("%s transfer oplogs failed with reply value %d", worker, replyAndAcked)
			// We treat a failed batch of logs as a single failure and notify
			// the failed-retry listener.
			worker.syncer.replMetric.AddFailed(1)
			//worker.retransmit = true
			worker.syncer.replMetric.ReplStatus.Update(utils.TunnelSendBad)
		}
	}
}

func (worker *Worker) probe() {
	if replyAcked := worker.writeController.Send([]*oplog.GenericOplog{}, tunnel.MsgProbe); replyAcked > 0 {
		// only change the ack offset when the reply is OK
		worker.syncer.replMetric.SetLSNACK(replyAcked)
		atomic.StoreInt64(&worker.ack, replyAcked)
	}
}

func (worker *Worker) retain(batch []*oplog.GenericOplog) {
	worker.listUnACK = append(worker.listUnACK, batch...)
	LOG.Debug("%s copy batch oplogs [%d] to listUnACK. UnACK remained [%d]",
		worker, len(batch), len(worker.listUnACK))
}

func (worker *Worker) purgeACK() {
	bigger := sort.Search(len(worker.listUnACK), func(i int) bool {
		return utils.TimeStampToInt64(worker.listUnACK[i].Parsed.Timestamp) > worker.ack
	})
	if bigger != 0 {
		LOG.Debug("%s purge unacked [lsn_ack:%d]. keep slice position from %d until %d",
			worker, worker.ack, bigger, len(worker.listUnACK))
		worker.listUnACK = worker.listUnACK[bigger:]
		worker.syncer.replMetric.AddSuccess(uint64(bigger))
	}
}

func (worker *Worker) RestAPI() {
	type WorkerInfo struct {
		Id              uint32 `json:"worker_id"`
		JobsQueued      int    `json:"jobs_in_queue"`
		JobsUnACKBuffer int    `json:"jobs_unack_buffer"`
		LastUnACK       string `json:"last_unack"`
		LastACK         string `json:"last_ack"`
		COUNT           uint64 `json:"count"`
	}

	utils.IncrSyncHttpApi.RegisterAPI("/worker", nimo.HttpGet, func([]byte) interface{} {
		return &WorkerInfo{
			Id:              worker.id,
			JobsQueued:      len(worker.queue),
			JobsUnACKBuffer: len(worker.listUnACK),
			LastUnACK:       utils.Int64ToString(worker.unack),
			LastACK:         utils.Int64ToString(worker.ack),
			COUNT:           worker.count,
		}
	})
}
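
// purgeExample is a minimal, hypothetical sketch (not part of the original
// file) of the technique purgeACK uses above. Because retain appends batches
// in timestamp order, listUnACK stays sorted, so sort.Search can binary-search
// for the first entry strictly newer than the acked offset, and one re-slice
// drops everything already acknowledged. Plain int64 timestamps stand in for
// *oplog.GenericOplog entries here.
func purgeExample(unacked []int64, ack int64) []int64 {
	// index of the first timestamp strictly greater than ack
	bigger := sort.Search(len(unacked), func(i int) bool {
		return unacked[i] > ack
	})
	// matching the diagram above: with unacked = [1..11] and ack = 7,
	// bigger == 7 and the result is [8 9 10 11]
	return unacked[bigger:]
}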
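
// Hypothetical usage sketch (not from the original file): how a caller might
// drive a Worker, assuming an initialized *OplogSyncer is available (its
// construction lives elsewhere in the collector package). StartWorker loops
// forever, so it runs in its own goroutine while a producer feeds batches in
// through Offer; when the queue drains, nil batches are merged and the worker
// falls back to sending probe messages:
//
//	worker := NewWorker(syncer, 0)
//	if !worker.Init() {
//		// handle failure: the tunnel write controller could not be built
//	}
//	go worker.StartWorker()
//	worker.Offer(batch) // records the batch's last timestamp as unack, then enqueues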