func txnFindOneAndDelete()

in server/datasource/mongo/kv/kv_dao.go [392:487]


// txnFindOneAndDelete deletes the KV document matching kvID, project and
// domain and, within the same MongoDB transaction, inserts a sync delete task
// and a tombstone describing the removed document.
//
// On success it returns the deleted document. When no document matches the
// filter it returns datasource.ErrKeyNotExists. On any failure inside the
// transaction the transaction is aborted and the error that caused the
// failure is returned; a failure of the abort itself is only logged, so it
// never masks the original error.
func txnFindOneAndDelete(ctx context.Context, kvID, project, domain string) (*model.KVDoc, error) {
	curKV := &model.KVDoc{}
	taskSession, err := dmongo.GetClient().GetDB().Client().StartSession()
	if err != nil {
		openlog.Error("fail to start session" + err.Error())
		return nil, err
	}
	// Register the cleanup before starting the transaction so the session is
	// ended even when StartTransaction fails.
	defer taskSession.EndSession(ctx)
	if err = taskSession.StartTransaction(); err != nil {
		openlog.Error("fail to start transaction" + err.Error())
		return nil, err
	}

	// abort rolls the transaction back. An abort failure is logged (together
	// with any extra tags) but deliberately not returned: the caller needs the
	// error that triggered the rollback, not the rollback's own failure.
	abort := func(sc mongo.SessionContext, extra openlog.Tags) {
		errAbort := taskSession.AbortTransaction(sc)
		if errAbort == nil {
			return
		}
		tags := openlog.Tags{"err": errAbort.Error()}
		for k, v := range extra {
			tags[k] = v
		}
		openlog.Error("abort transaction", openlog.WithTags(tags))
	}

	err = mongo.WithSession(ctx, taskSession, func(sessionContext mongo.SessionContext) error {
		db := dmongo.GetClient().GetDB()

		// Step 1: remove the KV document and capture its previous content.
		filter := bson.M{"id": kvID, "project": project, "domain": domain}
		sr := db.Collection(mmodel.CollectionKV).FindOneAndDelete(sessionContext, filter)
		if findErr := sr.Err(); findErr != nil {
			abort(sessionContext, nil)
			if findErr == mongo.ErrNoDocuments {
				openlog.Error(datasource.ErrKeyNotExists.Error())
				return datasource.ErrKeyNotExists
			}
			return findErr
		}
		if decodeErr := sr.Decode(curKV); decodeErr != nil {
			openlog.Error("decode error: " + decodeErr.Error())
			abort(sessionContext, nil)
			return decodeErr
		}

		// Step 2: record the delete task for the removed document.
		task, taskErr := sync.NewTask(domain, project, sync.DeleteAction, datasource.ConfigResource, curKV)
		if taskErr != nil {
			openlog.Error("fail to create task" + taskErr.Error())
			abort(sessionContext, nil)
			return taskErr
		}
		if _, insertErr := db.Collection(mmodel.CollectionTask).InsertOne(sessionContext, task); insertErr != nil {
			openlog.Error("create task error", openlog.WithTags(openlog.Tags{
				"err":  insertErr.Error(),
				"task": task,
			}))
			abort(sessionContext, openlog.Tags{"task": task})
			return insertErr
		}

		// Step 3: record the tombstone for the removed document.
		tombstone := sync.NewTombstone(domain, project, datasource.ConfigResource, datasource.TombstoneID(curKV))
		if _, insertErr := db.Collection(mmodel.CollectionTombstone).InsertOne(sessionContext, tombstone); insertErr != nil {
			openlog.Error("create tombstone error", openlog.WithTags(openlog.Tags{
				"err":       insertErr.Error(),
				"tombstone": tombstone,
			}))
			abort(sessionContext, openlog.Tags{"tombstone": tombstone})
			return insertErr
		}

		return taskSession.CommitTransaction(sessionContext)
	})
	if err != nil {
		openlog.Error(err.Error())
		return nil, err
	}
	return curKV, nil
}