func()

in executor/db_writer_single.go [60:141]


func (sw *SingleWriter) doUpdateOnInsert(database, collection string, metadata bson.E, oplogs []*OplogRecord, upsert bool) error {
	type pair struct {
		id    interface{}
		data  bson.D
		index int
	}
	var updates []*pair
	for i, log := range oplogs {
		newObject := log.original.partialLog.Object
		if upsert && len(log.original.partialLog.DocumentKey) > 0 {
			updates = append(updates, &pair{id: log.original.partialLog.DocumentKey, data: newObject, index: i})
		} else {
			if upsert {
				LOG.Warn("doUpdateOnInsert runs upsert but lack documentKey: %v", log.original.partialLog)
			}
			// insert must have _id
			if id := oplog.GetKey(log.original.partialLog.Object, ""); id != nil {
				updates = append(updates, &pair{id: bson.D{{"_id", id}}, data: newObject, index: i})
			} else {
				return fmt.Errorf("insert on duplicated update _id look up failed. %v", log.original.partialLog)
			}
		}

		LOG.Debug("single_writer: updateOnInsert %v", log.original.partialLog)
	}

	collectionHandle := sw.conn.Client.Database(database).Collection(collection)
	if upsert {
		for _, update := range updates {

			opts := options.Update().SetUpsert(true)
			res, err := collectionHandle.UpdateOne(context.Background(), update.id,
				bson.D{{"$set", update.data}}, opts)
			if err != nil {
				LOG.Warn("upsert _id[%v] with data[%v] meets err[%v] res[%v], try to solve",
					update.id, update.data, err, res)

				// error can be ignored(insert fail & oplog is before full end)
				if utils.DuplicateKey(err) &&
					utils.TimeStampToInt64(oplogs[update.index].original.partialLog.Timestamp) <= sw.fullFinishTs {
					continue
				}

				LOG.Error("upsert _id[%v] with data[%v] failed[%v]", update.id, update.data, err)
				return err
			}
			if res != nil {
				if res.MatchedCount != 1 && res.UpsertedCount != 1 {
					return fmt.Errorf("Update fail(MatchedCount:%d ModifiedCount:%d UpsertedCount:%d) upsert _id[%v] with data[%v]",
						res.MatchedCount, res.ModifiedCount, res.UpsertedCount, update.id, update.data)
				}
			}
		}
	} else {
		for i, update := range updates {

			res, err := collectionHandle.UpdateOne(context.Background(), update.id,
				bson.D{{"$set", update.data}}, nil)
			if err != nil && utils.DuplicateKey(err) == false {
				LOG.Warn("update _id[%v] with data[%v] meets err[%v] res[%v], try to solve",
					update.id, update.data, err, res)

				// error can be ignored
				if IgnoreError(err, "u",
					utils.TimeStampToInt64(oplogs[i].original.partialLog.Timestamp) <= sw.fullFinishTs) {
					continue
				}

				LOG.Error("update _id[%v] with data[%v] failed[%v]", update.id, update.data, err.Error())
				return err
			}
			if res != nil {
				if res.MatchedCount != 1 {
					return fmt.Errorf("Update fail(MatchedCount:%d, ModifiedCount:%d) old-data[%v] with new-data[%v]",
						res.MatchedCount, res.ModifiedCount, update.id, update.data)
				}
			}
		}
	}

	return nil
}