func txnFindManyAndDelete()

in server/datasource/mongo/kv/kv_dao.go [521:610]


// txnFindManyAndDelete finds the KV documents matching kvIDs within the given
// project and domain, then, inside a single MongoDB transaction, deletes them
// and inserts one sync task and one tombstone per deleted document.
//
// The lookup runs before the transaction starts, so the number of documents
// actually deleted may differ from the number found. Tasks and tombstones are
// built from the first min(deleted, found) entries of the lookup result.
//
// It returns the documents found by the lookup, the number of documents the
// delete reported, and an error. Lookup errors are returned unchanged
// (datasource.ErrKeyNotExists is not logged). On any failure inside the
// transaction the transaction is aborted and the original error is returned.
func txnFindManyAndDelete(ctx context.Context, kvIDs []string, project, domain string) ([]*model.KVDoc, int64, error) {
	filter := bson.D{
		{Key: "id", Value: bson.M{"$in": kvIDs}},
		{Key: "project", Value: project},
		{Key: "domain", Value: domain}}
	kvs, err := findKeys(ctx, filter, false)
	if err != nil {
		if err != datasource.ErrKeyNotExists {
			openlog.Error("find Keys error: " + err.Error())
		}
		return nil, 0, err
	}

	taskSession, err := dmongo.GetClient().GetDB().Client().StartSession()
	if err != nil {
		openlog.Error("fail to start session" + err.Error())
		return nil, 0, err
	}
	// Register cleanup right after the session exists so that a failing
	// StartTransaction does not leak the session.
	defer taskSession.EndSession(ctx)

	if err = taskSession.StartTransaction(); err != nil {
		openlog.Error("fail to start transaction" + err.Error())
		return nil, 0, err
	}

	// abort rolls the transaction back on a best-effort basis. A failure to
	// abort is only logged so that the caller always sees the error that
	// triggered the rollback rather than the abort error.
	abort := func(sc mongo.SessionContext) {
		if errAbort := taskSession.AbortTransaction(sc); errAbort != nil {
			openlog.Error("abort transaction", openlog.WithTags(openlog.Tags{
				"err": errAbort.Error(),
			}))
		}
	}

	var deletedCount int64
	err = mongo.WithSession(ctx, taskSession, func(sessionContext mongo.SessionContext) error {
		db := dmongo.GetClient().GetDB()

		dr, err := db.Collection(mmodel.CollectionKV).DeleteMany(sessionContext, filter)
		if err != nil {
			openlog.Error(fmt.Sprintf("delete kvs [%v] failed : [%v]", kvIDs, err))
			abort(sessionContext)
			return err
		}
		deletedCount = dr.DeletedCount

		// The lookup happened outside the transaction, so the delete count
		// may exceed the number of documents we hold; never index past kvs.
		docCount := int(deletedCount)
		if docCount > len(kvs) {
			docCount = len(kvs)
		}
		if docCount == 0 {
			// InsertMany rejects an empty document slice, and there is
			// nothing to record anyway.
			return taskSession.CommitTransaction(sessionContext)
		}

		tasksDoc := make([]interface{}, 0, docCount)
		tombstonesDoc := make([]interface{}, 0, docCount)
		for _, kv := range kvs[:docCount] {
			task, err := sync.NewTask(domain, project, sync.DeleteAction, datasource.ConfigResource, kv)
			if err != nil {
				openlog.Error("create task error", openlog.WithTags(openlog.Tags{
					"err": err.Error(),
				}))
				abort(sessionContext)
				return err
			}
			tasksDoc = append(tasksDoc, task)
			tombstonesDoc = append(tombstonesDoc,
				sync.NewTombstone(domain, project, datasource.ConfigResource, datasource.TombstoneID(kv)))
		}

		if _, err = db.Collection(mmodel.CollectionTask).InsertMany(sessionContext, tasksDoc); err != nil {
			openlog.Error("create tasks error", openlog.WithTags(openlog.Tags{
				"err": err.Error(),
			}))
			abort(sessionContext)
			return err
		}
		if _, err = db.Collection(mmodel.CollectionTombstone).InsertMany(sessionContext, tombstonesDoc); err != nil {
			openlog.Error("create tombstone error", openlog.WithTags(openlog.Tags{
				"err": err.Error(),
			}))
			abort(sessionContext)
			return err
		}
		return taskSession.CommitTransaction(sessionContext)
	})
	if err != nil {
		openlog.Error(err.Error())
		return nil, 0, err
	}
	return kvs, deletedCount, nil
}