func()

in nimo-shake/writer/mongodb_mgo_driver.go [208:243]


// Update applies input[i] to the document selected by index[i] in a single
// bulk operation against the collection named by mw.ns. When
// conf.Options.IncreaseExecutorUpsert is set the pairs are submitted as
// upserts, otherwise as plain updates.
//
// index must hold at least len(input) selectors; a shorter index panics while
// the pairs are being assembled.
//
// If the bulk run fails, the error is logged and the position of the first
// failing pair is parsed out of the error text:
//   - no usable position (negative or beyond input): the bulk error is
//     returned unchanged;
//   - utils.MongodbIgnoreError accepts the error: the pairs from the failing
//     one onwards are handed to updateOnInsert;
//   - duplicate-key error: the failing pair is skipped and the pairs after it
//     are handed to updateOnInsert;
//   - anything else: the bulk error is returned unchanged.
func (mw *MongoWriter) Update(input []interface{}, index []interface{}) error {
	// The bulk API takes a flat list of (selector, document) pairs.
	updates := make([]interface{}, 0, len(input)*2)
	for i := range input {
		updates = append(updates, index[i], input[i])
	}

	bulk := mw.conn.Session.DB(mw.ns.Database).C(mw.ns.Collection).Bulk()
	if conf.Options.IncreaseExecutorUpsert {
		bulk.Upsert(updates...)
	} else {
		bulk.Update(updates...)
	}

	_, err := bulk.Run()
	if err == nil {
		return nil
	}
	LOG.Warn(err)

	// parse error
	idx, _, _ := utils.FindFirstErrorIndexAndMessage(err.Error())
	// The position comes from parsing an error string, so treat anything that
	// does not address an element of input as "unknown" instead of letting
	// the slice expressions below panic.
	if idx < 0 || idx >= len(input) {
		return err
	}

	switch {
	case utils.MongodbIgnoreError(err, "u", true):
		// always upsert data: retry from the failing pair itself.
		return mw.updateOnInsert(input[idx:], index[idx:])
	case mgo.IsDup(err):
		LOG.Info("error[%v] is dup, ignore", err)
		// The duplicate pair is dropped; continue with the ones after it.
		return mw.updateOnInsert(input[idx+1:], index[idx+1:])
	default:
		return err
	}
}