in nimo-shake/writer/mongodb_community_driver.go [111:163]
func (mcw *MongoCommunityWriter) WriteBulk(input []interface{}) error {
if len(input) == 0 {
return nil
}
// convert input array to models list
models := make([]mongo.WriteModel, len(input))
for i := range models {
models[i] = &mongo.InsertOneModel{Document: input[i]}
LOG.Debug("WriteBulk: %v", input[i])
}
_, err := mcw.conn.Client.Database(mcw.ns.Database).Collection(mcw.ns.Collection).BulkWrite(nil, models)
if err != nil {
if strings.Contains(err.Error(), "duplicate key error") {
LOG.Warn("%s duplicated document found[%v]. reinsert or update", err, mcw)
if !conf.Options.FullExecutorInsertOnDupUpdate || len(mcw.primaryIndexes) == 0 {
LOG.Error("full.executor.insert_on_dup_update==[%v], primaryIndexes length[%v]", conf.Options.FullExecutorInsertOnDupUpdate,
len(mcw.primaryIndexes))
return err
}
// 1. generate index list
indexList := make([]interface{}, len(input))
for i, ele := range input {
inputData, ok := ele.(bson2.M)
if !ok {
inputData = ele.(map[string]interface{})
LOG.Debug("inputData type:%v content:%v", reflect.TypeOf(inputData), inputData)
}
index := make(bson2.M, len(mcw.primaryIndexes))
for _, primaryIndex := range mcw.primaryIndexes {
// currently, we only support convert type == 'convert', so there is no type inside
key := *primaryIndex.AttributeName
if _, ok := inputData[key]; !ok {
LOG.Error("primary key[%v] is not exists on input data[%v]",
*primaryIndex.AttributeName, inputData)
} else {
index[key] = inputData[key]
}
}
indexList[i] = index
}
LOG.Debug(indexList)
return mcw.updateOnInsert(input, indexList)
}
return fmt.Errorf("%s insert docs with length[%v] into ns[%s] of dest mongo failed[%v]. first doc: %v",
mcw, len(input), mcw.ns, err, input[0])
}
return nil
}