in nimo-shake/writer/mongodb_mgo_driver.go [208:243]
// Update sends a batch of update operations to the target collection in one
// bulk request.
//
// input[i] and index[i] describe the same operation and are queued as the pair
// (index[i], input[i]). index must hold at least len(input) elements; a shorter
// index slice panics while the pairs are being built. When
// conf.Options.IncreaseExecutorUpsert is set the pairs are queued with
// Bulk.Upsert, otherwise with Bulk.Update.
//
// If the bulk run fails, the position of the first failing operation is parsed
// from the error text. When no position can be parsed, or the parsed position
// does not fall inside the batch, the bulk error is returned as is. Otherwise
// the rest of the batch is handed to updateOnInsert:
//   - starting at the failing operation when utils.MongodbIgnoreError accepts
//     the error;
//   - starting right after the failing operation when the error is a
//     duplicate-key error (that operation is skipped).
//
// Any other error is returned unchanged.
func (mw *MongoWriter) Update(input []interface{}, index []interface{}) error {
	// Flatten into index[0], input[0], index[1], input[1], ... as expected by
	// the variadic bulk calls below.
	updates := make([]interface{}, 0, len(input)*2)
	for i := range input {
		updates = append(updates, index[i], input[i])
	}

	bulk := mw.conn.Session.DB(mw.ns.Database).C(mw.ns.Collection).Bulk()
	if conf.Options.IncreaseExecutorUpsert {
		bulk.Upsert(updates...)
	} else {
		bulk.Update(updates...)
	}

	_, err := bulk.Run()
	if err == nil {
		return nil
	}
	LOG.Warn(err)

	// Locate the first failing operation from the error text.
	idx, _, _ := utils.FindFirstErrorIndexAndMessage(err.Error())
	// The index comes from parsing a message, so do not trust it blindly: a
	// value outside the batch would make the slice expressions below panic.
	if idx < 0 || idx >= len(input) {
		return err
	}

	switch {
	case utils.MongodbIgnoreError(err, "u", true):
		// Ignorable update error: redo the failing operation and everything
		// after it through updateOnInsert.
		return mw.updateOnInsert(input[idx:], index[idx:])
	case mgo.IsDup(err):
		// Duplicate key: skip the offending operation and continue with the
		// ones that follow it.
		LOG.Info("error[%v] is dup, ignore", err)
		return mw.updateOnInsert(input[idx+1:], index[idx+1:])
	default:
		return err
	}
}