internal/repo/activity/vote_repo.go (428 lines of code) (raw):
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package activity
import (
"context"
"fmt"
"time"
"github.com/apache/answer/internal/service/content"
"github.com/segmentfault/pacman/log"
"github.com/apache/answer/internal/base/constant"
"github.com/apache/answer/internal/service/notice_queue"
"github.com/apache/answer/pkg/converter"
"github.com/apache/answer/internal/base/pager"
"github.com/apache/answer/internal/service/rank"
"github.com/apache/answer/pkg/obj"
"xorm.io/builder"
"github.com/apache/answer/internal/base/data"
"github.com/apache/answer/internal/base/reason"
"github.com/apache/answer/internal/entity"
"github.com/apache/answer/internal/schema"
"github.com/apache/answer/internal/service/activity_common"
"github.com/segmentfault/pacman/errors"
"xorm.io/xorm"
)
// VoteRepo activity repository
type VoteRepo struct {
data *data.Data
activityRepo activity_common.ActivityRepo
userRankRepo rank.UserRankRepo
notificationQueueService notice_queue.NotificationQueueService
}
// NewVoteRepo new repository
func NewVoteRepo(
data *data.Data,
activityRepo activity_common.ActivityRepo,
userRankRepo rank.UserRankRepo,
notificationQueueService notice_queue.NotificationQueueService,
) content.VoteRepo {
return &VoteRepo{
data: data,
activityRepo: activityRepo,
userRankRepo: userRankRepo,
notificationQueueService: notificationQueueService,
}
}
func (vr *VoteRepo) Vote(ctx context.Context, op *schema.VoteOperationInfo) (err error) {
noNeedToVote, err := vr.votePreCheck(ctx, op)
if err != nil {
return err
}
if noNeedToVote {
return nil
}
sendInboxNotification := false
maxDailyRank, err := vr.userRankRepo.GetMaxDailyRank(ctx)
if err != nil {
return err
}
var userIDs []string
for _, activity := range op.Activities {
userIDs = append(userIDs, activity.ActivityUserID)
}
_, err = vr.data.DB.Transaction(func(session *xorm.Session) (result any, err error) {
session = session.Context(ctx)
userInfoMapping, err := vr.acquireUserInfo(session, userIDs)
if err != nil {
return nil, err
}
err = vr.setActivityRankToZeroIfUserReachLimit(ctx, session, op, userInfoMapping, maxDailyRank)
if err != nil {
return nil, err
}
sendInboxNotification, err = vr.saveActivitiesAvailable(session, op)
if err != nil {
return nil, err
}
err = vr.changeUserRank(ctx, session, op, userInfoMapping)
if err != nil {
return nil, err
}
return nil, nil
})
if err != nil {
return err
}
for _, activity := range op.Activities {
if activity.Rank == 0 {
continue
}
vr.sendAchievementNotification(ctx, activity.ActivityUserID, op.ObjectCreatorUserID, op.ObjectID)
}
if sendInboxNotification {
vr.sendVoteInboxNotification(ctx, op.OperatingUserID, op.ObjectCreatorUserID, op.ObjectID, op.VoteUp)
}
return nil
}
func (vr *VoteRepo) CancelVote(ctx context.Context, op *schema.VoteOperationInfo) (err error) {
// Pre-Check
// 1. check if the activity exist
// 2. check if the activity is not cancelled
// 3. if all activities are cancelled, return directly
activities, err := vr.getExistActivity(ctx, op)
if err != nil {
return err
}
var userIDs []string
for _, activity := range activities {
if activity.Cancelled == entity.ActivityCancelled {
continue
}
userIDs = append(userIDs, activity.UserID)
}
if len(userIDs) == 0 {
return nil
}
_, err = vr.data.DB.Transaction(func(session *xorm.Session) (result any, err error) {
session = session.Context(ctx)
userInfoMapping, err := vr.acquireUserInfo(session, userIDs)
if err != nil {
return nil, err
}
err = vr.cancelActivities(session, activities)
if err != nil {
return nil, err
}
err = vr.rollbackUserRank(ctx, session, activities, userInfoMapping)
if err != nil {
return nil, err
}
return nil, nil
})
if err != nil {
return err
}
for _, activity := range activities {
if activity.Rank == 0 {
continue
}
vr.sendAchievementNotification(ctx, activity.UserID, op.ObjectCreatorUserID, op.ObjectID)
}
return nil
}
func (vr *VoteRepo) GetAndSaveVoteResult(ctx context.Context, objectID, objectType string) (
up, down int64, err error) {
up = vr.countVoteUp(ctx, objectID, objectType)
down = vr.countVoteDown(ctx, objectID, objectType)
err = vr.updateVotes(ctx, objectID, objectType, int(up-down))
return
}
func (vr *VoteRepo) ListUserVotes(ctx context.Context, userID string,
page int, pageSize int, activityTypes []int) (voteList []*entity.Activity, total int64, err error) {
session := vr.data.DB.Context(ctx)
cond := builder.
And(
builder.Eq{"user_id": userID},
builder.Eq{"cancelled": 0},
builder.In("activity_type", activityTypes),
)
session.Where(cond).Desc("updated_at")
total, err = pager.Help(page, pageSize, &voteList, &entity.Activity{}, session)
if err != nil {
err = errors.InternalServer(reason.DatabaseError).WithError(err).WithStack()
}
return
}
func (vr *VoteRepo) votePreCheck(ctx context.Context, op *schema.VoteOperationInfo) (noNeedToVote bool, err error) {
activities, err := vr.getExistActivity(ctx, op)
if err != nil {
return false, err
}
done := 0
for _, activity := range activities {
if activity.Cancelled == entity.ActivityAvailable {
done++
}
}
return done == len(op.Activities), nil
}
func (vr *VoteRepo) acquireUserInfo(session *xorm.Session, userIDs []string) (map[string]*entity.User, error) {
us := make([]*entity.User, 0)
err := session.In("id", userIDs).ForUpdate().Find(&us)
if err != nil {
log.Error(err)
return nil, err
}
users := make(map[string]*entity.User, 0)
for _, u := range us {
users[u.ID] = u
}
return users, nil
}
func (vr *VoteRepo) setActivityRankToZeroIfUserReachLimit(ctx context.Context, session *xorm.Session,
op *schema.VoteOperationInfo, userInfoMapping map[string]*entity.User, maxDailyRank int) (err error) {
// check if user reach daily rank limit
for _, activity := range op.Activities {
if userInfoMapping[activity.ActivityUserID] == nil {
continue
}
if activity.Rank > 0 {
// check if reach max daily rank
reach, err := vr.userRankRepo.CheckReachLimit(ctx, session, activity.ActivityUserID, maxDailyRank)
if err != nil {
log.Error(err)
return err
}
if reach {
activity.Rank = 0
continue
}
} else {
// If user rank is lower than 1 after this action, then user rank will be set to 1 only.
userCurrentScore := userInfoMapping[activity.ActivityUserID].Rank
if userCurrentScore+activity.Rank < 1 {
activity.Rank = 1 - userCurrentScore
}
}
}
return nil
}
func (vr *VoteRepo) changeUserRank(ctx context.Context, session *xorm.Session,
op *schema.VoteOperationInfo,
userInfoMapping map[string]*entity.User) (err error) {
for _, activity := range op.Activities {
if activity.Rank == 0 {
continue
}
user := userInfoMapping[activity.ActivityUserID]
if user == nil {
continue
}
if err = vr.userRankRepo.ChangeUserRank(ctx, session,
activity.ActivityUserID, user.Rank, activity.Rank); err != nil {
log.Error(err)
return err
}
}
return nil
}
func (vr *VoteRepo) rollbackUserRank(ctx context.Context, session *xorm.Session,
activities []*entity.Activity,
userInfoMapping map[string]*entity.User) (err error) {
for _, activity := range activities {
if activity.Rank == 0 {
continue
}
user := userInfoMapping[activity.UserID]
if user == nil {
continue
}
if err = vr.userRankRepo.ChangeUserRank(ctx, session,
activity.UserID, user.Rank, -activity.Rank); err != nil {
log.Error(err)
return err
}
}
return nil
}
// saveActivitiesAvailable save activities
// If activity not exist it will be created or else will be updated
// If this activity is already exist, set activity rank to 0
// So after this function, the activity rank will be correct for update user rank
func (vr *VoteRepo) saveActivitiesAvailable(session *xorm.Session, op *schema.VoteOperationInfo) (newAct bool, err error) {
for _, activity := range op.Activities {
existsActivity := &entity.Activity{}
exist, err := session.
Where(builder.Eq{"object_id": op.ObjectID}).
And(builder.Eq{"user_id": activity.ActivityUserID}).
And(builder.Eq{"trigger_user_id": activity.TriggerUserID}).
And(builder.Eq{"activity_type": activity.ActivityType}).
Get(existsActivity)
if err != nil {
return false, err
}
if exist && existsActivity.Cancelled == entity.ActivityAvailable {
activity.Rank = 0
continue
}
if exist {
bean := &entity.Activity{
Cancelled: entity.ActivityAvailable,
Rank: activity.Rank,
HasRank: activity.HasRank(),
}
session.Where("id = ?", existsActivity.ID)
if _, err = session.Cols("`cancelled`", "`rank`", "`has_rank`").
Update(bean); err != nil {
return false, err
}
} else {
insertActivity := entity.Activity{
ObjectID: op.ObjectID,
OriginalObjectID: op.ObjectID,
UserID: activity.ActivityUserID,
TriggerUserID: converter.StringToInt64(activity.TriggerUserID),
ActivityType: activity.ActivityType,
Rank: activity.Rank,
HasRank: activity.HasRank(),
Cancelled: entity.ActivityAvailable,
}
_, err = session.Insert(&insertActivity)
if err != nil {
return false, err
}
newAct = true
}
}
return newAct, nil
}
// cancelActivities cancel activities
// If this activity is already cancelled, set activity rank to 0
// So after this function, the activity rank will be correct for update user rank
func (vr *VoteRepo) cancelActivities(session *xorm.Session, activities []*entity.Activity) (err error) {
for _, activity := range activities {
t := &entity.Activity{}
exist, err := session.ID(activity.ID).Get(t)
if err != nil {
log.Error(err)
return err
}
if !exist {
log.Error(fmt.Errorf("%s activity not exist", activity.ID))
return fmt.Errorf("%s activity not exist", activity.ID)
}
// If this activity is already cancelled, set activity rank to 0
if t.Cancelled == entity.ActivityCancelled {
activity.Rank = 0
}
if _, err = session.ID(activity.ID).Cols("cancelled", "cancelled_at").
Update(&entity.Activity{
Cancelled: entity.ActivityCancelled,
CancelledAt: time.Now(),
}); err != nil {
log.Error(err)
return err
}
}
return nil
}
func (vr *VoteRepo) getExistActivity(ctx context.Context, op *schema.VoteOperationInfo) ([]*entity.Activity, error) {
var activities []*entity.Activity
for _, action := range op.Activities {
t := &entity.Activity{}
exist, err := vr.data.DB.Context(ctx).
Where(builder.Eq{"user_id": action.ActivityUserID}).
And(builder.Eq{"trigger_user_id": action.TriggerUserID}).
And(builder.Eq{"activity_type": action.ActivityType}).
And(builder.Eq{"object_id": op.ObjectID}).
Get(t)
if err != nil {
return nil, errors.InternalServer(reason.DatabaseError).WithError(err).WithStack()
}
if exist {
activities = append(activities, t)
}
}
return activities, nil
}
func (vr *VoteRepo) countVoteUp(ctx context.Context, objectID, objectType string) (count int64) {
count, err := vr.countVote(ctx, objectID, objectType, constant.ActVoteUp)
if err != nil {
log.Errorf("get vote up count error: %v", err)
}
return count
}
func (vr *VoteRepo) countVoteDown(ctx context.Context, objectID, objectType string) (count int64) {
count, err := vr.countVote(ctx, objectID, objectType, constant.ActVoteDown)
if err != nil {
log.Errorf("get vote down count error: %v", err)
}
return count
}
func (vr *VoteRepo) countVote(ctx context.Context, objectID, objectType, action string) (count int64, err error) {
activity := &entity.Activity{}
activityType, _ := vr.activityRepo.GetActivityTypeByObjectType(ctx, objectType, action)
count, err = vr.data.DB.Context(ctx).Where(builder.Eq{"object_id": objectID}).
And(builder.Eq{"activity_type": activityType}).
And(builder.Eq{"cancelled": 0}).
Count(activity)
if err != nil {
err = errors.InternalServer(reason.DatabaseError).WithError(err).WithStack()
}
return count, err
}
func (vr *VoteRepo) updateVotes(ctx context.Context, objectID, objectType string, voteCount int) (err error) {
session := vr.data.DB.Context(ctx)
switch objectType {
case constant.QuestionObjectType:
_, err = session.ID(objectID).Cols("vote_count").Update(&entity.Question{VoteCount: voteCount})
case constant.AnswerObjectType:
_, err = session.ID(objectID).Cols("vote_count").Update(&entity.Answer{VoteCount: voteCount})
case constant.CommentObjectType:
_, err = session.ID(objectID).Cols("vote_count").Update(&entity.Comment{VoteCount: voteCount})
}
if err != nil {
log.Error(err)
}
return
}
func (vr *VoteRepo) sendAchievementNotification(ctx context.Context, activityUserID, objectUserID, objectID string) {
objectType, err := obj.GetObjectTypeStrByObjectID(objectID)
if err != nil {
return
}
msg := &schema.NotificationMsg{
ReceiverUserID: activityUserID,
TriggerUserID: objectUserID,
Type: schema.NotificationTypeAchievement,
ObjectID: objectID,
ObjectType: objectType,
}
vr.notificationQueueService.Send(ctx, msg)
}
func (vr *VoteRepo) sendVoteInboxNotification(ctx context.Context, triggerUserID, receiverUserID, objectID string, upvote bool) {
if triggerUserID == receiverUserID {
return
}
objectType, _ := obj.GetObjectTypeStrByObjectID(objectID)
msg := &schema.NotificationMsg{
TriggerUserID: triggerUserID,
ReceiverUserID: receiverUserID,
Type: schema.NotificationTypeInbox,
ObjectID: objectID,
ObjectType: objectType,
}
if objectType == constant.QuestionObjectType {
if upvote {
msg.NotificationAction = constant.NotificationUpVotedTheQuestion
} else {
msg.NotificationAction = constant.NotificationDownVotedTheQuestion
}
}
if objectType == constant.AnswerObjectType {
if upvote {
msg.NotificationAction = constant.NotificationUpVotedTheAnswer
} else {
msg.NotificationAction = constant.NotificationDownVotedTheAnswer
}
}
if objectType == constant.CommentObjectType {
if upvote {
msg.NotificationAction = constant.NotificationUpVotedTheComment
}
}
if len(msg.NotificationAction) > 0 {
vr.notificationQueueService.Send(ctx, msg)
}
}