in plugins/storage/rdbms/report.go [46:164]
func (r *RDBMS) GetJobReport(ctx xcontext.Context, jobID types.JobID) (*job.JobReport, error) {
var (
runReports [][]*job.Report
currentRunReports []*job.Report
finalReports []*job.Report
)
r.lockTx()
defer r.unlockTx()
// get run reports. Don't change the order by asc, because
// the code below assumes sorted results by ascending run number.
selectStatement := "select success, report_time, reporter_name, run_id, data from run_reports where job_id = ? order by run_id asc, reporter_name asc"
ctx.Debugf("Executing query: %s", selectStatement)
rows, err := r.db.Query(selectStatement, jobID)
if err != nil {
return nil, fmt.Errorf("could not get run report for job %v: %v", jobID, err)
}
defer func() {
if err := rows.Close(); err != nil {
ctx.Warnf("failed to close rows from query statement: %v", err)
}
}()
var lastRunID, currentRunID uint
for rows.Next() {
if err := rows.Err(); err != nil {
return nil, fmt.Errorf("could not fetch run report for job %d: %v", jobID, err)
}
var (
report job.Report
data string
)
err = rows.Scan(
&report.Success,
&report.ReportTime,
&report.ReporterName,
¤tRunID,
&data,
)
// Fetch fetches a Job request from storage based on job id
if err != nil {
return nil, fmt.Errorf("failed to scan row while fetching run report for job %d: %v", jobID, err)
}
if err := json.Unmarshal([]byte(data), &report.Data); err != nil {
return nil, fmt.Errorf("failed to unmarshal run report JSON data: %v", err)
}
// rows are sorted by ascending run_id, so if we find a
// non-monotonic run_id or a gap, we return an error.
// This works as long as we can assume ascending sorting, so don't
// change it, or at least change both.
if currentRunID == 0 {
return nil, errors.New("invalid run_id in database, cannot be zero")
}
if currentRunID < lastRunID || currentRunID > lastRunID+1 {
return nil, fmt.Errorf("invalid run_id retrieved from database: either it is not ordered, or there is a gap in run numbers in the database for job %d. Current run number: %d, last run number: %d",
jobID, currentRunID, lastRunID,
)
}
if currentRunID != lastRunID {
// this is the next run number
if lastRunID > 0 {
runReports = append(runReports, currentRunReports)
currentRunReports = make([]*job.Report, 0)
}
lastRunID = currentRunID
}
// These struct fields were added later, populate them from columns.
report.JobID = jobID
report.RunID = types.RunID(currentRunID)
currentRunReports = append(currentRunReports, &report)
}
if len(currentRunReports) > 0 {
runReports = append(runReports, currentRunReports)
}
// get final reports
selectStatement = "select success, report_time, reporter_name, data from final_reports where job_id = ? order by reporter_name asc"
ctx.Debugf("Executing query: %s", selectStatement)
rows2, err := r.db.Query(selectStatement, jobID)
if err != nil {
return nil, fmt.Errorf("could not get final report for job %v: %v", jobID, err)
}
defer func() {
if err := rows2.Close(); err != nil {
ctx.Warnf("failed to close rows2 from query statement: %v", err)
}
}()
for rows2.Next() {
if err := rows2.Err(); err != nil {
return nil, fmt.Errorf("could not fetch final report for job %d: %v", jobID, err)
}
var (
report job.Report
data string
)
err = rows2.Scan(
&report.Success,
&report.ReportTime,
&report.ReporterName,
&data,
)
if err != nil {
return nil, fmt.Errorf("failed to scan row while fetching final report for job %d: %v", jobID, err)
}
if err := json.Unmarshal([]byte(data), &report.Data); err != nil {
return nil, fmt.Errorf("failed to unmarshal final report JSON data: %v", err)
}
// These struct fields were added later, populate them from columns.
report.JobID = jobID
report.RunID = 0
finalReports = append(finalReports, &report)
}
return &job.JobReport{
JobID: jobID,
RunReports: runReports,
FinalReports: finalReports,
}, nil
}