in collector/docsyncer/doc_executor.go [182:276]
// doSync bulk-inserts the given batch of raw documents into the target
// collection. When unordered inserts hit duplicate-key errors and
// full_sync.executor.insert_on_dup_update is enabled, the conflicting
// documents are retried as $set updates keyed by their _id.
// Returns nil on success (or when the batch is empty / debug mode skips
// writes); any non-recoverable write error is returned to the caller.
func (exec *DocExecutor) doSync(docs []*bson.Raw) error {
	if len(docs) == 0 || conf.Options.FullSyncExecutorDebug {
		return nil
	}

	ns := exec.colExecutor.ns

	var models []mongo.WriteModel
	// insertDocs mirrors models 1:1 so that BulkWriteException.WriteErrors'
	// Index (which refers to the submitted models, not the incoming docs)
	// can be mapped back to the correct document even after orphan
	// filtering has dropped some entries from this batch.
	insertDocs := make([]*bson.Raw, 0, len(docs))
	for _, doc := range docs {
		if conf.Options.FullSyncExecutorFilterOrphanDocument && exec.syncer.orphanFilter != nil {
			var docData bson.D
			if err := bson.Unmarshal(*doc, &docData); err != nil {
				LOG.Error("doSync do bson unmarshal %v failed. %v", doc, err)
			}
			// judge whether is orphan document, pass if so
			if exec.syncer.orphanFilter.Filter(docData, ns.Database+"."+ns.Collection) {
				LOG.Info("orphan document [%v] filter", doc)
				continue
			}
		}
		models = append(models, mongo.NewInsertOneModel().SetDocument(doc))
		insertDocs = append(insertDocs, doc)
	}

	// The whole batch may have been filtered out as orphans; BulkWrite
	// rejects an empty model slice, so there is nothing left to do.
	if len(models) == 0 {
		return nil
	}

	// qps limit if enable
	if exec.syncer.qos.Limit > 0 {
		exec.syncer.qos.FetchBucket()
	}

	if conf.Options.LogLevel == utils.VarLogLevelDebug {
		var docBeg, docEnd bson.M
		bson.Unmarshal(*docs[0], &docBeg)
		bson.Unmarshal(*docs[len(docs)-1], &docEnd)
		LOG.Debug("DBSyncer id[%v] doSync BulkWrite with table[%v] batch _id interval [%v, %v]", exec.syncer.id, ns,
			docBeg, docEnd)
	}

	opts := options.BulkWrite().SetOrdered(false)
	res, err := exec.conn.Client.Database(ns.Database).Collection(ns.Collection).BulkWrite(nil, models, opts)
	if err != nil {
		bulkErr, ok := err.(mongo.BulkWriteException)
		if !ok {
			return fmt.Errorf("bulk run failed[%v]", err)
		}
		// A BulkWriteException may carry no per-document write errors
		// (e.g. only a write-concern error); indexing WriteErrors[0]
		// would panic, and there is nothing to retry as an update.
		if len(bulkErr.WriteErrors) == 0 {
			return fmt.Errorf("bulk run failed[%v]", err)
		}
		LOG.Warn("insert docs with length[%v] into ns[%v] of dest mongo failed[%v] res[%v]",
			len(models), ns, bulkErr.WriteErrors[0], res)

		var updateModels []mongo.WriteModel
		for _, wError := range bulkErr.WriteErrors {
			// Anything other than a duplicate-key error is fatal for the batch.
			if !utils.DuplicateKey(wError) {
				return fmt.Errorf("bulk run failed[%v]", wError)
			}
			if !conf.Options.FullSyncExecutorInsertOnDupUpdate {
				return fmt.Errorf("duplicate key error[%v], you can clean the document on the target mongodb, "+
					"or enable %v to solve, but full-sync stage needs restart",
					wError, "full_sync.executor.insert_on_dup_update")
			}
			// wError.Index refers to the submitted models, so look the
			// document up in insertDocs (not docs, which may still contain
			// filtered orphans).
			dupDocument := *insertDocs[wError.Index]
			// Build the update filter from the document's _id.
			var updateFilter bson.D
			var docData bson.D
			if err := bson.Unmarshal(dupDocument, &docData); err == nil {
				for _, bsonE := range docData {
					if bsonE.Key == "_id" {
						updateFilter = bson.D{bsonE}
						break
					}
				}
			}
			if updateFilter == nil {
				return fmt.Errorf("duplicate key error[%v], can't get _id from document", wError)
			}
			updateModels = append(updateModels, mongo.NewUpdateOneModel().
				SetFilter(updateFilter).SetUpdate(bson.D{{"$set", dupDocument}}))
		}

		// Unreachable in practice (every write error above either appends a
		// model or returns), kept as a defensive guard.
		if len(updateModels) == 0 {
			return fmt.Errorf("bulk run failed[%v]", err)
		}

		opts := options.BulkWrite().SetOrdered(false)
		if _, err := exec.conn.Client.Database(ns.Database).Collection(ns.Collection).BulkWrite(nil, updateModels, opts); err != nil {
			return fmt.Errorf("bulk run updateForInsert failed[%v]", err)
		}
		LOG.Debug("updateForInsert succ updateModels.len:%d updateModules[0]:%v\n",
			len(updateModels), updateModels[0])
	}
	return nil
}