plugins/storage/rdbms/request.go (90 lines of code) (raw):
// Copyright (c) Facebook, Inc. and its affiliates.
//
// This source code is licensed under the MIT license found in the
// LICENSE file in the root directory of this source tree.
package rdbms
import (
"encoding/json"
"fmt"
"github.com/facebookincubator/contest/pkg/job"
"github.com/facebookincubator/contest/pkg/types"
"github.com/facebookincubator/contest/pkg/xcontext"
)
// SQL statements used when persisting a job request.
const (
	// insertJobStmt inserts a single row into the jobs table. Its six
	// placeholders are bound, in order, to name, descriptor,
	// extended_descriptor, requestor, server_id and request_time.
	insertJobStmt = `insert into jobs (name, descriptor, extended_descriptor, requestor, server_id, request_time) values (?, ?, ?, ?, ?, ?)`

	// insertJobTagStmt inserts a single (job_id, tag) row into the job_tags
	// table.
	insertJobTagStmt = `insert into job_tags (job_id, tag) values (?, ?)`
)
// StoreJobRequest stores a new job request in the database.
//
// The job descriptor carried by the request is parsed to extract the job tags,
// which are validated with job.CheckTags (internal tags are allowed). The
// request is then inserted into the jobs table, followed by one row per tag
// in the job_tags table. The context argument is not used.
//
// On success it returns the ID the database assigned to the new jobs row. On
// failure it returns a zero JobID together with an error describing the step
// that failed.
func (r *RDBMS) StoreJobRequest(_ xcontext.Context, request *job.Request) (types.JobID, error) {
	// Extract job tags for insertion.
	var desc job.Descriptor
	if err := json.Unmarshal([]byte(request.JobDescriptor), &desc); err != nil {
		return 0, fmt.Errorf("invalid job descriptor: %w", err)
	}
	if err := job.CheckTags(desc.Tags, true /* allowInternal */); err != nil {
		return 0, err
	}

	// Serialize the extended descriptor before taking the lock: it needs no
	// database access, so a failure here should not cost a lock round-trip.
	extendedDescriptor, err := json.Marshal(request.ExtendedDescriptor)
	if err != nil {
		return 0, fmt.Errorf("could not serialize extended job descriptor: %w", err)
	}

	r.lockTx()
	defer r.unlockTx()

	// Store the job descriptor.
	result, err := r.db.Exec(
		insertJobStmt,
		request.JobName,
		request.JobDescriptor,
		extendedDescriptor,
		request.Requestor,
		request.ServerID,
		request.RequestTime,
	)
	if err != nil {
		return 0, fmt.Errorf("could not store job request in database: %w", err)
	}

	lastID, err := result.LastInsertId()
	if err != nil {
		return 0, fmt.Errorf("could not extract id of last request inserted into db: %w", err)
	}
	jobID := types.JobID(lastID)

	// NOTE(review): the job row and the tag rows are written by separate Exec
	// calls; if a tag insert fails the job row may already be stored unless
	// r.db is transactional — confirm against the RDBMS implementation.
	for _, tag := range desc.Tags {
		if _, err := r.db.Exec(insertJobTagStmt, jobID, tag); err != nil {
			return 0, fmt.Errorf("could not store job tag in the database: %w", err)
		}
	}
	return jobID, nil
}
// GetJobRequest retrieves a JobRequest from the database.
//
// It selects the jobs row whose job_id equals jobID and returns it as a
// job.Request, with the extended descriptor decoded from its JSON column.
//
// An error is returned when the query fails, when a row cannot be scanned,
// when iterating over the result set ends with an error, when no row or more
// than one row matches jobID, or when the stored extended descriptor cannot
// be unmarshalled.
func (r *RDBMS) GetJobRequest(ctx xcontext.Context, jobID types.JobID) (*job.Request, error) {
	r.lockTx()
	defer r.unlockTx()

	selectStatement := "select job_id, name, requestor, server_id, request_time, descriptor, extended_descriptor from jobs where job_id = ?"
	ctx.Debugf("Executing query: %s", selectStatement)
	rows, err := r.db.Query(selectStatement, jobID)
	if err != nil {
		return nil, fmt.Errorf("could not get job request with id %v: %w", jobID, err)
	}
	defer func() {
		if err := rows.Close(); err != nil {
			ctx.Warnf("could not close rows for job request: %v", err)
		}
	}()

	var (
		req                    *job.Request
		extendedDescriptorJSON string
	)
	for rows.Next() {
		if req != nil {
			// We have already found a matching request. If we find more than one,
			// then we have a problem
			return nil, fmt.Errorf("multiple requests found with job id %v", jobID)
		}
		currRequest := job.Request{}
		err := rows.Scan(
			&currRequest.JobID,
			&currRequest.JobName,
			&currRequest.Requestor,
			&currRequest.ServerID,
			&currRequest.RequestTime,
			&currRequest.JobDescriptor,
			&extendedDescriptorJSON,
		)
		if err != nil {
			return nil, fmt.Errorf("could not get job request with job id %v: %w", jobID, err)
		}
		req = &currRequest
	}
	// rows.Next also returns false when iteration fails; surface that error
	// instead of misreporting it below as a missing request.
	if err := rows.Err(); err != nil {
		return nil, fmt.Errorf("could not iterate over job requests with job id %v: %w", jobID, err)
	}
	if req == nil {
		return nil, fmt.Errorf("could not find request with JobID %d", jobID)
	}

	extendedDescriptor := job.ExtendedDescriptor{}
	if err := json.Unmarshal([]byte(extendedDescriptorJSON), &extendedDescriptor); err != nil {
		return nil, fmt.Errorf("failed to unmarshal extended job descriptor: %w", err)
	}
	req.ExtendedDescriptor = &extendedDescriptor
	return req, nil
}