pkg/api/api.go (203 lines of code) (raw):
// Copyright (c) Facebook, Inc. and its affiliates.
//
// This source code is licensed under the MIT license found in the
// LICENSE file in the root directory of this source tree.
package api
import (
"errors"
"fmt"
"os"
"time"
"github.com/facebookincubator/contest/pkg/storage"
"github.com/facebookincubator/contest/pkg/storage/limits"
"github.com/facebookincubator/contest/pkg/types"
"github.com/facebookincubator/contest/pkg/xcontext"
)
// CurrentAPIVersion is the current version of the API that the clients must be
// able to speak in order to communicate with the server. Versioning starts at
// 1, while 0 is to be considered an error indicator.
//
// This is the value that API.Version() reports in its response data.
const CurrentAPIVersion uint32 = 5
// DefaultEventTimeout is the default time to wait for sending or receiving an
// event on the events channel.
//
// NOTE(review): the send/receive methods in this file read their timeout from
// Config.EventTimeout rather than from this variable directly; presumably the
// config is seeded from it when no option overrides it -- confirm in
// getConfig.
var DefaultEventTimeout = 3 * time.Second
// ServerIDFunc is used to return a custom server ID in api responses. It takes
// no arguments and returns the ID as a string.
//
// NOTE(review): New() calls the function found in Config.ServerIDFunc once,
// validates the result, and falls back to the hostname when that function is
// nil; presumably that config field has this type -- confirm in Config.
type ServerIDFunc func() string
// The API structure implements the communication between clients and the
// JobManager. It enables several operations like starting, stopping,
// retrying a job, and getting a job status.
//
// Instances are created with New(), which fills in every field below.
type API struct {
	// Config is a set of knobs to change the behavior of API processing.
	// The methods in this file read Config.EventTimeout as the default
	// timeout for sending and receiving events.
	Config Config
	// Events channel is used to route API events between clients and the
	// JobManager. It is not necessary to close it explicitly as it will be
	// garbage-collected when the API structure in the client goes out of scope.
	// New() creates it unbuffered, so a send blocks until a consumer receives
	// the event or the send timeout expires.
	Events chan *Event
	// serverID is used by ServerID() to return a custom server ID in API
	// responses. It is computed and validated once, in New().
	serverID string
}
// New builds an API instance from the given options.
//
// The options are folded into a Config by getConfig. The server ID is then
// resolved once from the config's ServerIDFunc (see obtainServerID) and stored
// on the returned instance. The Events channel is created unbuffered.
//
// An error is returned, wrapping the underlying cause, when the server ID
// cannot be obtained or does not pass validation.
func New(opts ...Option) (*API, error) {
	config := getConfig(opts...)

	id, err := obtainServerID(config.ServerIDFunc)
	if err != nil {
		return nil, fmt.Errorf("Cannot create API instance: %w", err)
	}

	instance := &API{
		Config:   config,
		Events:   make(chan *Event),
		serverID: id,
	}
	return instance, nil
}
// obtainServerID resolves and validates the server ID.
//
// Resolution order:
//   - if serverIDFunc is non-nil, its return value is used as-is;
//   - otherwise the OS hostname is used;
//   - if the hostname cannot be read, the placeholder "<unknown>" is used.
//
// The resolved value is then checked with the limits validator; on validation
// failure an empty string and the validator's error are returned.
func obtainServerID(serverIDFunc func() string) (string, error) {
	var id string
	switch {
	case serverIDFunc != nil:
		id = serverIDFunc()
	default:
		hostname, err := os.Hostname()
		if err != nil {
			// Hostname lookup is best-effort: fall back to a placeholder.
			id = "<unknown>"
		} else {
			id = hostname
		}
	}

	validator := limits.NewValidator()
	if err := validator.ValidateServerID(id); err != nil {
		return "", err
	}
	return id, nil
}
// ServerID returns the Server ID to be used in responses. The value is the one
// stored on the API instance by New(), which takes it from a custom server ID
// generation function when one is configured and from the hostname otherwise.
func (a API) ServerID() string {
	return a.serverID
}
// newResponse creates a Response pre-populated with the given response type
// and this API's server ID. Every other field, including Data, is left at its
// zero value and has to be filled in by the caller.
func (a API) newResponse(rtype ResponseType) Response {
	var resp Response
	resp.Type = rtype
	resp.ServerID = a.ServerID()
	return resp
}
// Version reports the API version spoken by this server, i.e.
// CurrentAPIVersion, wrapped in a Response of type ResponseTypeVersion.
//
// It's the client's responsibility to check whether it can talk the right
// API. If the client speaks an incompatible version of the API that the
// server doesn't understand, it's the server's responsibility to return an
// error upon API calls.
func (a API) Version() Response {
	// NOTE: backward-compatibility should be handled by a proxy endpoint that
	// speaks the same API, and will detect the version and redirect to the
	// appropriate backend. This will simplify the way migrations are carried
	// over.
	versionResp := a.newResponse(ResponseTypeVersion)
	versionResp.Data = ResponseDataVersion{Version: CurrentAPIVersion}
	return versionResp
}
// SendEvent sends an Event object on the event channel, without waiting for a
// reply.
//
// The event must be non-nil and carry a non-nil message whose requestor name
// is non-empty and accepted by the limits validator; otherwise an error is
// returned and nothing is sent.
//
// If timeout is nil, a.Config.EventTimeout is used. If the send doesn't
// complete within the timeout, an error is returned.
func (a *API) SendEvent(ev *Event, timeout *time.Duration) error {
	// Reject malformed events up front instead of panicking on a nil
	// dereference below.
	if ev == nil {
		return errors.New("event cannot be nil")
	}
	if ev.Msg == nil {
		return errors.New("event message cannot be nil")
	}
	if ev.Msg.Requestor() == "" {
		return errors.New("requestor cannot be empty")
	}
	if err := limits.NewValidator().ValidateRequestorName(string(ev.Msg.Requestor())); err != nil {
		return err
	}

	to := a.Config.EventTimeout
	if timeout != nil {
		to = *timeout
	}

	// An explicit timer (rather than time.After) is stopped as soon as the
	// send completes, so it does not linger until the timeout elapses.
	timer := time.NewTimer(to)
	defer timer.Stop()

	select {
	case a.Events <- ev:
		return nil
	case <-timer.C:
		return fmt.Errorf("sending event timed out after %v", to)
	}
}
// SendReceiveEvent sends an Event object on the event channel, and waits for a
// reply from the consumer on the event's RespCh.
//
// If timeout is nil, a.Config.EventTimeout is used. The timeout is used once
// for the send, and once for the receive, it's not a cumulative timeout.
//
// An error is returned when the event is nil, when SendEvent fails (invalid
// event or send timeout), or when no response arrives within the timeout.
func (a *API) SendReceiveEvent(ev *Event, timeout *time.Duration) (*EventResponse, error) {
	// Guard here as well: ev.RespCh is dereferenced below.
	if ev == nil {
		return nil, errors.New("event cannot be nil")
	}

	to := a.Config.EventTimeout
	if timeout != nil {
		to = *timeout
	}

	// send
	if err := a.SendEvent(ev, &to); err != nil {
		return nil, err
	}

	// receive
	// An explicit timer (rather than time.After) is stopped as soon as the
	// response arrives, so it does not linger until the timeout elapses.
	timer := time.NewTimer(to)
	defer timer.Stop()

	select {
	case resp := <-ev.RespCh:
		return resp, nil
	case <-timer.C:
		return nil, fmt.Errorf("time out waiting for response after %v", to)
	}
}
// Start requests to create a new test job, as described by the job descriptor.
//
// A job descriptor may contain multiple tests, which will be run sequentially,
// not in parallel. If you need parallelism, you need to submit multiple
// independent jobs. If you need coordination across jobs, you need to write
// your own synchronization plugins that use external means (e.g. the events
// API), but no inter-job synchronization is implemented in the framework
// itself. This is intentional, to avoid overcomplicating the orchestration
// for a few edge cases.
//
// Each job descriptor must be JSON-encoded, and will be deserialized in a
// `contest.JobDescriptor` object by the JobManager.
//
// This method must return a unique job ID, that can be used for various
// operations via the API, e.g. getting the job status or stopping it.
// This method should return an error if the job description is malformed or
// invalid, and if the API version is incompatible.
//
// The returned error is non-nil only when the event exchange itself fails;
// an error reported in the event response is copied into the Err field of the
// returned Response.
func (a *API) Start(ctx xcontext.Context, requestor EventRequestor, jobDescriptor string) (Response, error) {
	resp := a.newResponse(ResponseTypeStart)

	// To allow jobs to finish we do not allow passing cancel and pause
	// signals to the job's context (therefore: xcontext.WithResetSignalers).
	jobCtx := xcontext.WithResetSignalers(ctx).WithTag("api_method", "start")
	startMsg := EventStartMsg{
		requestor:     requestor,
		JobDescriptor: jobDescriptor,
	}
	startEvent := &Event{
		Context:  jobCtx,
		Type:     EventTypeStart,
		ServerID: resp.ServerID,
		Msg:      startMsg,
		RespCh:   make(chan *EventResponse, 1),
	}

	reply, err := a.SendReceiveEvent(startEvent, nil)
	if err != nil {
		return resp, err
	}

	resp.Data = ResponseDataStart{JobID: reply.JobID}
	resp.Err = reply.Err
	return resp, nil
}
// Stop requests a job cancellation by the given job ID, on behalf of the
// given requestor.
//
// The returned error is non-nil only when the event exchange itself fails;
// an error reported in the event response is copied into the Err field of the
// returned Response, whose Data is an empty ResponseDataStop.
func (a *API) Stop(ctx xcontext.Context, requestor EventRequestor, jobID types.JobID) (Response, error) {
	resp := a.newResponse(ResponseTypeStop)

	stopMsg := EventStopMsg{
		requestor: requestor,
		JobID:     jobID,
	}
	stopEvent := &Event{
		Context:  ctx.WithTag("api_method", "stop"),
		Type:     EventTypeStop,
		ServerID: resp.ServerID,
		Msg:      stopMsg,
		RespCh:   make(chan *EventResponse, 1),
	}

	reply, err := a.SendReceiveEvent(stopEvent, nil)
	if err != nil {
		return resp, err
	}

	resp.Data = ResponseDataStop{}
	resp.Err = reply.Err
	return resp, nil
}
// Status polls the status of a job by its ID, and returns it in the Data
// field of the Response as a ResponseDataStatus.
//
// The returned error is non-nil only when the event exchange itself fails;
// an error reported in the event response is copied into the Err field of the
// returned Response.
func (a *API) Status(ctx xcontext.Context, requestor EventRequestor, jobID types.JobID) (Response, error) {
	resp := a.newResponse(ResponseTypeStatus)

	statusMsg := EventStatusMsg{
		requestor: requestor,
		JobID:     jobID,
	}
	statusEvent := &Event{
		Context:  ctx.WithTag("api_method", "status"),
		Type:     EventTypeStatus,
		ServerID: resp.ServerID,
		Msg:      statusMsg,
		RespCh:   make(chan *EventResponse, 1),
	}

	reply, err := a.SendReceiveEvent(statusEvent, nil)
	if err != nil {
		return resp, err
	}

	resp.Data = ResponseDataStatus{Status: reply.Status}
	resp.Err = reply.Err
	return resp, nil
}
// Retry will retry a job identified by its ID, using the same job
// description. If the job is still running, an error is returned.
//
// The returned error is non-nil only when the event exchange itself fails;
// an error reported in the event response is copied into the Err field of the
// returned Response. The JobID in the response data is the ID passed in by
// the caller, not the ID of any newly created job.
func (a *API) Retry(ctx xcontext.Context, requestor EventRequestor, jobID types.JobID) (Response, error) {
	resp := a.newResponse(ResponseTypeRetry)

	retryMsg := EventRetryMsg{
		requestor: requestor,
		JobID:     jobID,
	}
	retryEvent := &Event{
		Context:  ctx.WithTag("api_method", "retry"),
		Type:     EventTypeRetry,
		ServerID: resp.ServerID,
		Msg:      retryMsg,
		RespCh:   make(chan *EventResponse, 1),
	}

	reply, err := a.SendReceiveEvent(retryEvent, nil)
	if err != nil {
		return resp, err
	}

	// this is the job ID of the job to retry, not the new job ID
	// TODO this should set the new Job ID
	// NewJobID: ...
	resp.Data = ResponseDataRetry{JobID: jobID}
	resp.Err = reply.Err
	return resp, nil
}
// List will list jobs matching the specified criteria. The matching job IDs
// are returned in the Data field of the Response as a ResponseDataList.
//
// The returned error is non-nil only when the event exchange itself fails;
// an error reported in the event response is copied into the Err field of the
// returned Response.
func (a *API) List(ctx xcontext.Context, requestor EventRequestor, query *storage.JobQuery) (Response, error) {
	resp := a.newResponse(ResponseTypeList)

	listMsg := EventListMsg{
		requestor: requestor,
		Query:     query,
	}
	listEvent := &Event{
		Context:  ctx.WithTag("api_method", "list"),
		Type:     EventTypeList,
		ServerID: resp.ServerID,
		Msg:      listMsg,
		RespCh:   make(chan *EventResponse, 1),
	}

	reply, err := a.SendReceiveEvent(listEvent, nil)
	if err != nil {
		return resp, err
	}

	resp.Data = ResponseDataList{JobIDs: reply.JobIDs}
	resp.Err = reply.Err
	return resp, nil
}