plugins/storage/rdbms/report.go (135 lines of code) (raw):
// Copyright (c) Facebook, Inc. and its affiliates.
//
// This source code is licensed under the MIT license found in the
// LICENSE file in the root directory of this source tree.
package rdbms
import (
"encoding/json"
"errors"
"fmt"
"github.com/facebookincubator/contest/pkg/job"
"github.com/facebookincubator/contest/pkg/types"
"github.com/facebookincubator/contest/pkg/xcontext"
)
// StoreReport persists a run or final report in the internal storage.
func (r *RDBMS) StoreReport(_ xcontext.Context, report *job.Report) error {
reportJSON, err := report.ToJSON()
if err != nil {
return fmt.Errorf("could not serialize final report for job %v: %v", report.JobID, err)
}
r.lockTx()
defer r.unlockTx()
if report.RunID > 0 {
if _, err := r.db.Exec(
"insert into run_reports (job_id, run_id, reporter_name, success, report_time, data) values (?, ?, ?, ?, ?, ?)",
report.JobID, report.RunID, report.ReporterName, report.Success, report.ReportTime, reportJSON); err != nil {
return fmt.Errorf("could not store run report for job %v: %v", report.JobID, err)
}
} else {
if _, err := r.db.Exec(
"insert into final_reports (job_id, reporter_name, success, report_time, data) values (?, ?, ?, ?, ?)",
report.JobID, report.ReporterName, report.Success, report.ReportTime, reportJSON); err != nil {
return fmt.Errorf("could not store final report for job %v: %v", report.JobID, err)
}
}
return nil
}
// GetJobReport retrieves a JobReport from the database
func (r *RDBMS) GetJobReport(ctx xcontext.Context, jobID types.JobID) (*job.JobReport, error) {
var (
runReports [][]*job.Report
currentRunReports []*job.Report
finalReports []*job.Report
)
r.lockTx()
defer r.unlockTx()
// get run reports. Don't change the order by asc, because
// the code below assumes sorted results by ascending run number.
selectStatement := "select success, report_time, reporter_name, run_id, data from run_reports where job_id = ? order by run_id asc, reporter_name asc"
ctx.Debugf("Executing query: %s", selectStatement)
rows, err := r.db.Query(selectStatement, jobID)
if err != nil {
return nil, fmt.Errorf("could not get run report for job %v: %v", jobID, err)
}
defer func() {
if err := rows.Close(); err != nil {
ctx.Warnf("failed to close rows from query statement: %v", err)
}
}()
var lastRunID, currentRunID uint
for rows.Next() {
if err := rows.Err(); err != nil {
return nil, fmt.Errorf("could not fetch run report for job %d: %v", jobID, err)
}
var (
report job.Report
data string
)
err = rows.Scan(
&report.Success,
&report.ReportTime,
&report.ReporterName,
¤tRunID,
&data,
)
// Fetch fetches a Job request from storage based on job id
if err != nil {
return nil, fmt.Errorf("failed to scan row while fetching run report for job %d: %v", jobID, err)
}
if err := json.Unmarshal([]byte(data), &report.Data); err != nil {
return nil, fmt.Errorf("failed to unmarshal run report JSON data: %v", err)
}
// rows are sorted by ascending run_id, so if we find a
// non-monotonic run_id or a gap, we return an error.
// This works as long as we can assume ascending sorting, so don't
// change it, or at least change both.
if currentRunID == 0 {
return nil, errors.New("invalid run_id in database, cannot be zero")
}
if currentRunID < lastRunID || currentRunID > lastRunID+1 {
return nil, fmt.Errorf("invalid run_id retrieved from database: either it is not ordered, or there is a gap in run numbers in the database for job %d. Current run number: %d, last run number: %d",
jobID, currentRunID, lastRunID,
)
}
if currentRunID != lastRunID {
// this is the next run number
if lastRunID > 0 {
runReports = append(runReports, currentRunReports)
currentRunReports = make([]*job.Report, 0)
}
lastRunID = currentRunID
}
// These struct fields were added later, populate them from columns.
report.JobID = jobID
report.RunID = types.RunID(currentRunID)
currentRunReports = append(currentRunReports, &report)
}
if len(currentRunReports) > 0 {
runReports = append(runReports, currentRunReports)
}
// get final reports
selectStatement = "select success, report_time, reporter_name, data from final_reports where job_id = ? order by reporter_name asc"
ctx.Debugf("Executing query: %s", selectStatement)
rows2, err := r.db.Query(selectStatement, jobID)
if err != nil {
return nil, fmt.Errorf("could not get final report for job %v: %v", jobID, err)
}
defer func() {
if err := rows2.Close(); err != nil {
ctx.Warnf("failed to close rows2 from query statement: %v", err)
}
}()
for rows2.Next() {
if err := rows2.Err(); err != nil {
return nil, fmt.Errorf("could not fetch final report for job %d: %v", jobID, err)
}
var (
report job.Report
data string
)
err = rows2.Scan(
&report.Success,
&report.ReportTime,
&report.ReporterName,
&data,
)
if err != nil {
return nil, fmt.Errorf("failed to scan row while fetching final report for job %d: %v", jobID, err)
}
if err := json.Unmarshal([]byte(data), &report.Data); err != nil {
return nil, fmt.Errorf("failed to unmarshal final report JSON data: %v", err)
}
// These struct fields were added later, populate them from columns.
report.JobID = jobID
report.RunID = 0
finalReports = append(finalReports, &report)
}
return &job.JobReport{
JobID: jobID,
RunReports: runReports,
FinalReports: finalReports,
}, nil
}