func ConvertIssueStatusHistory()

in backend/plugins/issue_trace/tasks/issue_status_history_convertor.go [59:224]


// ConvertIssueStatusHistory writes models.IssueStatusHistory rows for every
// issue that belongs to one of the boards listed in the task's ScopeIds.
//
// It works in two passes that share a single batch inserter:
//
//  1. Issues with no 'status' changelog entry get exactly one row that runs
//     from the issue's creation date until now and is flagged IsFirstStatus.
//  2. Issues with 'status' changelog entries are streamed ordered by issue id
//     and changelog date; the entries of one issue are buffered, handed to
//     buildStatusHistoryRecords, and the resulting rows are saved with
//     StatusTimeMinutes filled in for every row that has an EndDate.
//
// The first error encountered is returned. An error reported while closing
// the shared inserter is returned as well when nothing failed earlier, so a
// failure at that stage is no longer reported as success.
func ConvertIssueStatusHistory(taskCtx plugin.SubTaskContext) (err errors.Error) {
	logger := taskCtx.GetLogger()
	options := taskCtx.GetData().(*TaskData)
	scopeIds := options.ScopeIds

	db := taskCtx.GetDal()
	inserter := helper.NewBatchSaveDivider(taskCtx, utils.BATCH_SIZE, "", "")
	defer func() {
		// NOTE(review): Close presumably flushes rows still buffered in the
		// batch — confirm against the helper. Its error must not be dropped.
		if closeErr := inserter.Close(); closeErr != nil {
			logger.Error(closeErr, "Failed to close batch insert")
			if err == nil {
				err = closeErr
			}
		}
		if err == nil {
			logger.Info("issues status history covert successfully")
		}
	}()
	batchInserter, err := inserter.ForType(reflect.TypeOf(&models.IssueStatusHistory{}))
	if err != nil {
		logger.Error(err, "Failed to create batch insert")
		return err
	}

	// elapsedMinutes returns the whole minutes between start and end, computed
	// from whole Unix seconds; it yields 0 when end is not after start.
	elapsedMinutes := func(start, end time.Time) int32 {
		if !end.After(start) {
			return 0
		}
		return int32((end.Unix() - start.Unix()) / 60)
	}

	// flushIssueLogs turns the buffered changelog rows of a single issue into
	// history records and queues them on the shared batch inserter.
	flushIssueLogs := func(logs []*StatusChangeLogResult) errors.Error {
		if len(logs) == 0 {
			return nil
		}
		for _, record := range buildStatusHistoryRecords(logs) {
			// Records without an EndDate keep whatever StatusTimeMinutes
			// buildStatusHistoryRecords assigned.
			if record.EndDate != nil {
				record.StatusTimeMinutes = elapsedMinutes(record.StartDate, *record.EndDate)
			}
			if addErr := batchInserter.Add(record); addErr != nil {
				return addErr
			}
		}
		return nil
	}

	// handle issues not appeared in change logs (hasn't changed)
	logger.Info("get issues not appeared in change logs, board %s", scopeIds)
	now := time.Now()
	clauses := []dal.Clause{
		dal.Select("issues.id AS issue_id, issues.status, issues.original_status, issues.created_date," +
			"board_issues.board_id as _raw_data_params, issues._raw_data_table as _raw_data_table, issues._raw_data_id as _raw_data_id"),
		dal.From("issues"),
		dal.Join("INNER JOIN board_issues ON board_issues.issue_id = issues.id"),
		// The LEFT JOIN plus the IS NULL filter below keeps only issues that
		// have no 'status' changelog row at all.
		dal.Join("LEFT JOIN issue_changelogs ON issue_changelogs.issue_id=issues.id AND issue_changelogs.field_name='status'"),
		dal.Where("board_issues.board_id in ? AND issue_changelogs.field_name IS NULL", scopeIds),
	}
	statusFromIssueCursor, err := db.Cursor(clauses...)
	if err != nil {
		logger.Error(err, "Failed to query issue status")
		return err
	}
	defer statusFromIssueCursor.Close()
	statusFromIssueConvertor, err := helper.NewDataConverter(helper.DataConverterArgs{
		RawDataSubTaskArgs: helper.RawDataSubTaskArgs{
			Ctx:    taskCtx,
			Params: options.Options,
			Table:  "issues",
		},
		InputRowType: reflect.TypeOf(IssueStatusWithoutChangeLog{}),
		Input:        statusFromIssueCursor,
		// Rows are pushed to the shared batch inserter directly, so Convert
		// never hands any result back to the converter.
		Convert: func(inputRow interface{}) ([]interface{}, errors.Error) {
			if ctxErr := utils.CheckCancel(taskCtx); ctxErr != nil {
				return nil, ctxErr
			}
			issue := inputRow.(*IssueStatusWithoutChangeLog)
			addErr := batchInserter.Add(&models.IssueStatusHistory{
				NoPKModel: common.NoPKModel{
					RawDataOrigin: common.RawDataOrigin{
						RawDataTable:  issue.RawDataTable,
						RawDataParams: issue.RawDataParams,
						RawDataId:     issue.RawDataId,
					},
				},
				IssueId:           issue.IssueId,
				Status:            issue.Status,
				OriginalStatus:    issue.OriginalStatus,
				StartDate:         issue.CreatedDate,
				EndDate:           &now,
				StatusTimeMinutes: elapsedMinutes(issue.CreatedDate, now),
				IsFirstStatus:     true,
			})
			return nil, addErr
		},
	})
	if err != nil {
		logger.Error(err, "Failed to create statusFromIssueConvertor")
		return err
	}
	err = statusFromIssueConvertor.Execute()
	if err != nil {
		logger.Error(err, "Failed to execute statusFromIssueConvertor")
		return err
	}

	// handle issues with changelogs
	logger.Info("get issue status change log, board %s", scopeIds)
	clauses = []dal.Clause{
		dal.Select("issue_changelogs.issue_id, issue_changelogs.created_date AS log_created_date, " +
			"issue_changelogs.to_value, issue_changelogs.original_to_value, issue_changelogs.from_value, " +
			"issue_changelogs.original_from_value, issues.created_date AS issue_created_date," +
			"board_issues.board_id as _raw_data_params, issue_changelogs._raw_data_table as _raw_data_table, issue_changelogs._raw_data_id as _raw_data_id"),
		dal.From("issue_changelogs"),
		dal.Join("INNER JOIN issues ON issues.id = issue_changelogs.issue_id"),
		dal.Join("INNER JOIN board_issues ON board_issues.issue_id = issue_changelogs.issue_id"),
		dal.Where("board_issues.board_id in ? AND issue_changelogs.field_name = 'status'", scopeIds),
		// The grouping below relies on all rows of one issue being adjacent
		// and in chronological order.
		dal.Orderby("issue_changelogs.issue_id ASC, issue_changelogs.created_date ASC"),
	}
	statusFromChangelogCursor, err := db.Cursor(clauses...)
	if err != nil {
		logger.Error(err, "Failed to query status changelogs")
		return err
	}
	defer statusFromChangelogCursor.Close()

	// currentIssue / currentLogs hold the issue currently being accumulated.
	var currentIssue string
	var currentLogs []*StatusChangeLogResult

	statusFromChangelogConvertor, err := helper.NewDataConverter(helper.DataConverterArgs{
		RawDataSubTaskArgs: helper.RawDataSubTaskArgs{
			Ctx:    taskCtx,
			Params: options.Options,
			Table:  "issue_changelogs",
		},
		InputRowType: reflect.TypeOf(StatusChangeLogResult{}),
		Input:        statusFromChangelogCursor,
		Convert: func(inputRow interface{}) ([]interface{}, errors.Error) {
			if ctxErr := utils.CheckCancel(taskCtx); ctxErr != nil {
				return nil, ctxErr
			}
			logRow := inputRow.(*StatusChangeLogResult)
			if logRow.IssueId != currentIssue { // reach new issue section
				// Emit the previous issue before starting a fresh buffer.
				if flushErr := flushIssueLogs(currentLogs); flushErr != nil {
					return nil, flushErr
				}
				currentIssue = logRow.IssueId
				currentLogs = nil
			}
			currentLogs = append(currentLogs, logRow)
			return nil, nil
		},
	})
	if err != nil {
		logger.Error(err, "Failed to create statusFromChangelogConvertor")
		return err
	}
	err = statusFromChangelogConvertor.Execute()
	if err != nil {
		logger.Error(err, "Failed to execute statusFromChangelogConvertor")
		return err
	}

	// The last issue is never followed by a row with a different id, so its
	// buffered changelogs still have to be emitted here.
	if flushErr := flushIssueLogs(currentLogs); flushErr != nil {
		return flushErr
	}
	return nil
}