pkg/transport/http/http.go (170 lines of code) (raw):

// Copyright (c) Facebook, Inc. and its affiliates. // // This source code is licensed under the MIT license found in the // LICENSE file in the root directory of this source tree. package http import ( "context" "encoding/json" "errors" "fmt" "io/ioutil" "net/http" "net/url" "os" "strconv" "strings" "github.com/facebookincubator/contest/pkg/api" "github.com/facebookincubator/contest/pkg/job" "github.com/facebookincubator/contest/pkg/types" "github.com/facebookincubator/contest/plugins/listeners/httplistener" "github.com/insomniacslk/xjson" ) // HttpPartiallyDecodedResponse is a httplistener.HTTPAPIResponse, but with the Data not fully decoded yet type HTTPPartiallyDecodedResponse struct { ServerID string Type string Data json.RawMessage Error *xjson.Error } // HTTP communicates with ConTest Server via http(s)/json transport // HTTP implements the Transport interface type HTTP struct { Addr string } func (h *HTTP) Version(ctx context.Context, requestor string) (*api.VersionResponse, error) { resp, err := h.request(requestor, "version", url.Values{}) if err != nil { return nil, err } data := api.ResponseDataVersion{} if string(resp.Data) != "" { if err := json.Unmarshal([]byte(resp.Data), &data); err != nil { return nil, fmt.Errorf("cannot decode json response: %v", err) } } return &api.VersionResponse{ServerID: resp.ServerID, Data: data, Err: resp.Error}, nil } func (h *HTTP) Start(ctx context.Context, requestor string, jobDescriptor string) (*api.StartResponse, error) { params := url.Values{} params.Add("jobDesc", jobDescriptor) resp, err := h.request(requestor, "start", params) if err != nil { return nil, err } data := api.ResponseDataStart{} if string(resp.Data) != "" { if err := json.Unmarshal([]byte(resp.Data), &data); err != nil { return nil, fmt.Errorf("cannot decode json response: %v", err) } } return &api.StartResponse{ServerID: resp.ServerID, Data: data, Err: resp.Error}, nil } func (h *HTTP) Stop(ctx context.Context, requestor string, jobID types.JobID) (*api.StopResponse, error) { params := url.Values{} params.Add("jobID", strconv.Itoa(int(jobID))) resp, err := h.request(requestor, "stop", params) if err != nil { return nil, err } data := api.ResponseDataStop{} if string(resp.Data) != "" { if err := json.Unmarshal([]byte(resp.Data), &data); err != nil { return nil, fmt.Errorf("cannot decode json response: %v", err) } } return &api.StopResponse{ServerID: resp.ServerID, Data: data, Err: resp.Error}, nil } func (h *HTTP) Status(ctx context.Context, requestor string, jobID types.JobID) (*api.StatusResponse, error) { params := url.Values{} params.Add("jobID", strconv.Itoa(int(jobID))) resp, err := h.request(requestor, "status", params) if err != nil { return nil, err } data := api.ResponseDataStatus{} if string(resp.Data) != "" { if err := json.Unmarshal([]byte(resp.Data), &data); err != nil { return nil, fmt.Errorf("cannot decode json response: %v", err) } } return &api.StatusResponse{ServerID: resp.ServerID, Data: data, Err: resp.Error}, nil } func (h *HTTP) Retry(ctx context.Context, requestor string, jobID types.JobID) (*api.RetryResponse, error) { params := url.Values{} params.Add("jobID", strconv.Itoa(int(jobID))) resp, err := h.request(requestor, "retry", params) if err != nil { return nil, err } data := api.ResponseDataRetry{} if string(resp.Data) != "" { if err := json.Unmarshal([]byte(resp.Data), &data); err != nil { return nil, fmt.Errorf("cannot decode json response: %v", err) } } return &api.RetryResponse{ServerID: resp.ServerID, Data: data, Err: resp.Error}, nil } func (h *HTTP) List(ctx context.Context, requestor string, states []job.State, tags []string) (*api.ListResponse, error) { params := url.Values{} if len(states) > 0 { sts := make([]string, len(states)) for i, st := range states { sts[i] = st.String() } params.Set("states", strings.Join(sts, ",")) } if len(tags) > 0 { params.Set("tags", strings.Join(tags, ",")) } resp, err := h.request(requestor, "list", params) if err != nil { return nil, err } var data api.ResponseDataList if string(resp.Data) != "" { if err := json.Unmarshal([]byte(resp.Data), &data); err != nil { return nil, fmt.Errorf("cannot decode json response: %v", err) } } return &api.ListResponse{ServerID: resp.ServerID, Data: data, Err: resp.Error}, nil } func (h *HTTP) request(requestor string, verb string, params url.Values) (*HTTPPartiallyDecodedResponse, error) { params.Set("requestor", requestor) u, err := url.Parse(h.Addr) if err != nil { return nil, fmt.Errorf("failed to parse server address '%s': %v", h.Addr, err) } if u.Scheme == "" { return nil, errors.New("server URL scheme not specified") } if u.Scheme != "http" && u.Scheme != "https" { return nil, fmt.Errorf("unsupported URL scheme '%s', please specify either http or https", u.Scheme) } u.Path += "/" + verb fmt.Fprintf(os.Stderr, "Requesting URL %s with requestor ID '%s'\n", u.String(), requestor) fmt.Fprintf(os.Stderr, " with params:\n") for k, v := range params { fmt.Fprintf(os.Stderr, " %s: %s\n", k, v) } fmt.Fprintf(os.Stderr, "\n") resp, err := http.PostForm(u.String(), params) if err != nil { return nil, fmt.Errorf("HTTP POST failed: %v", err) } defer resp.Body.Close() body, err := ioutil.ReadAll(resp.Body) if err != nil { return nil, fmt.Errorf("cannot read HTTP response: %v", err) } fmt.Fprintf(os.Stderr, "The server responded with status %s\n", resp.Status) var apiResp HTTPPartiallyDecodedResponse if resp.StatusCode == http.StatusOK { // the Data field of apiResp will result in a map[string]interface{} if err := json.Unmarshal(body, &apiResp); err != nil { return nil, fmt.Errorf("response is not a valid HTTP API response object: '%s': %v", body, err) } if err != nil { return nil, fmt.Errorf("cannot marshal HTTPAPIResponse: %v", err) } } else { var apiErr httplistener.HTTPAPIError if err := json.Unmarshal(body, &apiErr); err != nil { return nil, fmt.Errorf("response is not a valid HTTP API Error object: '%s': %v", body, err) } apiResp.Error = xjson.NewError(errors.New(apiErr.Msg)) } return &apiResp, nil }