func()

in executor/db_writer_bulk.go [54:114]


func (bw *BulkWriter) doUpdateOnInsert(database, collection string, metadata bson.E, oplogs []*OplogRecord, upsert bool) error {
	var models []mongo.WriteModel

	for _, log := range oplogs {
		newObject := log.original.partialLog.Object
		if upsert && len(log.original.partialLog.DocumentKey) > 0 {

			models = append(models, mongo.NewUpdateOneModel().
				SetFilter(log.original.partialLog.DocumentKey).
				SetUpdate(bson.D{{"$set", newObject}}).SetUpsert(true))
		} else {
			if upsert {
				LOG.Warn("doUpdateOnInsert runs upsert but lack documentKey: %v", log.original.partialLog)
			}
			// insert must have _id
			if id := oplog.GetKey(log.original.partialLog.Object, ""); id != nil {

				model := mongo.NewUpdateOneModel().
					SetFilter(bson.D{{"_id", id}}).
					SetUpdate(bson.D{{"$set", newObject}})
				if upsert {
					model.SetUpsert(true)
				}
				models = append(models, model)
			} else {
				LOG.Warn("Insert on duplicated update _id look up failed. %v", log)
			}
		}

		LOG.Debug("bulk_writer: updateOnInsert %v", log.original.partialLog)
	}

	res, err := bw.conn.Client.Database(database).Collection(collection).BulkWrite(nil, models, nil)

	if err != nil {
		// parse error
		index, errMsg, dup := utils.FindFirstErrorIndexAndMessageN(err)
		LOG.Error("detail error info with index[%v] msg[%v] dup[%v] res[%v]", index, errMsg, dup, res)

		if utils.DuplicateKey(err) {
			// create single writer to write one by one
			sw := NewDbWriter(bw.conn, bson.E{}, false, bw.fullFinishTs)
			return sw.doUpdateOnInsert(database, collection, metadata, oplogs[index:], upsert)
		}

		// error can be ignored
		if IgnoreError(err, "u", parseLastTimestamp(oplogs) <= bw.fullFinishTs) {
			var oplogRecord *OplogRecord
			if index != -1 {
				oplogRecord = oplogs[index]
			}
			LOG.Warn("ignore error[%v] when run operation[%v], initialSync[%v], oplog[%v]",
				err, "u", parseLastTimestamp(oplogs) <= bw.fullFinishTs, oplogRecord)
			return nil
		}

		LOG.Error("doUpdateOnInsert run upsert/update[%v] failed[%v]", upsert, err)
		return err
	}
	return nil
}