in backend/plugins/issue_trace/tasks/issue_assignee_history_convertor.go [58:219]
// ConvertIssueAssigneeHistory builds issue assignee history rows for every
// board listed in the task's ScopeIds and queues them on a shared batch
// insertor.
//
// It works in two passes:
//  1. Issues that have a non-empty assignee but no 'assignee' changelog yield
//     one row each, spanning from the issue's created date to the database's
//     now() at query time.
//  2. 'assignee' changelogs are streamed ordered by issue id and then by
//     changelog date; consecutive rows of the same issue are grouped and passed
//     to buildActiveAssigneeHistory, and the rows it returns are queued.
//
// The first error encountered is logged and returned. If no earlier error
// occurred, an error from closing the batch insertor is returned instead of
// being dropped.
func ConvertIssueAssigneeHistory(taskCtx plugin.SubTaskContext) (err errors.Error) {
	logger := taskCtx.GetLogger()
	options := taskCtx.GetData().(*TaskData)
	scopeIds := options.ScopeIds
	db := taskCtx.GetDal()

	insertor := helper.NewBatchSaveDivider(taskCtx, utils.BATCH_SIZE, "", "")
	defer func() {
		// NOTE(review): Close presumably flushes rows that are still buffered,
		// so its error must not be lost — confirm against BatchSaveDivider.
		if closeErr := insertor.Close(); closeErr != nil {
			logger.Error(closeErr, "Failed to close batch insertor")
			if err == nil {
				err = closeErr
			}
		}
	}()
	batchInsertor, err := insertor.ForType(reflect.TypeOf(&models.IssueAssigneeHistory{}))
	if err != nil {
		logger.Error(err, "Failed to create batch insert")
		return err
	}

	// queueIssueHistory turns the changelogs collected for a single issue into
	// history rows and queues them; it is a no-op for an empty group.
	queueIssueHistory := func(logs []AssigneeChangelog) errors.Error {
		if len(logs) == 0 {
			return nil
		}
		for _, historyRow := range buildActiveAssigneeHistory(logs) {
			if addErr := batchInsertor.Add(historyRow); addErr != nil {
				logger.Error(addErr, "Failed to insert issue assignee history")
				return addErr
			}
		}
		return nil
	}

	// convert issues without changelogs of assignee
	cursorForIssuesWithoutChangelog, err := db.RawCursor(`
select
board_issues.board_id as _raw_data_params,
issues._raw_data_table as _raw_data_table,
issues._raw_data_id as _raw_data_id,
issues.id as issue_id,
issues.created_date as start_date,
issues.assignee_id as user_id,
now() as end_date
from
issues
join board_issues on board_issues.issue_id = issues.id
left join issue_changelogs on issues.id = issue_changelogs.issue_id
and issue_changelogs.field_name = 'assignee'
where
issue_changelogs.issue_id is null
and assignee_id != ''
and board_issues.board_id in ?;
`, scopeIds)
	if err != nil {
		logger.Error(err, "Failed to query issue assignee")
		return err
	}
	defer cursorForIssuesWithoutChangelog.Close()

	convertorForIssuesWithoutChangelog, err := helper.NewDataConverter(helper.DataConverterArgs{
		RawDataSubTaskArgs: helper.RawDataSubTaskArgs{
			Ctx: taskCtx,
			// Params is left unset; each row carries its board id in the
			// _raw_data_params column selected by the query above.
			Table: "issue_changelogs",
		},
		InputRowType: reflect.TypeOf(AssigneeHistory{}),
		Input:        cursorForIssuesWithoutChangelog,
		Convert: func(inputRow interface{}) ([]interface{}, errors.Error) {
			if ctxErr := utils.CheckCancel(taskCtx); ctxErr != nil {
				return nil, ctxErr
			}
			row := inputRow.(*AssigneeHistory)
			// Sample the clock once so CreatedAt and UpdatedAt agree.
			now := time.Now()
			addErr := batchInsertor.Add(&models.IssueAssigneeHistory{
				NoPKModel: common.NoPKModel{
					RawDataOrigin: common.RawDataOrigin{
						RawDataParams: row.RawDataParams,
						RawDataTable:  row.RawDataTable,
						RawDataId:     row.RawDataId,
						RawDataRemark: row.RawDataRemark,
					},
					CreatedAt: now,
					UpdatedAt: now,
				},
				IssueId:   row.IssueId,
				Assignee:  row.UserId,
				StartDate: row.StartDate,
				EndDate:   &row.EndDate,
			})
			if addErr != nil {
				logger.Error(addErr, "Failed to insert issue assignee history")
				return nil, addErr
			}
			// Rows are written through batchInsertor, so the converter itself
			// receives nothing to save.
			return nil, nil
		},
	})
	if err != nil {
		logger.Error(err, "Failed to create convertor")
		return err
	}
	err = convertorForIssuesWithoutChangelog.Execute()
	if err != nil {
		logger.Error(err, "Failed to execute convertor")
		return err
	}

	// convert issues with changelogs of assignee
	clauses := []dal.Clause{
		dal.Select("board_issues.board_id as _raw_data_params, " +
			"issue_changelogs._raw_data_table as _raw_data_table, " +
			"issue_changelogs._raw_data_id as _raw_data_id, " +
			"issue_changelogs.issue_id, " +
			"issue_changelogs.original_from_value as from_assignee, " +
			"issue_changelogs.original_to_value as to_assignee, " +
			"issue_changelogs.created_date as log_created_date, " +
			"issues.created_date as issue_created_date",
		),
		dal.From("issue_changelogs"),
		dal.Join("JOIN board_issues ON issue_changelogs.issue_id = board_issues.issue_id"),
		dal.Join("JOIN issues ON issue_changelogs.issue_id = issues.id"),
		dal.Where("field_name = 'assignee' AND board_issues.board_id in ?", scopeIds),
		dal.Orderby("issue_changelogs.issue_id ASC, issue_changelogs.created_date ASC"),
	}
	cursorForIssueChangelogs, err := db.Cursor(clauses...)
	if err != nil {
		logger.Error(err, "Failed to query assignee changelogs")
		return err
	}
	defer cursorForIssueChangelogs.Close()

	// The query is ordered by issue id, so all changelogs of one issue arrive
	// consecutively; they are buffered until the issue id changes.
	var currentIssue string
	var currentLogs []AssigneeChangelog
	convertorForIssueChangelogs, err := helper.NewDataConverter(helper.DataConverterArgs{
		RawDataSubTaskArgs: helper.RawDataSubTaskArgs{
			Ctx: taskCtx,
			// Params is left unset; each row carries its board id in the
			// _raw_data_params column selected by the query above.
			Table: "issue_changelogs",
		},
		InputRowType: reflect.TypeOf(AssigneeChangelog{}),
		Input:        cursorForIssueChangelogs,
		Convert: func(inputRow interface{}) ([]interface{}, errors.Error) {
			if ctxErr := utils.CheckCancel(taskCtx); ctxErr != nil {
				return nil, ctxErr
			}
			row := inputRow.(*AssigneeChangelog)
			if row.IssueId != currentIssue {
				// A new issue starts: emit the history of the previous one.
				if queueErr := queueIssueHistory(currentLogs); queueErr != nil {
					return nil, queueErr
				}
				currentIssue = row.IssueId
				// Start a fresh slice rather than truncating, in case the
				// previous group is still referenced by the built rows.
				currentLogs = nil
			}
			currentLogs = append(currentLogs, *row)
			return nil, nil
		},
	})
	if err != nil {
		logger.Error(err, "Failed to create data convertor")
		return err
	}
	err = convertorForIssueChangelogs.Execute()
	if err != nil {
		logger.Error(err, "Failed to execute convertor")
		return err
	}

	// The last issue's changelogs are still buffered once the cursor is
	// exhausted, because no following row triggered their emission.
	if queueErr := queueIssueHistory(currentLogs); queueErr != nil {
		return queueErr
	}
	return nil
}