executor/operation.go
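
// operation.go replays grouped oplog records against the target MongoDB:
// it dispatches insert/update/delete/command/noop batches to the db writer,
// maintains per-namespace metrics, and skips a small set of known-ignorable
// server errors (e.g. ShardKeyNotFound when writing through mongos).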

package executor

import (
	"fmt"
	"reflect"
	"strings"
	"sync/atomic"

	conf "github.com/alibaba/MongoShake/v2/collector/configure"
	utils "github.com/alibaba/MongoShake/v2/common"
	"github.com/alibaba/MongoShake/v2/oplog"

	LOG "github.com/vinllen/log4go"
	"go.mongodb.org/mongo-driver/bson"
	"go.mongodb.org/mongo-driver/mongo"
)

// server error codes that can safely be ignored during replay
var ErrorsShouldSkip = map[int]string{
	61: "ShardKeyNotFound",
}

func (exec *Executor) ensureConnection() bool {
	// reconnect if necessary
	if exec.conn == nil || !exec.conn.IsGood() {
		writeConcern := utils.ReadWriteConcernDefault
		if conf.Options.FullSyncExecutorMajorityEnable {
			writeConcern = utils.ReadWriteConcernMajority
		}
		if conn, err := utils.NewMongoCommunityConn(exec.MongoUrl, utils.VarMongoConnectModePrimary, true,
			utils.ReadWriteConcernDefault, writeConcern, conf.Options.TunnelMongoSslRootCaFile); err != nil {
			LOG.Critical("Connect to mongo cluster failed. %v", err)
			return false
		} else {
			exec.conn = conn
			if exec.bulkInsert, err = utils.GetAndCompareVersion(exec.conn, ThresholdVersion,
				conf.Options.TargetDBVersion); err != nil {
				LOG.Info("compare version failed with return[%v], bulkInsert disabled", err)
			}
		}
	}

	return true
}

func (exec *Executor) dropConnection() {
	exec.conn.Close()
	exec.conn = nil
}

func (exec *Executor) execute(group *OplogsGroup) error {
	count := uint64(len(group.oplogRecords))
	if count == 0 {
		// probe
		return nil
	}

	lastOne := group.oplogRecords[count-1]

	if !conf.Options.IncrSyncExecutorDebug {
		if !exec.ensureConnection() {
			return fmt.Errorf("Replay-%d network connection lost, we will retry on the next connection",
				exec.batchExecutor.ReplayerId)
		}
		// just use the first log: all records in the group share the same metadata
		metadata := buildMetadata(group.oplogRecords[0].original.partialLog)
		hasIndex := strings.Contains(group.ns, "system.indexes")
		// LOG.Debug("fullFinishTs: %v", utils.ExtractTimestampForLog(exec.batchExecutor.FullFinishTs))
		dbWriter := NewDbWriter(exec.conn, metadata, exec.bulkInsert && !hasIndex, exec.batchExecutor.FullFinishTs)

		var err error

		LOG.Debug("Replay-%d oplog collection ns [%s] with command [%s] batch count %d, metadata %v",
			exec.batchExecutor.ReplayerId, group.ns, strings.ToUpper(lookupOpName(group.op)), count, metadata)

		/*
		 * in the former version, we filtered DDL here. In the current version, all messages that need
		 * filtering have already been removed in the collector (syncer), so here we only need to write
		 * all oplogs.
		 */
		// for indexes
		// "0" -> database, "1" -> collection
		dc := strings.SplitN(group.ns, ".", 2)

		switch group.op {
		case "i":
			err = dbWriter.doInsert(dc[0], dc[1], metadata, group.oplogRecords,
				conf.Options.IncrSyncExecutorInsertOnDupUpdate)
			atomic.AddUint64(&exec.metricInsert, uint64(len(group.oplogRecords)))
			exec.addNsMapMetric(group.ns, "i", len(group.oplogRecords))
		case "u":
			err = dbWriter.doUpdate(dc[0], dc[1], metadata, group.oplogRecords,
				conf.Options.IncrSyncExecutorUpsert)
			atomic.AddUint64(&exec.metricUpdate, uint64(len(group.oplogRecords)))
			exec.addNsMapMetric(group.ns, "u", len(group.oplogRecords))
		case "d":
			err = dbWriter.doDelete(dc[0], dc[1], metadata, group.oplogRecords)
			atomic.AddUint64(&exec.metricDelete, uint64(len(group.oplogRecords)))
			exec.addNsMapMetric(group.ns, "d", len(group.oplogRecords))
		case "c":
			LOG.Info("Replay-%d run DDL with metadata[%v] in db[%v], firstLog: %v",
				exec.batchExecutor.ReplayerId, metadata, dc[0], group.oplogRecords[0].original.partialLog)
			err = dbWriter.doCommand(dc[0], metadata, group.oplogRecords)
			atomic.AddUint64(&exec.metricDDL, uint64(len(group.oplogRecords)))
			exec.addNsMapMetric(group.ns, "c", len(group.oplogRecords))
		case "n":
			// exec.batchExecutor.ReplMetric.AddFilter(count)
			atomic.AddUint64(&exec.metricNoop, uint64(len(group.oplogRecords)))
			exec.addNsMapMetric(group.ns, "n", len(group.oplogRecords))
		default:
			atomic.AddUint64(&exec.metricUnknown, uint64(len(group.oplogRecords)))
			LOG.Warn("Replay-%d meets unknown oplog type, op '%s'",
				exec.batchExecutor.ReplayerId, group.op)
			exec.addNsMapMetric(group.ns, "x", len(group.oplogRecords))
		}

		// a few known errors can be skipped, such as "ShardKeyNotFound" returned
		// when mongoshake is connected to a mongos
		if exec.errorIgnore(err) {
			LOG.Info("Replay-%d Discard known error[%v], It's acceptable",
				exec.batchExecutor.ReplayerId, err)
			err = nil
		}

		if err != nil {
			LOG.Critical("Replayer-%d, executor-%d, oplog for namespace[%s] op[%s] failed. error type[%v]"+
				" error[%v], logs number[%d], firstLog: %s",
				exec.batchExecutor.ReplayerId, exec.id, group.ns, group.op, reflect.TypeOf(err),
				err.Error(), count, group.oplogRecords[0].original.partialLog)
			exec.dropConnection()
			atomic.AddUint64(&exec.metricError, uint64(len(group.oplogRecords)))
			exec.addNsMapMetric(group.ns, "e", len(group.oplogRecords))

			return err
		}
	}
	// exec.batchExecutor.ReplMetric.ReplStatus.Clear(utils.ReplicaExecBad)

	// wait for conflict break point if needed
	if lastOne.wait != nil {
		lastOne.wait()
	}

	// group logs have the equivalent namespace
	//ns := group.logs[0].original.partialLog.Namespace
	//exec.replayer.ReplMetric.AddTableOps(ns, count)
	return nil
}

// errorIgnore reports whether err is a server error whose code is listed in ErrorsShouldSkip.
func (exec *Executor) errorIgnore(err error) bool {
	if err == nil {
		return false
	}
	er, ok := err.(mongo.ServerError)
	if !ok {
		return false
	}
	for code := range ErrorsShouldSkip {
		if er.HasErrorCode(code) {
			return true
		}
	}
	return false
}

// addNsMapMetric accumulates the per-namespace counter for the given op type.
func (exec *Executor) addNsMapMetric(ns, op string, count int) {
	var metricSum *uint64
	switch op {
	case "i":
		val, ok := exec.metricInsertMap.Load(ns)
		if !ok {
			storeVal := uint64(0)
			exec.metricInsertMap.Store(ns, &storeVal)
			metricSum = &storeVal
		} else {
			metricSum = val.(*uint64)
		}
	case "u":
		val, ok := exec.metricUpdateMap.Load(ns)
		if !ok {
			storeVal := uint64(0)
			exec.metricUpdateMap.Store(ns, &storeVal)
			metricSum = &storeVal
		} else {
			metricSum = val.(*uint64)
		}
	case "d":
		val, ok := exec.metricDeleteMap.Load(ns)
		if !ok {
			storeVal := uint64(0)
			exec.metricDeleteMap.Store(ns, &storeVal)
			metricSum = &storeVal
		} else {
			metricSum = val.(*uint64)
		}
	case "c":
		val, ok := exec.metricDDLMap.Load(ns)
		if !ok {
			storeVal := uint64(0)
			exec.metricDDLMap.Store(ns, &storeVal)
			metricSum = &storeVal
		} else {
			metricSum = val.(*uint64)
		}
	case "x":
		val, ok := exec.metricUnknownMap.Load(ns)
		if !ok {
			storeVal := uint64(0)
			exec.metricUnknownMap.Store(ns, &storeVal)
			metricSum = &storeVal
		} else {
			metricSum = val.(*uint64)
		}
	case "e":
		val, ok := exec.metricErrorMap.Load(ns)
		if !ok {
			storeVal := uint64(0)
			exec.metricErrorMap.Store(ns, &storeVal)
			metricSum = &storeVal
		} else {
			metricSum = val.(*uint64)
		}
	}
	if metricSum == nil {
		// ops without a per-namespace map (e.g. "n") are not tracked here
		return
	}
	atomic.AddUint64(metricSum, uint64(count))
}

// buildMetadata attaches the gid (if carried) so the writer can tag replayed oplogs.
func buildMetadata(oplog *oplog.PartialLog) bson.E {
	// with gid carried
	if len(oplog.Gid) != 0 {
		return bson.E{Key: "g", Value: oplog.Gid}
	}
	return bson.E{}
}

func lookupOpName(op string) string {
	switch op {
	case "i":
		return "insert"
	case "u":
		return "update"
	case "d":
		return "delete"
	case "c":
		return "create"
	case "n":
		return "noop"
	default:
		return "unknown"
	}
}