func()

in executor/db_writer_single.go [154:252]


// doUpdate applies every record in oplogs, in order, to database.collection.
//
// For each record, when oplog.FindFiledPrefix(Object, "$") reports true the
// Object is sent with UpdateOne; otherwise the Object is sent with ReplaceOne.
// In the UpdateOne path, an Object whose versionMark value is int32(2) is
// first converted with oplog.DiffUpdateOplogToNormal; any other Object has the
// versionMark field stripped with oplog.RemoveFiled (the stripped Object is
// written back onto the record).
//
// When upsert is true the write is issued with the upsert option, and the
// record's DocumentKey (if non-empty) is used as the filter instead of Query.
//
// A write error is skipped when IgnoreError accepts it or when
// utils.DuplicateKey reports it as a duplicate-key error; any other error is
// logged and returned immediately, leaving the remaining records unapplied.
// A successful write that matched no document (and, when upsert is true, did
// not insert one either) is returned as an error.
//
// metadata is not used by this method.
func (sw *SingleWriter) doUpdate(database, collection string, metadata bson.E, oplogs []*OplogRecord, upsert bool) error {
	collectionHandle := sw.conn.Client.Database(database).Collection(collection)
	ctx := context.Background()

	for _, log := range oplogs {
		var update interface{}
		var err error
		var res *mongo.UpdateResult

		updateCmd := "update"
		LOG.Debug("single_writer: org_doc %v", log.original.partialLog)
		if oplog.FindFiledPrefix(log.original.partialLog.Object, "$") {
			oplogVer, ok := oplog.GetKey(log.original.partialLog.Object, versionMark).(int32)
			LOG.Debug("single_writer doUpdate: have $, org_object:%v "+
				"object_ver:%v\n", log.original.partialLog.Object, oplogVer)

			if ok && oplogVer == 2 {
				// Version-2 objects must be converted before they can be
				// passed to UpdateOne.
				if update, err = oplog.DiffUpdateOplogToNormal(log.original.partialLog.Object); err != nil {
					LOG.Error("doUpdate run Faild err[%v] org_doc[%v]", err, log.original.partialLog)
					return err
				}
			} else {
				// Drop the version marker so it is not sent as part of the
				// update document.
				log.original.partialLog.Object = oplog.RemoveFiled(log.original.partialLog.Object, versionMark)
				update = log.original.partialLog.Object
			}

			opts := options.Update()
			if upsert {
				opts.SetUpsert(true)
			}
			if upsert && len(log.original.partialLog.DocumentKey) > 0 {
				res, err = collectionHandle.UpdateOne(ctx, log.original.partialLog.DocumentKey,
					update, opts)
			} else {
				if upsert {
					LOG.Warn("doUpdate runs upsert but lack documentKey: %v", log.original.partialLog)
				}

				res, err = collectionHandle.UpdateOne(ctx, log.original.partialLog.Query,
					update, opts)
			}
		} else {
			update = log.original.partialLog.Object

			opts := options.Replace()
			if upsert {
				opts.SetUpsert(true)
			}
			if upsert && len(log.original.partialLog.DocumentKey) > 0 {
				res, err = collectionHandle.ReplaceOne(ctx, log.original.partialLog.DocumentKey,
					update, opts)
			} else {
				res, err = collectionHandle.ReplaceOne(ctx, log.original.partialLog.Query,
					update, opts)
			}

			updateCmd = "replace"
		}
		LOG.Debug("single_writer: %s %v aftermodify_doc:%v", updateCmd, update, log.original.partialLog)

		if err != nil {
			// error can be ignored; the flag tells IgnoreError whether this
			// oplog's timestamp is at or before sw.fullFinishTs.
			if IgnoreError(err, "u",
				utils.TimeStampToInt64(log.original.partialLog.Timestamp) <= sw.fullFinishTs) {
				continue
			}

			if utils.DuplicateKey(err) {
				// NOTE(review): the whole batch is handed over here, not only
				// the record that failed — confirm this is intended.
				RecordDuplicatedOplog(sw.conn, collection, oplogs)
				continue
			}

			LOG.Error("doUpdate[upsert] old-data[%v] with new-data[%v] failed[%v]",
				log.original.partialLog.Query, log.original.partialLog.Object, err)
			return err
		}

		if res != nil {
			// The write counts as applied when exactly one document matched
			// or, for upserts only, exactly one document was inserted.
			applied := res.MatchedCount == 1 || (upsert && res.UpsertedCount == 1)
			if !applied {
				return fmt.Errorf("Update fail(MatchedCount:%d ModifiedCount:%d UpsertedCount:%d) old-data[%v] with new-data[%v]",
					res.MatchedCount, res.ModifiedCount, res.UpsertedCount,
					log.original.partialLog.Query, log.original.partialLog.Object)
			}
		}

		LOG.Debug("single_writer: aftermodify_doc %v %s[%v]", log.original.partialLog, updateCmd, update)
	}

	return nil
}