func txnUpdate()

in server/datasource/mongo/kv/kv_dao.go [164:247]


// txnUpdate updates the KV document matched by kv.Key and kv.LabelFormat and
// inserts a sync task describing that update, both inside a single MongoDB
// transaction.
//
// It returns datasource.ErrKeyNotExists when no document matches the filter.
// Any other driver, decode, task-creation, insert or commit error is returned
// unchanged. On every failure before the commit the transaction is aborted
// explicitly; a failing abort is only logged so the original error is the one
// reported to the caller.
func txnUpdate(ctx context.Context, kv *model.KVDoc) error {
	taskSession, err := dmongo.GetClient().GetDB().Client().StartSession()
	if err != nil {
		return err
	}
	// Register the cleanup before starting the transaction so the session is
	// released even when StartTransaction itself fails.
	defer taskSession.EndSession(ctx)
	if err = taskSession.StartTransaction(); err != nil {
		return err
	}

	err = mongo.WithSession(ctx, taskSession, func(sessionContext mongo.SessionContext) error {
		// abort rolls the transaction back. tagKey/tagVal identify the object
		// being processed and are attached to the log entry if the abort fails.
		abort := func(tagKey string, tagVal interface{}) {
			errAbort := taskSession.AbortTransaction(sessionContext)
			if errAbort == nil {
				return
			}
			openlog.Error("abort transaction", openlog.WithTags(openlog.Tags{
				"err":  errAbort.Error(),
				tagKey: tagVal,
			}))
		}

		// NOTE(review): no ReturnDocument option is passed here, so with the
		// driver's default the decoded document below is presumably the
		// pre-update one — confirm that this is what the sync task should carry.
		kvCollection := dmongo.GetClient().GetDB().Collection(mmodel.CollectionKV)
		result := kvCollection.FindOneAndUpdate(sessionContext, bson.M{"key": kv.Key, "label_format": kv.LabelFormat}, bson.D{
			{Key: "$set", Value: bson.D{
				{Key: "value", Value: kv.Value},
				{Key: "status", Value: kv.Status},
				{Key: "checker", Value: kv.Checker},
				{Key: "update_time", Value: kv.UpdateTime},
				{Key: "update_revision", Value: kv.UpdateRevision},
			}},
		})
		if errFind := result.Err(); errFind != nil {
			abort("kv", kv)
			// Translate the driver's "no match" into the datasource-level error.
			if errFind == mongo.ErrNoDocuments {
				return datasource.ErrKeyNotExists
			}
			return errFind
		}

		curKV := &model.KVDoc{}
		if err := result.Decode(curKV); err != nil {
			abort("kv", kv)
			openlog.Error("decode error: " + err.Error())
			return err
		}

		task, err := sync.NewTask(kv.Domain, kv.Project, sync.UpdateAction, datasource.ConfigResource, curKV)
		if err != nil {
			openlog.Error("fail to create task" + err.Error())
			abort("kv", kv)
			return err
		}

		// The task is written through the same session context so that it is
		// committed or rolled back together with the KV update.
		taskCollection := dmongo.GetClient().GetDB().Collection(mmodel.CollectionTask)
		if _, err = taskCollection.InsertOne(sessionContext, task); err != nil {
			openlog.Error("create task error", openlog.WithTags(openlog.Tags{
				"err":  err.Error(),
				"task": task,
			}))
			abort("task", task)
			return err
		}

		return taskSession.CommitTransaction(sessionContext)
	})
	if err != nil {
		openlog.Error(err.Error())
		return err
	}
	return nil
}