pkg/jobmanager/jobmanager.go
// Copyright (c) Facebook, Inc. and its affiliates.
//
// This source code is licensed under the MIT license found in the
// LICENSE file in the root directory of this source tree.
package jobmanager
import (
"encoding/json"
"errors"
"fmt"
"sync"
"time"
"github.com/insomniacslk/xjson"
"github.com/facebookincubator/contest/pkg/api"
"github.com/facebookincubator/contest/pkg/event"
"github.com/facebookincubator/contest/pkg/event/frameworkevent"
"github.com/facebookincubator/contest/pkg/event/testevent"
"github.com/facebookincubator/contest/pkg/job"
"github.com/facebookincubator/contest/pkg/pluginregistry"
"github.com/facebookincubator/contest/pkg/runner"
"github.com/facebookincubator/contest/pkg/storage"
"github.com/facebookincubator/contest/pkg/types"
"github.com/facebookincubator/contest/pkg/xcontext"
)
// ErrorEventPayload represents the payload carried by a failure event (e.g. JobStateFailed or JobStateCancelled).
type ErrorEventPayload struct {
Err xjson.Error
}
// JobManager is the core component for the long-running job management service.
// It handles API requests, test fetching, target fetching, and the lifecycle of jobs.
//
// In more detail, it is responsible for:
// * spawning the API listener, and handling the incoming requests
// * fetching targets, via target managers
// * fetching test definitions, via test fetchers
// * enqueuing new job requests, and handling their status
// * starting, stopping, and retrying jobs
type JobManager struct {
config
jobs map[types.JobID]*jobInfo
jobRunner *runner.JobRunner
jobsMu sync.Mutex
jsm storage.JobStorageManager
frameworkEvManager frameworkevent.EmitterFetcher
testEvManager testevent.Fetcher
apiListener api.Listener
pluginRegistry *pluginregistry.PluginRegistry
apiCancel xcontext.CancelFunc
msgCounter int
}
type jobInfo struct {
job *job.Job
pause, cancel func()
}
// New initializes and returns a new JobManager with the given API listener.
func New(l api.Listener, pr *pluginregistry.PluginRegistry, opts ...Option) (*JobManager, error) {
if pr == nil {
return nil, errors.New("plugin registry cannot be nil")
}
jsm := storage.NewJobStorageManager()
frameworkEvManager := storage.NewFrameworkEventEmitterFetcher()
testEvManager := storage.NewTestEventFetcher()
cfg := getConfig(opts...)
if cfg.instanceTag != "" {
if err := job.IsValidTag(cfg.instanceTag, true /* allowInternal */); err != nil {
return nil, fmt.Errorf("invalid instaceTag: %w", err)
}
if !job.IsInternalTag(cfg.instanceTag) {
return nil, fmt.Errorf("instaceTag must be an internal tag (start with %q)", job.InternalTagPrefix)
}
}
jm := JobManager{
config: cfg,
apiListener: l,
pluginRegistry: pr,
jobs: make(map[types.JobID]*jobInfo),
jsm: jsm,
frameworkEvManager: frameworkEvManager,
testEvManager: testEvManager,
}
jm.jobRunner = runner.NewJobRunner(jsm, cfg.clock, cfg.targetLockDuration)
return &jm, nil
}
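// Construction sketch (illustrative only; listener, registry and ctx are
// hypothetical placeholders supplied by the caller, and any Option values are
// omitted): wire a concrete api.Listener and a populated PluginRegistry into
// New, then drive the manager with Run.
//
//    jm, err := jobmanager.New(listener, registry)
//    if err != nil {
//        // e.g. nil plugin registry or an invalid instanceTag option
//    }
//    // Run blocks until the listener stops or ctx is paused/cancelled.
//    err = jm.Run(ctx, true /* resumeJobs */)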
func (jm *JobManager) handleEvent(ev *api.Event) {
var resp *api.EventResponse
switch ev.Type {
case api.EventTypeStart:
resp = jm.start(ev)
case api.EventTypeStatus:
resp = jm.status(ev)
case api.EventTypeStop:
resp = jm.stop(ev)
case api.EventTypeRetry:
resp = jm.retry(ev)
case api.EventTypeList:
resp = jm.list(ev)
default:
resp = &api.EventResponse{
Requestor: ev.Msg.Requestor(),
Err: fmt.Errorf("invalid event type: %v", ev.Type),
}
}
ev.Context.Debugf("Sending response %+v", resp)
// time to wait for the response to be received before panicking.
sendEventTimeout := 3 * time.Second
select {
case ev.RespCh <- resp:
case <-time.After(sendEventTimeout):
// TODO send failure event once we have the event infra
// TODO determine whether the server should shut down if there
// are too many errors
ev.Context.Logger().Panicf("timed out after %v trying to send a response event", sendEventTimeout)
}
}
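// Shape of the exchange handled above (a sketch of the wire-up, not the
// canonical listener code; how events are actually constructed lives in
// pkg/api): a listener delivers an api.Event carrying a response channel,
// and handleEvent answers on that channel or panics after the timeout.
//
//    respCh := make(chan *api.EventResponse, 1)
//    a.Events <- &api.Event{Type: api.EventTypeStatus, Msg: msg, Context: ctx, RespCh: respCh}
//    resp := <-respCh // filled in by jm.status via handleEvent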
// Run is responsible for starting the API listener and responding to incoming events.
func (jm *JobManager) Run(ctx xcontext.Context, resumeJobs bool) error {
jm.jobRunner.StartLockRefresh()
defer jm.jobRunner.StopLockRefresh()
a, err := api.New(jm.config.apiOptions...)
if err != nil {
return fmt.Errorf("Cannot start API: %w", err)
}
// Deal with zombied jobs (fail them).
if err := jm.failZombieJobs(ctx, a.ServerID()); err != nil {
ctx.Errorf("failed to fail zombie jobs: %v", err)
}
// First, resume paused jobs.
if resumeJobs {
if err := jm.resumeJobs(ctx, a.ServerID()); err != nil {
return fmt.Errorf("failed to resume jobs: %w", err)
}
}
apiCtx, apiCancel := xcontext.WithCancel(ctx)
jm.apiCancel = apiCancel
errCh := make(chan error, 1)
go func() {
lErr := jm.apiListener.Serve(apiCtx, a)
ctx.Infof("Listener shut down successfully.")
errCh <- lErr
close(errCh)
}()
var handlerWg sync.WaitGroup
loop:
for {
select {
// handle events from the API
case ev := <-a.Events:
ev.Context.Debugf("Handling event %+v", ev)
handlerWg.Add(1)
go func() {
defer handlerWg.Done()
jm.handleEvent(ev)
}()
// check for errors or premature termination from the listener.
case err := <-errCh:
if err != nil {
ctx.Infof("JobManager: API listener failed (%v)", err)
}
break loop
case <-ctx.Until(xcontext.ErrPaused):
ctx.Infof("Paused")
jm.PauseAll(ctx)
break loop
case <-ctx.Done():
break loop
}
}
// Stop the API (if not already stopped).
jm.StopAPI()
<-errCh
// Wait for event handler completion
handlerWg.Wait()
// Wait for jobs to complete or for cancellation signal.
doneCh := ctx.Done()
for !jm.checkIdle(ctx) {
select {
case <-doneCh:
ctx.Infof("Canceled")
jm.CancelAll(ctx)
// Note that we do not break out of the loop here; we expect the runner to wind down and exit.
doneCh = nil
case <-time.After(50 * time.Millisecond):
}
}
// Refresh locks one last time for jobs that were paused.
jm.jobRunner.RefreshLocks()
return nil
}
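// Caller-side sketch of Run's shutdown semantics (assumption: xcontext.Background
// exists as a root context constructor; WithCancel is the same helper used above).
// Cancelling the context makes Run cancel running jobs; signalling
// xcontext.ErrPaused (through whatever pause-notification helper the caller's
// xcontext setup provides) makes Run pause them instead so they can be resumed later.
//
//    ctx, cancel := xcontext.WithCancel(xcontext.Background())
//    defer cancel()
//    if err := jm.Run(ctx, true /* resumeJobs */); err != nil {
//        ctx.Errorf("job manager exited: %v", err)
//    }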
func (jm *JobManager) failZombieJobs(ctx xcontext.Context, serverID string) error {
zombieJobs, err := jm.listMyJobs(ctx, serverID, job.JobStateStarted)
if err != nil {
return fmt.Errorf("failed to list zombie jobs: %w", err)
}
ctx.Infof("Found %d zombie jobs for %s/%s", len(zombieJobs), jm.config.instanceTag, serverID)
for _, jobID := range zombieJobs {
// Log a line with job id so there's something in the job log to tell what happened.
jobCtx := ctx.WithField("job_id", jobID)
jobCtx.Errorf("This became a zombie, most likely the previous server instance was killed ungracefully")
if err = jm.emitErrEvent(ctx, jobID, job.EventJobFailed, fmt.Errorf("Job %d was zombieed", jobID)); err != nil {
ctx.Errorf("Failed to emit event: %v", err)
}
}
return nil
}
func (jm *JobManager) listMyJobs(ctx xcontext.Context, serverID string, jobState job.State) ([]types.JobID, error) {
queryFields := []storage.JobQueryField{
storage.QueryJobServerID(serverID),
storage.QueryJobStates(jobState),
}
if jm.config.instanceTag != "" {
queryFields = append(queryFields, storage.QueryJobTags(jm.config.instanceTag))
}
q, err := storage.BuildJobQuery(queryFields...)
if err != nil {
return nil, fmt.Errorf("failed to build job query: %w", err)
}
jobs, err := jm.jsm.ListJobs(ctx, q)
if err != nil {
return nil, fmt.Errorf("failed to list jobs: %w", err)
}
return jobs, nil
}
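// The same helper works for other states; for instance, a resume path could
// look up this instance's paused jobs (a sketch, assuming job.JobStatePaused
// is defined in pkg/job):
//
//    pausedJobs, err := jm.listMyJobs(ctx, serverID, job.JobStatePaused)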
func (jm *JobManager) checkIdle(ctx xcontext.Context) bool {
jm.jobsMu.Lock()
defer jm.jobsMu.Unlock()
if len(jm.jobs) == 0 {
return true
}
if jm.msgCounter%20 == 0 {
ctx.Infof("Waiting for %d jobs", len(jm.jobs))
}
jm.msgCounter++
return false
}
// CancelJob sends a cancellation request to a specific job.
func (jm *JobManager) CancelJob(jobID types.JobID) error {
jm.jobsMu.Lock()
defer jm.jobsMu.Unlock()
// get the job from the local cache rather than the storage layer. We can
// only cancel jobs that we are actively handling.
ji, ok := jm.jobs[jobID]
if !ok {
return fmt.Errorf("unknown job ID: %d", jobID)
}
ji.cancel()
return nil
}
// StopAPI stops accepting new requests.
func (jm *JobManager) StopAPI() {
jm.apiCancel()
}
// CancelAll cancels all running jobs.
func (jm *JobManager) CancelAll(ctx xcontext.Context) {
jm.jobsMu.Lock()
defer jm.jobsMu.Unlock()
for jobID, ji := range jm.jobs {
ctx.Debugf("JobManager: cancelling job %d", jobID)
ji.cancel()
}
}
// PauseAll pauses all running jobs.
func (jm *JobManager) PauseAll(ctx xcontext.Context) {
jm.jobsMu.Lock()
defer jm.jobsMu.Unlock()
for jobID, ji := range jm.jobs {
ctx.Debugf("JobManager: pausing job %d", jobID)
ji.pause()
}
}
func (jm *JobManager) emitEventPayload(ctx xcontext.Context, jobID types.JobID, eventName event.Name, payload interface{}) error {
var payloadJSON json.RawMessage
if payload != nil {
if p, err := json.Marshal(payload); err == nil {
payloadJSON = json.RawMessage(p)
} else {
return fmt.Errorf("Could not serialize payload for event %s: %v", eventName, err)
}
}
ev := frameworkevent.Event{
JobID: jobID,
EventName: eventName,
Payload: &payloadJSON,
EmitTime: time.Now(),
}
if err := jm.frameworkEvManager.Emit(ctx, ev); err != nil {
ctx.Warnf("Could not emit event %s for job %d: %v", eventName, jobID, err)
return err
}
return nil
}
func (jm *JobManager) emitErrEvent(ctx xcontext.Context, jobID types.JobID, eventName event.Name, err error) error {
if err != nil {
ctx.Errorf(err.Error())
return jm.emitEventPayload(ctx, jobID, eventName, &ErrorEventPayload{Err: *xjson.NewError(err)})
}
return jm.emitEventPayload(ctx, jobID, eventName, nil)
}
func (jm *JobManager) emitEvent(ctx xcontext.Context, jobID types.JobID, eventName event.Name) error {
return jm.emitErrEvent(ctx, jobID, eventName, nil)
}
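// Consumer-side sketch for the error payload emitted above (an assumption about
// how readers of framework events decode it, not an API of this package): given
// a fetched frameworkevent.Event ev, unmarshal its Payload back into
// ErrorEventPayload.
//
//    var p ErrorEventPayload
//    if ev.Payload != nil {
//        if err := json.Unmarshal(*ev.Payload, &p); err == nil {
//            ctx.Infof("job %d failed: %v", ev.JobID, p.Err)
//        }
//    }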