collector/write_controller.go (80 lines of code) (raw):
package collector
import (
conf "github.com/alibaba/MongoShake/v2/collector/configure"
utils "github.com/alibaba/MongoShake/v2/common"
module "github.com/alibaba/MongoShake/v2/modules"
"github.com/alibaba/MongoShake/v2/oplog"
"github.com/alibaba/MongoShake/v2/tunnel"
)
type WriteController struct {
// worker (not owned)
worker *Worker
// modules
moduleList []Module
// backend tunnel
tunnel tunnel.Writer
// current max lsn_ack value
LatestLsnAck int64
}
type Module interface {
IsRegistered() bool
/**
* Module install and initialize. return false on failed
* and only invocation on WriteController is preparing
*/
Install() bool
/**
* Handle outstanding request message. and messages
* are passed one by one. Any changes of message in
* Handle() will be preserved and delivery to next
*
* @return tunnel's error code (<0) or ack value
*
*/
Handle(message *tunnel.WMessage) int64
}
// the order of controller modules declared strictly
// doesn't change the order
var orderedModuleList = []Module{
&module.Compressor{},
&module.ChecksumCalculator{},
}
func NewWriteController(worker *Worker) *WriteController {
writeController := &WriteController{worker: worker}
if !writeController.installModules() {
return nil
}
// create t by options
factory := tunnel.WriterFactory{Name: conf.Options.Tunnel}
if writeController.tunnel = factory.Create(conf.Options.TunnelAddress, worker.id); writeController.tunnel != nil {
if writeController.tunnel.Prepare() {
return writeController
}
}
return nil
}
// set init sync finish timestamp if tunnel is direct
func (controller *WriteController) SetInitSyncFinishTs(fullSyncFinishPosition int64) {
if controller.tunnel.Name() == "direct" {
dw := controller.tunnel.(*tunnel.DirectWriter)
dw.BatchExecutor.FullFinishTs = fullSyncFinishPosition
}
}
func (controller *WriteController) installModules() bool {
for _, m := range orderedModuleList {
if m.IsRegistered() {
if !m.Install() {
return false
}
controller.moduleList = append(controller.moduleList, m)
}
}
return true
}
func (controller *WriteController) Send(logs []*oplog.GenericOplog, tag uint32) int64 {
// all tunnel message which contain empty logs will be considered as
// probe message. Include real probe to get ack from remote server
// or a normal message doesn't have logs (which submit by retransmission)
if !controller.tunnel.AckRequired() && tag&tunnel.MsgProbe != 0 {
// probe message is not useful while tunnel AckRequired() is false
// ignore these messages. Tag on these message contain
// MsgRetransmission flag is impossible.
return controller.LatestLsnAck
}
message := &tunnel.WMessage{
TMessage: &tunnel.TMessage{
Tag: tag,
Shard: controller.worker.id,
RawLogs: oplog.LogEntryEncode(logs),
},
ParsedLogs: oplog.LogParsed(logs),
}
for _, m := range controller.moduleList {
if internalCode := m.Handle(message); internalCode < 0 {
return internalCode
}
}
// we return the error directly if send() failed(feedback must less than zero). feedback bigger
// or equal zero means has sent successfully. And if tunnel is AckRequired() we set the LatestLsnAck
// in order to notify the upper layer ACK value. if not, we only drop the ACK and return the
// "last message" timestamp. that indicates nothing should be ACKed
if feedback := controller.tunnel.Send(message); feedback < 0 {
// failed
return feedback
} else if controller.tunnel.AckRequired() {
// ok, need ack value
controller.LatestLsnAck = feedback
} else if message.Tag&tunnel.MsgProbe == 0 && len(message.RawLogs) != 0 {
// direct tunnel way will also come into this branch
controller.LatestLsnAck = utils.TimeStampToInt64(logs[len(logs)-1].Parsed.Timestamp)
}
// accumulated overall logs size
controller.worker.syncer.replMetric.AddTunnelTraffic(message.ApproximateSize())
return controller.LatestLsnAck
}