in nimo-shake/writer/mongodb_mgo_driver.go [100:143]
// WriteBulk inserts input into the destination collection using one unordered
// bulk insert.
//
// An empty input is a no-op. When the bulk run fails with a duplicate-key
// error and full.executor.insert_on_dup_update is enabled with at least one
// primary index configured, a selector containing the primary-index attributes
// is built for every document and the whole batch is handed to updateOnInsert.
// If that option is disabled or no primary index is known, the duplicate-key
// error is returned unchanged. Any other bulk failure is returned wrapped with
// the writer, namespace, batch size and first document for context.
//
// On the duplicate-key path every element of input must be a bson.M; an
// element of any other type produces an error instead of a panic.
func (mw *MongoWriter) WriteBulk(input []interface{}) error {
	if len(input) == 0 {
		return nil
	}

	bulk := mw.conn.Session.DB(mw.ns.Database).C(mw.ns.Collection).Bulk()
	bulk.Unordered()
	bulk.Insert(input...)
	_, err := bulk.Run()
	if err == nil {
		return nil
	}

	if !mgo.IsDup(err) {
		return fmt.Errorf("%s insert docs with length[%v] into ns[%s] of dest mongo failed[%v]. first doc: %v",
			mw, len(input), mw.ns, err, input[0])
	}

	// Writer first, error second: matches the placeholders and the other
	// messages emitted by this method.
	LOG.Warn("%s duplicated document found[%v]. reinsert or update", mw, err)
	if !conf.Options.FullExecutorInsertOnDupUpdate || len(mw.primaryIndexes) == 0 {
		LOG.Error("full.executor.insert_on_dup_update==[%v], primaryIndexes length[%v]", conf.Options.FullExecutorInsertOnDupUpdate,
			len(mw.primaryIndexes))
		return err
	}

	// 1. generate index list: one selector per input document, in input order.
	indexList := make([]interface{}, len(input))
	for i, ele := range input {
		inputData, ok := ele.(bson.M)
		if !ok {
			return fmt.Errorf("%s input doc at position[%d] has type[%T], expected bson.M", mw, i, ele)
		}

		index := make(bson.M, len(mw.primaryIndexes))
		for _, primaryIndex := range mw.primaryIndexes {
			// currently, we only support convert type == 'convert', so there is no type inside
			key := *primaryIndex.AttributeName
			value, exists := inputData[key]
			if !exists {
				// The selector is left without this key and processing continues.
				// NOTE(review): confirm updateOnInsert tolerates a partial (or
				// empty) selector; it may otherwise match unintended documents.
				LOG.Error("primary key[%v] is not exists on input data[%v]",
					*primaryIndex.AttributeName, inputData)
				continue
			}
			index[key] = value
		}
		indexList[i] = index
	}
	LOG.Debug(indexList)

	// 2. let updateOnInsert resolve the duplicates using the selectors.
	return mw.updateOnInsert(input, indexList)
}