// executor/db_writer_bulk.go (lines 120-229)

// doUpdate applies a batch of update/replace oplogs to the target collection
// through a single BulkWrite. Oplogs whose Object contains "$"-prefixed
// operators become UpdateOne models (v2 diff-format oplogs are first converted
// to normal update documents); all others become full-document ReplaceOne
// models. On BulkWrite failure, duplicate-key, ignorable, and shard-key-update
// errors are retried one by one via a single (non-bulk) writer; any other
// error is returned to the caller.
func (bw *BulkWriter) doUpdate(database, collection string, metadata bson.E, oplogs []*OplogRecord, upsert bool) error {

	var models []mongo.WriteModel
	for _, log := range oplogs {
		var newObject interface{}

		updateCmd := "update"
		LOG.Debug("bulk_writer doUpdate: org_doc:%v", log.original.partialLog)
		if oplog.FindFiledPrefix(log.original.partialLog.Object, "$") {
			var oplogErr error

			oplogVer, ok := oplog.GetKey(log.original.partialLog.Object, versionMark).(int32)
			LOG.Debug("bulk_writer doUpdate: have $, org_object:%v "+
				"object_ver:%v\n", log.original.partialLog.Object, oplogVer)

			if ok && oplogVer == 2 {
				// v2 diff-format oplog: convert to a normal update document first.
				if newObject, oplogErr = oplog.DiffUpdateOplogToNormal(log.original.partialLog.Object); oplogErr != nil {
					LOG.Error("doUpdate run Faild err[%v] org_doc[%v]", oplogErr, log.original.partialLog)
					return oplogErr
				}
			} else {
				// Strip the version marker; the rest of the document is the update spec.
				log.original.partialLog.Object = oplog.RemoveFiled(log.original.partialLog.Object, versionMark)
				newObject = log.original.partialLog.Object
			}

			if upsert && len(log.original.partialLog.DocumentKey) > 0 {
				// Prefer DocumentKey as the filter for upserts when available.
				models = append(models, mongo.NewUpdateOneModel().
					SetFilter(log.original.partialLog.DocumentKey).
					SetUpdate(newObject).SetUpsert(true))
			} else {
				if upsert {
					LOG.Warn("doUpdate runs upsert but lack documentKey: %v", log.original.partialLog)
				}

				model := mongo.NewUpdateOneModel().
					SetFilter(log.original.partialLog.Query).
					SetUpdate(newObject)
				if upsert {
					model.SetUpsert(true)
				}
				models = append(models, model)
			}
		} else {
			// No "$" operators: the oplog carries a full document, so replace.
			newObject = log.original.partialLog.Object

			if upsert && len(log.original.partialLog.DocumentKey) > 0 {
				models = append(models, mongo.NewReplaceOneModel().
					SetFilter(log.original.partialLog.DocumentKey).
					SetReplacement(log.original.partialLog.Object).
					SetUpsert(true))
			} else {
				model := mongo.NewReplaceOneModel().
					SetFilter(log.original.partialLog.Query).
					SetReplacement(log.original.partialLog.Object)
				if upsert {
					model.SetUpsert(true)
				}
				models = append(models, model)
			}
			updateCmd = "replace"
		}

		LOG.Debug("bulk_writer: %s %v aftermodify_doc:%v", updateCmd, newObject, log.original.partialLog)
	}

	LOG.Debug("bulk_writer: update models len %v", len(models))

	// BulkWrite rejects an empty model list; an empty batch is a no-op.
	if len(models) == 0 {
		return nil
	}

	res, err := bw.conn.Client.Database(database).Collection(collection).BulkWrite(
		context.Background(), models, nil)

	if err != nil {
		// parse error
		index, errMsg, dup := utils.FindFirstErrorIndexAndMessageN(err)
		var oplogRecord *OplogRecord
		if index != -1 {
			oplogRecord = oplogs[index]
			LOG.Warn("detail error info with index[%v] msg[%v] dup[%v], isFullSyncStage[%v], oplog[%v] res[%v]",
				index, errMsg, dup, parseLastTimestamp(oplogs) <= bw.fullFinishTs,
				*oplogRecord.original.partialLog, res)
		} else {
			// No per-write index could be extracted (e.g. a connection-level
			// failure): there is no single offending oplog to print, and the
			// old unconditional dereference of oplogRecord would panic here.
			LOG.Warn("detail error info with index[%v] msg[%v] dup[%v], isFullSyncStage[%v], res[%v]",
				index, errMsg, dup, parseLastTimestamp(oplogs) <= bw.fullFinishTs, res)
		}

		// The retries below re-run the failing suffix one by one; when no
		// index is known, retry the whole batch instead of slicing oplogs[-1:].
		retryFrom := index
		if retryFrom < 0 {
			retryFrom = 0
		}

		if utils.DuplicateKey(err) {
			RecordDuplicatedOplog(bw.conn, collection, oplogs)
			// create single writer to write one by one
			sw := NewDbWriter(bw.conn, bson.E{}, false, bw.fullFinishTs)
			return sw.doUpdate(database, collection, metadata, oplogs[retryFrom:], upsert)
		}

		// error can be ignored
		if IgnoreError(err, "u", parseLastTimestamp(oplogs) <= bw.fullFinishTs) {
			LOG.Warn("ignore error[%v] when run operation[%v], initialSync[%v]", err, "u",
				parseLastTimestamp(oplogs) <= bw.fullFinishTs)

			// re-run (index, len(oplogs) - 1]
			sw := NewDbWriter(bw.conn, bson.E{}, false, bw.fullFinishTs)
			return sw.doUpdate(database, collection, metadata, oplogs[index+1:], upsert)
		}

		if strings.Contains(err.Error(), shardKeyupdateErr) {
			LOG.Error("multiUpdateShardKey err_string:%s, index:%d, redo update shardkey singly",
				err.Error(), index)

			sw := NewDbWriter(bw.conn, bson.E{}, false, bw.fullFinishTs)
			return sw.doUpdate(database, collection, metadata, oplogs[retryFrom:], upsert)
		}

		LOG.Error("doUpdate run upsert/update[%v] failed[%v]", upsert, err)
		return err
	}
	return nil
}