in server/datasource/mongo/kv/kv_dao.go [392:487]
// txnFindOneAndDelete removes the KV document identified by kvID, project and
// domain and, within the same MongoDB session transaction, inserts a delete
// sync task and a tombstone for the removed document.
//
// It returns the document as it was stored before deletion. When no document
// matches the filter, datasource.ErrKeyNotExists is returned. On any failure
// the transaction is aborted and the error that caused the failure is
// returned; a failure of the abort itself is only logged, so it never hides
// the root cause from the caller.
func txnFindOneAndDelete(ctx context.Context, kvID, project, domain string) (*model.KVDoc, error) {
	curKV := &model.KVDoc{}
	db := dmongo.GetClient().GetDB()

	taskSession, err := db.Client().StartSession()
	if err != nil {
		openlog.Error("fail to start session" + err.Error())
		return nil, err
	}
	// Register cleanup before starting the transaction so the session is
	// released even when StartTransaction fails.
	defer taskSession.EndSession(ctx)

	if err = taskSession.StartTransaction(); err != nil {
		openlog.Error("fail to start transaction" + err.Error())
		return nil, err
	}

	// abort rolls the transaction back. An abort failure is logged together
	// with the optional extra tags but deliberately not returned: the caller
	// must see the error that triggered the rollback.
	abort := func(sc mongo.SessionContext, extra openlog.Tags) {
		errAbort := taskSession.AbortTransaction(sc)
		if errAbort == nil {
			return
		}
		tags := openlog.Tags{"err": errAbort.Error()}
		for k, v := range extra {
			tags[k] = v
		}
		openlog.Error("abort transaction", openlog.WithTags(tags))
	}

	err = mongo.WithSession(ctx, taskSession, func(sessionContext mongo.SessionContext) error {
		// Step 1: delete the KV document and load its previous content.
		sr := db.Collection(mmodel.CollectionKV).FindOneAndDelete(sessionContext,
			bson.M{"id": kvID, "project": project, "domain": domain})
		if findErr := sr.Err(); findErr != nil {
			abort(sessionContext, nil)
			if findErr == mongo.ErrNoDocuments {
				openlog.Error(datasource.ErrKeyNotExists.Error())
				return datasource.ErrKeyNotExists
			}
			return findErr
		}
		if decodeErr := sr.Decode(curKV); decodeErr != nil {
			openlog.Error("decode error: " + decodeErr.Error())
			abort(sessionContext, nil)
			return decodeErr
		}

		// Step 2: record the delete action as a sync task.
		task, taskErr := sync.NewTask(domain, project, sync.DeleteAction, datasource.ConfigResource, curKV)
		if taskErr != nil {
			openlog.Error("fail to create task" + taskErr.Error())
			abort(sessionContext, nil)
			return taskErr
		}
		if _, insertErr := db.Collection(mmodel.CollectionTask).InsertOne(sessionContext, task); insertErr != nil {
			openlog.Error("create task error", openlog.WithTags(openlog.Tags{
				"err":  insertErr.Error(),
				"task": task,
			}))
			abort(sessionContext, openlog.Tags{"task": task})
			return insertErr
		}

		// Step 3: leave a tombstone for the deleted document.
		tombstone := sync.NewTombstone(domain, project, datasource.ConfigResource, datasource.TombstoneID(curKV))
		if _, insertErr := db.Collection(mmodel.CollectionTombstone).InsertOne(sessionContext, tombstone); insertErr != nil {
			openlog.Error("create tombstone error", openlog.WithTags(openlog.Tags{
				"err":       insertErr.Error(),
				"tombstone": tombstone,
			}))
			abort(sessionContext, openlog.Tags{"tombstone": tombstone})
			return insertErr
		}

		return taskSession.CommitTransaction(sessionContext)
	})
	if err != nil {
		openlog.Error(err.Error())
		return nil, err
	}
	return curKV, nil
}