in executor/db_writer_single.go [154:252]
// doUpdate applies the given update oplogs to database.collection one at a
// time, in slice order, and stops at the first error that cannot be skipped.
//
// The write issued for each oplog depends on its Object:
//   - if oplog.FindFiledPrefix reports a "$" prefix, the Object is sent with
//     UpdateOne. When the versionMark entry is an int32 equal to 2, the Object
//     is first converted by oplog.DiffUpdateOplogToNormal; otherwise the
//     versionMark entry is removed from the oplog's Object (the oplog itself
//     is modified) and the remainder is sent as-is.
//   - otherwise the Object is sent with ReplaceOne.
//
// The filter is the oplog's DocumentKey when upsert is true and the key is
// non-empty; in every other case it is the oplog's Query.
//
// Parameters:
//   - database, collection: namespace the writes are sent to.
//   - metadata: not used by this method.
//   - oplogs: records to apply.
//   - upsert: when true, the upsert option is set on every write.
//
// Returns nil when every oplog was applied or skipped. Returns the conversion
// error, the driver error, or an error describing unexpected result counts
// otherwise; oplogs after the failing one are not applied.
func (sw *SingleWriter) doUpdate(database, collection string, metadata bson.E, oplogs []*OplogRecord, upsert bool) error {
	collectionHandle := sw.conn.Client.Database(database).Collection(collection)
	for _, log := range oplogs {
		var (
			update interface{}
			filter interface{}
			res    *mongo.UpdateResult
			err    error
		)
		LOG.Debug("single_writer: org_doc %v", log.original.partialLog)

		// A "$"-prefixed field selects UpdateOne; anything else is sent with
		// ReplaceOne.
		useModifiers := oplog.FindFiledPrefix(log.original.partialLog.Object, "$")
		updateCmd := "replace"
		if useModifiers {
			updateCmd = "update"
			var oplogErr error
			oplogVer, ok := oplog.GetKey(log.original.partialLog.Object, versionMark).(int32)
			LOG.Debug("single_writer doUpdate: have $, org_object:%v "+
				"object_ver:%v\n", log.original.partialLog.Object, oplogVer)
			if ok && oplogVer == 2 {
				// Version 2 objects are converted before being sent.
				// NOTE(review): presumably the "diff" update format — confirm
				// against oplog.DiffUpdateOplogToNormal.
				if update, oplogErr = oplog.DiffUpdateOplogToNormal(log.original.partialLog.Object); oplogErr != nil {
					LOG.Error("doUpdate run Faild err[%v] org_doc[%v]", oplogErr, log.original.partialLog)
					return oplogErr
				}
			} else {
				// Strip the version marker so it is not sent as part of the
				// update document.
				log.original.partialLog.Object = oplog.RemoveFiled(log.original.partialLog.Object, versionMark)
				update = log.original.partialLog.Object
			}
		} else {
			update = log.original.partialLog.Object
		}

		// Choose the filter once for both write kinds: the document key is
		// used only for upserts that carry one, the query otherwise.
		filter = log.original.partialLog.Query
		if upsert {
			if len(log.original.partialLog.DocumentKey) > 0 {
				filter = log.original.partialLog.DocumentKey
			} else {
				LOG.Warn("doUpdate runs upsert but lack documentKey: %v", log.original.partialLog)
			}
		}

		if useModifiers {
			opts := options.Update()
			if upsert {
				opts.SetUpsert(true)
			}
			res, err = collectionHandle.UpdateOne(context.Background(), filter, update, opts)
		} else {
			opts := options.Replace()
			if upsert {
				opts.SetUpsert(true)
			}
			res, err = collectionHandle.ReplaceOne(context.Background(), filter, update, opts)
		}

		LOG.Debug("single_writer: %s %v aftermodify_doc:%v", updateCmd, update, log.original.partialLog)
		if err != nil {
			// error can be ignored
			if IgnoreError(err, "u",
				utils.TimeStampToInt64(log.original.partialLog.Timestamp) <= sw.fullFinishTs) {
				continue
			}
			if utils.DuplicateKey(err) {
				// NOTE(review): the whole batch is handed over here, not just
				// the oplog that failed — confirm this is intended.
				RecordDuplicatedOplog(sw.conn, collection, oplogs)
				continue
			}
			LOG.Error("doUpdate[upsert] old-data[%v] with new-data[%v] failed[%v]",
				log.original.partialLog.Query, log.original.partialLog.Object, err)
			return err
		}

		if res != nil {
			// Exactly one document must have been matched; an upsert may
			// instead have inserted exactly one.
			applied := res.MatchedCount == 1 || (upsert && res.UpsertedCount == 1)
			if !applied {
				return fmt.Errorf("Update fail(MatchedCount:%d ModifiedCount:%d UpsertedCount:%d) old-data[%v] with new-data[%v]",
					res.MatchedCount, res.ModifiedCount, res.UpsertedCount,
					log.original.partialLog.Query, log.original.partialLog.Object)
			}
		}
		LOG.Debug("single_writer: aftermodify_doc %v %s[%v]", log.original.partialLog, updateCmd, update)
	}
	return nil
}