in executor/db_writer_bulk.go [120:229]
// doUpdate applies a batch of update oplogs to database.collection with one
// BulkWrite call and falls back to a one-by-one writer for recoverable errors.
//
// Each record is turned into exactly one write model:
//   - If oplog.FindFiledPrefix(object, "$") is true, an UpdateOne model is
//     built. When the object's versionMark field is int32(2) the object is
//     first converted with oplog.DiffUpdateOplogToNormal; otherwise the
//     versionMark field is stripped (the record's Object is modified in place)
//     and the object is used as the update document.
//   - Otherwise a ReplaceOne model is built with the object as replacement.
//
// The filter is the record's DocumentKey when upsert is requested and a
// DocumentKey is present; in every other case it is the record's Query.
// Upsert is enabled on the model whenever upsert is true.
//
// On BulkWrite failure the first failing index is extracted from the error:
//   - duplicate key: the batch is recorded via RecordDuplicatedOplog and the
//     records from the failing index onward are replayed by a single writer;
//   - error accepted by IgnoreError: the records after the failing index are
//     replayed by a single writer;
//   - error text containing shardKeyupdateErr: the records from the failing
//     index onward are replayed by a single writer;
//   - anything else is logged and returned.
//
// If no usable failing index can be extracted, the whole batch is replayed
// in the recoverable cases above. An empty oplogs slice is a no-op.
//
// metadata is not used here; it is only forwarded to the fallback writer.
// Returns nil on success, otherwise the conversion error, the fallback
// writer's result, or the BulkWrite error.
func (bw *BulkWriter) doUpdate(database, collection string, metadata bson.E, oplogs []*OplogRecord, upsert bool) error {
	// Nothing to write. Sending an empty model slice to BulkWrite would only
	// produce an error that carries no failing index.
	if len(oplogs) == 0 {
		return nil
	}

	models := make([]mongo.WriteModel, 0, len(oplogs))
	for _, record := range oplogs {
		// partial is a pointer, so the in-place Object rewrite below is
		// visible through the record exactly as before.
		partial := record.original.partialLog
		LOG.Debug("bulk_writer doUpdate: org_doc:%v", partial)

		// Prefer the document key as filter for upserts; fall back to the
		// oplog query when it is missing or upsert is off.
		hasDocKey := len(partial.DocumentKey) > 0
		var filter interface{} = partial.Query
		if upsert && hasDocKey {
			filter = partial.DocumentKey
		}

		var newObject interface{}
		updateCmd := "update"
		if oplog.FindFiledPrefix(partial.Object, "$") {
			var oplogErr error
			oplogVer, ok := oplog.GetKey(partial.Object, versionMark).(int32)
			LOG.Debug("bulk_writer doUpdate: have $, org_object:%v "+
				"object_ver:%v\n", partial.Object, oplogVer)
			if ok && oplogVer == 2 {
				if newObject, oplogErr = oplog.DiffUpdateOplogToNormal(partial.Object); oplogErr != nil {
					LOG.Error("doUpdate run Faild err[%v] org_doc[%v]", oplogErr, partial)
					return oplogErr
				}
			} else {
				partial.Object = oplog.RemoveFiled(partial.Object, versionMark)
				newObject = partial.Object
			}

			if upsert && !hasDocKey {
				LOG.Warn("doUpdate runs upsert but lack documentKey: %v", partial)
			}
			model := mongo.NewUpdateOneModel().SetFilter(filter).SetUpdate(newObject)
			if upsert {
				model.SetUpsert(true)
			}
			models = append(models, model)
		} else {
			newObject = partial.Object
			model := mongo.NewReplaceOneModel().SetFilter(filter).SetReplacement(partial.Object)
			if upsert {
				model.SetUpsert(true)
			}
			models = append(models, model)
			updateCmd = "replace"
		}
		LOG.Debug("bulk_writer: %s %v aftermodify_doc:%v", updateCmd, newObject, partial)
	}

	LOG.Debug("bulk_writer: update models len %v", len(models))
	res, err := bw.conn.Client.Database(database).Collection(collection).BulkWrite(
		context.Background(), models, nil)
	if err == nil {
		return nil
	}

	// parse error
	index, errMsg, dup := utils.FindFirstErrorIndexAndMessageN(err)
	isFullSyncStage := parseLastTimestamp(oplogs) <= bw.fullFinishTs

	// The error may not carry a usable index (e.g. it is not a per-operation
	// write error). Never index or slice oplogs with an unchecked value.
	indexValid := index >= 0 && index < len(oplogs)
	retryFrom, skipFrom := 0, 0 // unknown index: replay the whole batch
	var failedLog interface{}   // logged as <nil> when the index is unknown
	if indexValid {
		retryFrom, skipFrom = index, index+1
		failedLog = *oplogs[index].original.partialLog
	}
	LOG.Warn("detail error info with index[%v] msg[%v] dup[%v], isFullSyncStage[%v], oplog[%v] res[%v]",
		index, errMsg, dup, isFullSyncStage, failedLog, res)

	if utils.DuplicateKey(err) {
		RecordDuplicatedOplog(bw.conn, collection, oplogs)
		// create single writer to write one by one
		sw := NewDbWriter(bw.conn, bson.E{}, false, bw.fullFinishTs)
		return sw.doUpdate(database, collection, metadata, oplogs[retryFrom:], upsert)
	}

	// error can be ignored
	if IgnoreError(err, "u", isFullSyncStage) {
		LOG.Warn("ignore error[%v] when run operation[%v], initialSync[%v]", err, "u",
			isFullSyncStage)
		// re-run (index, len(oplogs) - 1]
		sw := NewDbWriter(bw.conn, bson.E{}, false, bw.fullFinishTs)
		return sw.doUpdate(database, collection, metadata, oplogs[skipFrom:], upsert)
	}

	if strings.Contains(err.Error(), shardKeyupdateErr) {
		LOG.Error("multiUpdateShardKey err_string:%s, index:%d, redo update shardkey singly",
			err.Error(), index)
		sw := NewDbWriter(bw.conn, bson.E{}, false, bw.fullFinishTs)
		return sw.doUpdate(database, collection, metadata, oplogs[retryFrom:], upsert)
	}

	LOG.Error("doUpdate run upsert/update[%v] failed[%v]", upsert, err)
	return err
}