in executor/db_writer_bulk.go [54:114]
// doUpdateOnInsert handles inserts that must be replayed as updates (e.g. on
// duplicate-key conflicts): each oplog's document is converted into an
// UpdateOne model with a $set of the full object, and all models are applied
// in one bulk write.
//
// Filter selection per oplog:
//   - upsert with a non-empty DocumentKey: filter by DocumentKey (covers
//     shard-key clusters) with upsert enabled;
//   - otherwise: filter by the document's own _id; oplogs without a
//     retrievable _id are skipped with a warning.
//
// On bulk-write failure, duplicate-key errors trigger a one-by-one retry via
// a single (non-bulk) writer starting at the first failing offset; errors
// matched by IgnoreError during initial sync are swallowed; anything else is
// returned to the caller.
func (bw *BulkWriter) doUpdateOnInsert(database, collection string, metadata bson.E, oplogs []*OplogRecord, upsert bool) error {
	var models []mongo.WriteModel
	for _, log := range oplogs {
		newObject := log.original.partialLog.Object
		if upsert && len(log.original.partialLog.DocumentKey) > 0 {
			models = append(models, mongo.NewUpdateOneModel().
				SetFilter(log.original.partialLog.DocumentKey).
				SetUpdate(bson.D{{"$set", newObject}}).SetUpsert(true))
		} else {
			if upsert {
				LOG.Warn("doUpdateOnInsert runs upsert but lack documentKey: %v", log.original.partialLog)
			}
			// insert must have _id
			if id := oplog.GetKey(log.original.partialLog.Object, ""); id != nil {
				model := mongo.NewUpdateOneModel().
					SetFilter(bson.D{{"_id", id}}).
					SetUpdate(bson.D{{"$set", newObject}})
				if upsert {
					model.SetUpsert(true)
				}
				models = append(models, model)
			} else {
				LOG.Warn("Insert on duplicated update _id look up failed. %v", log)
			}
		}
		LOG.Debug("bulk_writer: updateOnInsert %v", log.original.partialLog)
	}

	// Fix: if every oplog was skipped (no _id, no DocumentKey), models is
	// empty and the driver's BulkWrite would fail with ErrEmptySlice
	// ("must provide at least one element in input slice"). Nothing to
	// write is a success, not an error.
	if len(models) == 0 {
		return nil
	}

	// NOTE(review): nil context is tolerated by the driver (treated as
	// context.Background()); kept as-is to match the rest of the file.
	res, err := bw.conn.Client.Database(database).Collection(collection).BulkWrite(nil, models, nil)
	if err != nil {
		// parse error
		index, errMsg, dup := utils.FindFirstErrorIndexAndMessageN(err)
		LOG.Error("detail error info with index[%v] msg[%v] dup[%v] res[%v]", index, errMsg, dup, res)
		if utils.DuplicateKey(err) {
			// Fix: index can be -1 when the failing offset cannot be
			// parsed from the error (the ignore path below already
			// guards for this); oplogs[-1:] would panic, so fall back
			// to retrying the whole batch one by one.
			if index < 0 {
				index = 0
			}
			// NOTE(review): index refers to the models slice; if any
			// oplogs were skipped above, it may not line up with the
			// oplogs slice — TODO confirm skipped entries are rare
			// enough that over-retrying from this offset is acceptable.
			// create single writer to write one by one
			sw := NewDbWriter(bw.conn, bson.E{}, false, bw.fullFinishTs)
			return sw.doUpdateOnInsert(database, collection, metadata, oplogs[index:], upsert)
		}
		// error can be ignored
		if IgnoreError(err, "u", parseLastTimestamp(oplogs) <= bw.fullFinishTs) {
			var oplogRecord *OplogRecord
			if index != -1 {
				oplogRecord = oplogs[index]
			}
			LOG.Warn("ignore error[%v] when run operation[%v], initialSync[%v], oplog[%v]",
				err, "u", parseLastTimestamp(oplogs) <= bw.fullFinishTs, oplogRecord)
			return nil
		}
		LOG.Error("doUpdateOnInsert run upsert/update[%v] failed[%v]", upsert, err)
		return err
	}
	return nil
}