doSync()

in collector/docsyncer/doc_executor.go [182:276]
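doSync bulk-inserts a batch of raw BSON documents into the target namespace with an unordered BulkWrite, optionally dropping orphan documents first, and falls back to _id-keyed $set updates for documents that hit duplicate-key errors when full_sync.executor.insert_on_dup_update is enabled.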


func (exec *DocExecutor) doSync(docs []*bson.Raw) error {
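	// nothing to write for an empty batch; executor debug mode also suppresses actual writes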
	if len(docs) == 0 || conf.Options.FullSyncExecutorDebug {
		return nil
	}

	ns := exec.colExecutor.ns

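	// build one insert model per document, dropping orphans when the filter is enabled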
	var models []mongo.WriteModel
	for _, doc := range docs {

		if conf.Options.FullSyncExecutorFilterOrphanDocument && exec.syncer.orphanFilter != nil {
			var docData bson.D
			if err := bson.Unmarshal(*doc, &docData); err != nil {
				LOG.Error("doSync bson unmarshal of document %v failed: %v", doc, err)
			}
			// skip the document if the orphan filter identifies it as an orphan
			if exec.syncer.orphanFilter.Filter(docData, ns.Database+"."+ns.Collection) {
				LOG.Info("orphan document [%v] filtered", doc)
				continue
			}
		}

		models = append(models, mongo.NewInsertOneModel().SetDocument(doc))
	}

	// apply the qps limit if enabled
	if exec.syncer.qos.Limit > 0 {
		exec.syncer.qos.FetchBucket()
	}

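	// at debug log level, record the _id interval of this batch before writing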
	if conf.Options.LogLevel == utils.VarLogLevelDebug {
		var docBeg, docEnd bson.M
		_ = bson.Unmarshal(*docs[0], &docBeg)
		_ = bson.Unmarshal(*docs[len(docs)-1], &docEnd)
		LOG.Debug("DBSyncer id[%v] doSync BulkWrite with table[%v] batch _id interval [%v, %v]", exec.syncer.id, ns,
			docBeg, docEnd)
	}

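	// unordered bulk insert, so one failing document does not abort the rest of the batch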
	opts := options.BulkWrite().SetOrdered(false)
	res, err := exec.conn.Client.Database(ns.Database).Collection(ns.Collection).BulkWrite(nil, models, opts)

	if err != nil {
		bulkErr, ok := err.(mongo.BulkWriteException)
		if !ok {
			return fmt.Errorf("bulk run failed[%v]", err)
		}

		LOG.Warn("insert docs with length[%v] into ns[%v] of dest mongo failed[%v] res[%v]",
			len(models), ns, bulkErr.WriteErrors[0], res)

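		// retry duplicate-key failures as _id-keyed updates below; any other write error is fatal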
		var updateModels []mongo.WriteModel
		for _, wError := range bulkErr.WriteErrors {
			if utils.DuplicateKey(wError) {
				if !conf.Options.FullSyncExecutorInsertOnDupUpdate {
					return fmt.Errorf("duplicate key error[%v], you can clean the document on the target mongodb, "+
						"or enable %v to solve, but full-sync stage needs restart",
						wError, "full_sync.executor.insert_on_dup_update")
				}

				dupDocument := *docs[wError.Index]
				var updateFilter bson.D
				updateFilterBool := false
				var docData bson.D
				if err := bson.Unmarshal(dupDocument, &docData); err == nil {
					for _, bsonE := range docData {
						if bsonE.Key == "_id" {
							updateFilter = bson.D{bsonE}
							updateFilterBool = true
							break
						}
					}
				}
				if !updateFilterBool {
					return fmt.Errorf("duplicate key error[%v], can't get _id from document", wError)
				}
				updateModels = append(updateModels, mongo.NewUpdateOneModel().
					SetFilter(updateFilter).SetUpdate(bson.D{{"$set", dupDocument}}))
			} else {
				return fmt.Errorf("bulk run failed[%v]", wError)
			}
		}

		if len(updateModels) != 0 {
			opts := options.BulkWrite().SetOrdered(false)
			_, err := exec.conn.Client.Database(ns.Database).Collection(ns.Collection).BulkWrite(nil, updateModels, opts)
			if err != nil {
				return fmt.Errorf("bulk run updateForInsert failed[%v]", err)
			}
			LOG.Debug("updateForInsert succeeded, updateModels.len:%d updateModels[0]:%v",
				len(updateModels), updateModels[0])
		} else {
			return fmt.Errorf("bulk run failed[%v]", err)
		}
	}

	return nil
}
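
For reference, below is a minimal, self-contained sketch of the same insert-then-update-on-duplicate-key pattern against the official mongo-go-driver. The helper names (syncBatch, isDupKey), the code-11000 check standing in for utils.DuplicateKey, and the connection URI, database, and collection names are all illustrative assumptions, not part of MongoShake.

package main

import (
	"context"
	"fmt"
	"log"

	"go.mongodb.org/mongo-driver/bson"
	"go.mongodb.org/mongo-driver/mongo"
	"go.mongodb.org/mongo-driver/mongo/options"
)

// isDupKey reports whether a write error is a duplicate-key error.
// Checking code 11000 is a simplification of utils.DuplicateKey.
func isDupKey(we mongo.BulkWriteError) bool {
	return we.Code == 11000
}

// syncBatch inserts docs with an unordered BulkWrite; documents that already
// exist on the target are retried as $set updates keyed on _id.
func syncBatch(ctx context.Context, coll *mongo.Collection, docs []bson.D) error {
	models := make([]mongo.WriteModel, 0, len(docs))
	for _, doc := range docs {
		models = append(models, mongo.NewInsertOneModel().SetDocument(doc))
	}

	opts := options.BulkWrite().SetOrdered(false)
	_, err := coll.BulkWrite(ctx, models, opts)
	if err == nil {
		return nil
	}

	bulkErr, ok := err.(mongo.BulkWriteException)
	if !ok {
		return fmt.Errorf("bulk write failed: %w", err)
	}

	var updates []mongo.WriteModel
	for _, we := range bulkErr.WriteErrors {
		if !isDupKey(we) {
			return fmt.Errorf("bulk write failed: %v", we)
		}
		// we.Index points back into the original batch, so the offending
		// document can be replayed as an update.
		doc := docs[we.Index]
		var id interface{}
		for _, e := range doc {
			if e.Key == "_id" {
				id = e.Value
				break
			}
		}
		if id == nil {
			return fmt.Errorf("duplicate key error but no _id in document: %v", we)
		}
		updates = append(updates, mongo.NewUpdateOneModel().
			SetFilter(bson.D{{Key: "_id", Value: id}}).
			SetUpdate(bson.D{{Key: "$set", Value: doc}}))
	}

	if len(updates) == 0 {
		return nil
	}
	_, err = coll.BulkWrite(ctx, updates, opts)
	return err
}

func main() {
	ctx := context.Background()
	client, err := mongo.Connect(ctx, options.Client().ApplyURI("mongodb://localhost:27017"))
	if err != nil {
		log.Fatal(err)
	}
	defer client.Disconnect(ctx)

	coll := client.Database("test").Collection("sync_demo")
	docs := []bson.D{
		{{Key: "_id", Value: 1}, {Key: "v", Value: "a"}},
		{{Key: "_id", Value: 2}, {Key: "v", Value: "b"}},
	}
	// running this program twice exercises the duplicate-key fallback path
	if err := syncBatch(ctx, coll, docs); err != nil {
		log.Fatal(err)
	}
}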