func()

in nimo-shake/writer/mongodb_mgo_driver.go [100:143]


// WriteBulk inserts every document of input into the collection mw.ns using a
// single unordered bulk operation.
//
// An empty input is a no-op and returns nil. If the bulk run fails with a
// duplicate-key error (as reported by mgo.IsDup) and both
// conf.Options.FullExecutorInsertOnDupUpdate is set and mw.primaryIndexes is
// non-empty, a selector holding the primary-index attributes is built for each
// document and the whole batch is handed to mw.updateOnInsert, whose result is
// returned. If that fallback is not enabled, the duplicate-key error is
// returned unchanged. Any other failure is returned with the namespace, batch
// size and first document attached for context.
//
// On the fallback path every element of input must be a bson.M; otherwise an
// error is returned instead of panicking.
func (mw *MongoWriter) WriteBulk(input []interface{}) error {
	if len(input) == 0 {
		return nil
	}

	bulk := mw.conn.Session.DB(mw.ns.Database).C(mw.ns.Collection).Bulk()
	bulk.Unordered()
	bulk.Insert(input...)
	_, err := bulk.Run()
	if err == nil {
		return nil
	}

	if !mgo.IsDup(err) {
		// input is non-empty here, so input[0] is safe to reference.
		return fmt.Errorf("%s insert docs with length[%v] into ns[%s] of dest mongo failed[%v]. first doc: %v",
			mw, len(input), mw.ns, err, input[0])
	}

	// The writer goes first (%s) and the error second (%v), matching the
	// message layout used by the other diagnostics of this method.
	LOG.Warn("%s duplicated document found[%v]. reinsert or update", mw, err)
	if !conf.Options.FullExecutorInsertOnDupUpdate || len(mw.primaryIndexes) == 0 {
		LOG.Error("full.executor.insert_on_dup_update==[%v], primaryIndexes length[%v]", conf.Options.FullExecutorInsertOnDupUpdate,
			len(mw.primaryIndexes))
		return err
	}

	// 1. generate index list: one selector per input document, at the same
	// position as the document it belongs to.
	indexList := make([]interface{}, len(input))
	for i, ele := range input {
		inputData, ok := ele.(bson.M)
		if !ok {
			// Report the malformed document instead of panicking on the assertion.
			return fmt.Errorf("%s input doc[%d] has unexpected type[%T], bson.M is required to update on duplicate[%v]",
				mw, i, ele, err)
		}

		index := make(bson.M, len(mw.primaryIndexes))
		for _, primaryIndex := range mw.primaryIndexes {
			// currently, we only support convert type == 'convert', so there is no type inside
			key := *primaryIndex.AttributeName
			value, exists := inputData[key]
			if !exists {
				// NOTE(review): a missing key is only logged, so the selector for
				// this document ends up partial (or empty) — confirm that
				// updateOnInsert tolerates this.
				LOG.Error("primary key[%v] is not exists on input data[%v]",
					*primaryIndex.AttributeName, inputData)
				continue
			}
			index[key] = value
		}
		indexList[i] = index
	}

	LOG.Debug(indexList)

	// 2. retry the batch through the update path using the generated selectors.
	return mw.updateOnInsert(input, indexList)
}