in executor/db_writer_single.go [60:141]
// doUpdateOnInsert applies the given oplogs to database.collection as
// per-document "$set" updates, issuing one UpdateOne call per oplog.
//
// The filter for each oplog is chosen as follows:
//   - upsert is true and the oplog has a non-empty DocumentKey: the
//     DocumentKey itself is the filter;
//   - otherwise the filter is {"_id": id}, where id is obtained with
//     oplog.GetKey from the oplog's Object. If no id is found the function
//     returns an error before any write is attempted.
//
// With upsert set, every update is sent with the upsert option. A
// duplicate-key error is skipped when the oplog timestamp (converted with
// utils.TimeStampToInt64) is <= sw.fullFinishTs; any other error is returned.
// A non-nil result must report MatchedCount == 1 or UpsertedCount == 1.
//
// Without upsert, duplicate-key errors are not treated as failures. Other
// errors are handed to IgnoreError together with the same timestamp check and
// returned unless IgnoreError reports them as ignorable. A non-nil result
// must report MatchedCount == 1.
//
// metadata is accepted but not used by this function.
func (sw *SingleWriter) doUpdateOnInsert(database, collection string, metadata bson.E, oplogs []*OplogRecord, upsert bool) error {
	type pair struct {
		id    interface{}
		data  bson.D
		index int // position of the originating record in oplogs
	}

	// Every oplog either produces exactly one update or aborts the function,
	// so the final length is known up front.
	updates := make([]*pair, 0, len(oplogs))
	for i, record := range oplogs {
		newObject := record.original.partialLog.Object

		var filter interface{}
		if upsert && len(record.original.partialLog.DocumentKey) > 0 {
			filter = record.original.partialLog.DocumentKey
		} else {
			if upsert {
				LOG.Warn("doUpdateOnInsert runs upsert but lack documentKey: %v", record.original.partialLog)
			}
			// insert must have _id
			id := oplog.GetKey(record.original.partialLog.Object, "")
			if id == nil {
				return fmt.Errorf("insert on duplicated update _id look up failed. %v", record.original.partialLog)
			}
			filter = bson.D{{"_id", id}}
		}

		updates = append(updates, &pair{id: filter, data: newObject, index: i})
		LOG.Debug("single_writer: updateOnInsert %v", record.original.partialLog)
	}

	collectionHandle := sw.conn.Client.Database(database).Collection(collection)

	if upsert {
		for _, update := range updates {
			opts := options.Update().SetUpsert(true)
			res, err := collectionHandle.UpdateOne(context.Background(), update.id,
				bson.D{{"$set", update.data}}, opts)
			if err != nil {
				LOG.Warn("upsert _id[%v] with data[%v] meets err[%v] res[%v], try to solve",
					update.id, update.data, err, res)
				// error can be ignored(insert fail & oplog is before full end)
				if utils.DuplicateKey(err) &&
					utils.TimeStampToInt64(oplogs[update.index].original.partialLog.Timestamp) <= sw.fullFinishTs {
					continue
				}
				LOG.Error("upsert _id[%v] with data[%v] failed[%v]", update.id, update.data, err)
				return err
			}
			if res != nil && res.MatchedCount != 1 && res.UpsertedCount != 1 {
				return fmt.Errorf("Update fail(MatchedCount:%d ModifiedCount:%d UpsertedCount:%d) upsert _id[%v] with data[%v]",
					res.MatchedCount, res.ModifiedCount, res.UpsertedCount, update.id, update.data)
			}
		}
		return nil
	}

	for _, update := range updates {
		res, err := collectionHandle.UpdateOne(context.Background(), update.id,
			bson.D{{"$set", update.data}}, nil)
		// Duplicate-key errors are tolerated here and fall through to the
		// result check below.
		if err != nil && !utils.DuplicateKey(err) {
			LOG.Warn("update _id[%v] with data[%v] meets err[%v] res[%v], try to solve",
				update.id, update.data, err, res)
			// error can be ignored
			// Look the oplog up through the recorded index rather than the
			// position inside updates, so the timestamp always belongs to the
			// record that produced this update.
			if IgnoreError(err, "u",
				utils.TimeStampToInt64(oplogs[update.index].original.partialLog.Timestamp) <= sw.fullFinishTs) {
				continue
			}
			LOG.Error("update _id[%v] with data[%v] failed[%v]", update.id, update.data, err.Error())
			return err
		}
		if res != nil && res.MatchedCount != 1 {
			return fmt.Errorf("Update fail(MatchedCount:%d, ModifiedCount:%d) old-data[%v] with new-data[%v]",
				res.MatchedCount, res.ModifiedCount, update.id, update.data)
		}
	}
	return nil
}