func()

in nimo-shake/writer/mongodb_community_driver.go [111:163]


// WriteBulk inserts every element of input into the writer's destination
// collection with a single BulkWrite call.
//
// An empty input is a no-op and returns nil. If the bulk insert fails with an
// error whose text contains "duplicate key error", and
// conf.Options.FullExecutorInsertOnDupUpdate is set and the writer has at
// least one primary index, a filter made of the primary-key attributes is
// built for each document and the result of mcw.updateOnInsert is returned.
// If that fallback is disabled or no primary index is known, the original
// duplicate key error is returned unchanged. Any other BulkWrite failure is
// returned with the namespace, batch length and first document attached.
//
// On the fallback path every document must be a bson2.M or a
// map[string]interface{}; any other type yields an error rather than a panic.
func (mcw *MongoCommunityWriter) WriteBulk(input []interface{}) error {
	if len(input) == 0 {
		return nil
	}

	// convert input array to models list
	models := make([]mongo.WriteModel, len(input))
	for i := range models {
		models[i] = &mongo.InsertOneModel{Document: input[i]}
		LOG.Debug("WriteBulk: %v", input[i])
	}

	// NOTE(review): a nil context is passed here; consider context.Background()
	// once the file's import block can be updated — confirm how the driver
	// version in use treats a nil context.
	_, err := mcw.conn.Client.Database(mcw.ns.Database).Collection(mcw.ns.Collection).BulkWrite(nil, models)
	if err == nil {
		return nil
	}

	if !strings.Contains(err.Error(), "duplicate key error") {
		return fmt.Errorf("%s insert docs with length[%v] into ns[%s] of dest mongo failed[%v]. first doc: %v",
			mcw, len(input), mcw.ns, err, input[0])
	}

	LOG.Warn("%s duplicated document found[%v]. reinsert or update", err, mcw)
	if !conf.Options.FullExecutorInsertOnDupUpdate || len(mcw.primaryIndexes) == 0 {
		LOG.Error("full.executor.insert_on_dup_update==[%v], primaryIndexes length[%v]", conf.Options.FullExecutorInsertOnDupUpdate,
			len(mcw.primaryIndexes))
		return err
	}

	// 1. generate index list: one primary-key filter per input document
	indexList := make([]interface{}, len(input))
	for i, ele := range input {
		inputData, ok := ele.(bson2.M)
		if !ok {
			// Checked assertion: an unexpected document type must surface as
			// an error instead of panicking the whole process.
			plain, isMap := ele.(map[string]interface{})
			if !isMap {
				return fmt.Errorf("%s unsupported document type[%T] at position[%v] for ns[%s], can't build primary key filter after insert failed[%v]",
					mcw, ele, i, mcw.ns, err)
			}
			inputData = plain
			LOG.Debug("inputData type:%v content:%v", reflect.TypeOf(inputData), inputData)
		}

		index := make(bson2.M, len(mcw.primaryIndexes))
		for _, primaryIndex := range mcw.primaryIndexes {
			// currently, we only support convert type == 'convert', so there is no type inside
			key := *primaryIndex.AttributeName
			value, exists := inputData[key]
			if !exists {
				// NOTE(review): the document is still forwarded with a filter
				// that lacks this key; verify updateOnInsert tolerates that.
				LOG.Error("primary key[%v] is not exists on input data[%v]",
					key, inputData)
				continue
			}
			index[key] = value
		}
		indexList[i] = index
	}

	LOG.Debug(indexList)

	return mcw.updateOnInsert(input, indexList)
}