tunnel/mock_reader.go (96 lines of code) (raw):

package tunnel import ( "go.mongodb.org/mongo-driver/bson" "go.mongodb.org/mongo-driver/bson/primitive" "math/rand" "github.com/alibaba/MongoShake/v2/oplog" "fmt" "time" LOG "github.com/vinllen/log4go" ) const ( BatchSize = 64 TableName = "mongoshake_mock.table" ) var opDict = []string{"i", "d", "u", "n"} type MockReader struct { generator []*FakeGenerator } type FakeGenerator struct { // not owned replayer []Replayer index uint32 } func (tunnel *MockReader) Link(replayer []Replayer) error { tunnel.generator = make([]*FakeGenerator, len(replayer)) for i := 0; i != len(replayer); i++ { LOG.Info("mock receiver generator-%d start", i) tunnel.generator[i] = &FakeGenerator{replayer: replayer} tunnel.generator[i].index = uint32(i) go tunnel.generator[i].start() } return nil } func (generator *FakeGenerator) start() { existIds := make(map[string]primitive.ObjectID, 10000000) for { var batch []*oplog.GenericOplog var partialLog *oplog.PartialLog for i := 0; i != BatchSize; i++ { partialLog = &oplog.PartialLog{ ParsedLog: oplog.ParsedLog{ Timestamp: primitive.Timestamp{T: uint32(time.Now().Unix()), I: 0}, Namespace: fmt.Sprintf("%s_%d", TableName, generator.index), }, } switch nr := rand.Uint32(); { case nr%1000 == 0: // noop 0.1% partialLog.Operation = "n" partialLog.Gid = "mock-noop" partialLog.Object = bson.D{bson.E{"mongoshake-mock", "ApsaraDB"}} case nr%100 == 0: // delete 1% for k, oid := range existIds { partialLog.Operation = "d" partialLog.Gid = "mock-delete" partialLog.Object = bson.D{bson.E{"_id", oid}} delete(existIds, k) break } case nr%3 == 0: // update 30% for _, oid := range existIds { partialLog.Operation = "u" partialLog.Gid = "mock-update" // partialLog.Object = bson.M{"$set": bson.M{"updates": nr}} partialLog.Object = bson.D{ bson.E{ Key: "$set", Value: bson.D{ bson.E{"updates", nr}, }, }, } partialLog.Query = bson.D{{"_id", oid}} break } default: // insert 70% oid := primitive.NewObjectID() partialLog.Operation = "i" partialLog.Gid = "mock-insert" partialLog.Object = bson.D{ bson.E{"_id", oid}, bson.E{"test", "1"}, bson.E{"abc", nr}, } existIds[oid.Hex()] = oid } bytes, _ := bson.Marshal(partialLog) batch = append(batch, &oplog.GenericOplog{Raw: bytes}) } generator.replayer[generator.index].Sync(&TMessage{ Checksum: 0, Tag: MsgRetransmission, Shard: generator.index, Compress: 0, RawLogs: oplog.LogEntryEncode(batch), }, nil) LOG.Info("mock generator-index-%d generate and apply logs %d", generator.index, len(batch)) } }