receiver/replayer.go (98 lines of code) (raw):

package replayer import ( utils "github.com/alibaba/MongoShake/v2/common" module "github.com/alibaba/MongoShake/v2/modules" "github.com/alibaba/MongoShake/v2/oplog" "github.com/alibaba/MongoShake/v2/tunnel" "go.mongodb.org/mongo-driver/bson" LOG "github.com/vinllen/log4go" ) const ( PendingQueueCapacity = 256 ) type ExampleReplayer struct { Retransmit bool // need re-transmit Ack int64 // ack number // current compressor construct by TMessage // Compress field specific compressor module.Compress // pending queue, use to pass message pendingQueue chan *MessageWithCallback id int // current replayer id } type MessageWithCallback struct { message *tunnel.TMessage completion func() } func NewExampleReplayer(id int) *ExampleReplayer { LOG.Info("ExampleReplayer start. pending queue capacity %d", PendingQueueCapacity) er := &ExampleReplayer{ pendingQueue: make(chan *MessageWithCallback, PendingQueueCapacity), id: id, } go er.handler() return er } /* * Receiver message and do the following steps: * 1. if we need re-transmit, this log will be discard * 2. validate the checksum * 3. decompress * 4. put message into channel * Generally speaking, do not modify this function. */ func (er *ExampleReplayer) Sync(message *tunnel.TMessage, completion func()) int64 { // tell collector we need re-trans all unacked oplogs first // this always happen on receiver restart ! if er.Retransmit { // reject normal oplogs request if message.Tag&tunnel.MsgRetransmission == 0 { return tunnel.ReplyRetransmission } er.Retransmit = false } // validate the checksum value if message.Checksum != 0 { recalculated := message.Crc32() if recalculated != message.Checksum { // we need the peer to retransmission the current message er.Retransmit = true LOG.Critical("Tunnel message checksum bad. recalculated is 0x%x. origin is 0x%x", recalculated, message.Checksum) return tunnel.ReplyChecksumInvalid } } // decompress if message.Compress != module.NoCompress { // reuse current compressor handle var err error if er.compressor, err = module.GetCompressorById(message.Compress); err != nil { er.Retransmit = true LOG.Critical("Tunnel message compressor not support. is %d", message.Compress) return tunnel.ReplyCompressorNotSupported } var decompress [][]byte for _, toDecompress := range message.RawLogs { bits, err := er.compressor.Decompress(toDecompress) if err == nil { decompress = append(decompress, bits) } } if len(decompress) != len(message.RawLogs) { er.Retransmit = true LOG.Critical("Decompress result isn't equivalent. len(decompress) %d, len(Logs) %d", len(decompress), len(message.RawLogs)) return tunnel.ReplyDecompressInvalid } message.RawLogs = decompress } er.pendingQueue <- &MessageWithCallback{message: message, completion: completion} return er.GetAcked() } func (er *ExampleReplayer) GetAcked() int64 { return er.Ack } /* * Users should modify this function according to different demands. */ func (er *ExampleReplayer) handler() { for msg := range er.pendingQueue { count := uint64(len(msg.message.RawLogs)) if count == 0 { // probe request continue } // parse batched message oplogs := make([]oplog.ParsedLog, len(msg.message.RawLogs)) for i, raw := range msg.message.RawLogs { oplogs[i] = oplog.ParsedLog{} if err := bson.Unmarshal(raw, &oplogs[i]); err != nil { // impossible switch, need panic and exit LOG.Crashf("unmarshal oplog[%v] failed[%v]", raw, err) return } LOG.Info(oplogs[i]) // just print for test, users can modify to fulfill different needs // fmt.Println(oplogs[i]) } if callback := msg.completion; callback != nil { callback() // exec callback } // get the newest timestamp n := len(oplogs) lastTs := utils.TimeStampToInt64(oplogs[n-1].Timestamp) er.Ack = lastTs LOG.Debug("handle ack[%v]", er.Ack) // add logical code below } }