in plugins/storage/rdbms/list_jobs.go [18:87]
func (r *RDBMS) ListJobs(_ xcontext.Context, query *storage.JobQuery) ([]types.JobID, error) {
res := []types.JobID{}
// Quoting SQL strings is hard. https://github.com/golang/go/issues/18478
// For now, just disallow anything that is not [a-zA-Z0-9_-]
if err := job.CheckTags(query.Tags, true /* allowInternal */); err != nil {
return nil, err
}
// Job state is updated by framework events, ensure there aren't any pending.
err := r.flushFrameworkEvents()
if err != nil {
return nil, fmt.Errorf("could not flush events before reading events: %v", err)
}
// Construct the query.
parts := []string{"SELECT jobs.job_id FROM jobs"}
qargs := []interface{}{}
// Tag filtering uses joins, 1 per tag.
for i := range query.Tags {
parts = append(parts, fmt.Sprintf(`INNER JOIN job_tags jt%d ON jobs.job_id = jt%d.job_id`, i, i))
}
var conds []string
if len(query.ServerID) > 0 {
conds = append(conds, "jobs.server_id = ?")
qargs = append(qargs, query.ServerID)
}
if len(query.States) > 0 {
stst := make([]string, len(query.States))
for i, st := range query.States {
stst[i] = "?"
qargs = append(qargs, st)
}
conds = append(conds, fmt.Sprintf("jobs.state IN (%s)", strings.Join(stst, ", ")))
}
// Now the corresponding conditions.
for i, tag := range query.Tags {
conds = append(conds, fmt.Sprintf("jt%d.tag = ?", i))
qargs = append(qargs, tag)
}
if len(conds) > 0 {
parts = append(parts, "WHERE", strings.Join(conds, " AND "))
}
/* Examples of the resulting queries:
SELECT jobs.job_id FROM jobs
SELECT jobs.job_id FROM jobs WHERE jobs.state IN (0)
SELECT jobs.job_id FROM jobs WHERE jobs.state IN (2, 3)
SELECT jobs.job_id FROM jobs INNER JOIN job_tags jt0 ON jobs.job_id = jt0.job_id WHERE jt0.tag = "tests"
SELECT jobs.job_id FROM jobs INNER JOIN job_tags jt0 ON jobs.job_id = jt0.job_id INNER JOIN job_tags jt1 ON jobs.job_id = jt1.job_id WHERE jobs.state IN (2, 3, 4) AND jt0.tag = "tests" AND jt1.tag = "foo"
*/
parts = append(parts, "ORDER BY jobs.job_id")
stmt := strings.Join(parts, " ")
rows, err := r.db.Query(stmt, qargs...)
if err != nil {
return nil, fmt.Errorf("could not list jobs in states (sql: %q): %w", stmt, err)
}
defer func() {
_ = rows.Close()
}()
for rows.Next() {
var jobID types.JobID
if err := rows.Scan(&jobID); err != nil {
return nil, fmt.Errorf("could not list jobs in states (sql: %q): %w", stmt, err)
}
res = append(res, jobID)
}
return res, nil
}