network/gitlab.go (395 lines of code) (raw):
package network
import (
"bytes"
"fmt"
"io"
"io/ioutil"
"mime/multipart"
"net/http"
"net/url"
"os"
"path/filepath"
"runtime"
"strconv"
"sync"
"github.com/Sirupsen/logrus"
"gitlab.com/gitlab-org/gitlab-ci-multi-runner/common"
"gitlab.com/gitlab-org/gitlab-ci-multi-runner/helpers"
)
const clientError = -100
type GitLabClient struct {
clients map[string]*client
lock sync.Mutex
}
func (n *GitLabClient) getClient(credentials requestCredentials) (c *client, err error) {
n.lock.Lock()
defer n.lock.Unlock()
if n.clients == nil {
n.clients = make(map[string]*client)
}
key := fmt.Sprintf("%s_%s_%s_%s", credentials.GetURL(), credentials.GetToken(), credentials.GetTLSCAFile(), credentials.GetTLSCertFile())
c = n.clients[key]
if c == nil {
c, err = newClient(credentials)
if err != nil {
return
}
n.clients[key] = c
}
return
}
func (n *GitLabClient) getLastUpdate(credentials requestCredentials) (lu string) {
cli, err := n.getClient(credentials)
if err != nil {
return ""
}
return cli.getLastUpdate()
}
func (n *GitLabClient) getRunnerVersion(config common.RunnerConfig) common.VersionInfo {
info := common.VersionInfo{
Name: common.NAME,
Version: common.VERSION,
Revision: common.REVISION,
Platform: runtime.GOOS,
Architecture: runtime.GOARCH,
Executor: config.Executor,
}
if executor := common.GetExecutor(config.Executor); executor != nil {
executor.GetFeatures(&info.Features)
}
if shell := common.GetShell(config.Shell); shell != nil {
shell.GetFeatures(&info.Features)
}
return info
}
func (n *GitLabClient) doRaw(credentials requestCredentials, method, uri string, request io.Reader, requestType string, headers http.Header) (res *http.Response, err error) {
c, err := n.getClient(credentials)
if err != nil {
return nil, err
}
return c.do(uri, method, request, requestType, headers)
}
func (n *GitLabClient) doJSON(credentials requestCredentials, method, uri string, statusCode int, request interface{}, response interface{}) (int, string, ResponseTLSData) {
c, err := n.getClient(credentials)
if err != nil {
return clientError, err.Error(), ResponseTLSData{}
}
return c.doJSON(uri, method, statusCode, request, response)
}
func (n *GitLabClient) RegisterRunner(runner common.RunnerCredentials, description, tags string, runUntagged, locked bool) *common.RegisterRunnerResponse {
// TODO: pass executor
request := common.RegisterRunnerRequest{
Token: runner.Token,
Description: description,
Info: n.getRunnerVersion(common.RunnerConfig{}),
Locked: locked,
RunUntagged: runUntagged,
Tags: tags,
}
var response common.RegisterRunnerResponse
result, statusText, _ := n.doJSON(&runner, "POST", "runners", http.StatusCreated, &request, &response)
switch result {
case http.StatusCreated:
runner.Log().Println("Registering runner...", "succeeded")
return &response
case http.StatusForbidden:
runner.Log().Errorln("Registering runner...", "forbidden (check registration token)")
return nil
case clientError:
runner.Log().WithField("status", statusText).Errorln("Registering runner...", "error")
return nil
default:
runner.Log().WithField("status", statusText).Errorln("Registering runner...", "failed")
return nil
}
}
func (n *GitLabClient) VerifyRunner(runner common.RunnerCredentials) bool {
request := common.VerifyRunnerRequest{
Token: runner.Token,
}
result, statusText, _ := n.doJSON(&runner, "POST", "runners/verify", http.StatusOK, &request, nil)
switch result {
case http.StatusOK:
// this is expected due to fact that we ask for non-existing job
runner.Log().Println("Verifying runner...", "is alive")
return true
case http.StatusForbidden:
runner.Log().Errorln("Verifying runner...", "is removed")
return false
case clientError:
runner.Log().WithField("status", statusText).Errorln("Verifying runner...", "error")
return true
default:
runner.Log().WithField("status", statusText).Errorln("Verifying runner...", "failed")
return true
}
}
func (n *GitLabClient) UnregisterRunner(runner common.RunnerCredentials) bool {
request := common.UnregisterRunnerRequest{
Token: runner.Token,
}
result, statusText, _ := n.doJSON(&runner, "DELETE", "runners", http.StatusNoContent, &request, nil)
const baseLogText = "Unregistering runner from GitLab"
switch result {
case http.StatusNoContent:
runner.Log().Println(baseLogText, "succeeded")
return true
case http.StatusForbidden:
runner.Log().Errorln(baseLogText, "forbidden")
return false
case clientError:
runner.Log().WithField("status", statusText).Errorln(baseLogText, "error")
return false
default:
runner.Log().WithField("status", statusText).Errorln(baseLogText, "failed")
return false
}
}
func addTLSData(response *common.JobResponse, tlsData ResponseTLSData) {
if tlsData.CAChain != "" {
response.TLSCAChain = tlsData.CAChain
}
if tlsData.CertFile != "" && tlsData.KeyFile != "" {
data, err := ioutil.ReadFile(tlsData.CertFile)
if err == nil {
response.TLSAuthCert = string(data)
}
data, err = ioutil.ReadFile(tlsData.KeyFile)
if err == nil {
response.TLSAuthKey = string(data)
}
}
}
func (n *GitLabClient) RequestJob(config common.RunnerConfig) (*common.JobResponse, bool) {
request := common.JobRequest{
Info: n.getRunnerVersion(config),
Token: config.Token,
LastUpdate: n.getLastUpdate(&config.RunnerCredentials),
}
var response common.JobResponse
result, statusText, tlsData := n.doJSON(&config.RunnerCredentials, "POST", "jobs/request", http.StatusCreated, &request, &response)
switch result {
case http.StatusCreated:
config.Log().WithFields(logrus.Fields{
"job": strconv.Itoa(response.ID),
"repo_url": response.RepoCleanURL(),
}).Println("Checking for jobs...", "received")
addTLSData(&response, tlsData)
return &response, true
case http.StatusForbidden:
config.Log().Errorln("Checking for jobs...", "forbidden")
return nil, false
case http.StatusNoContent:
config.Log().Debugln("Checking for jobs...", "nothing")
return nil, true
case clientError:
config.Log().WithField("status", statusText).Errorln("Checking for jobs...", "error")
return nil, false
default:
config.Log().WithField("status", statusText).Warningln("Checking for jobs...", "failed")
return nil, true
}
}
func (n *GitLabClient) UpdateJob(config common.RunnerConfig, jobCredentials *common.JobCredentials, id int, state common.JobState, trace *string) common.UpdateState {
request := common.UpdateJobRequest{
Info: n.getRunnerVersion(config),
Token: jobCredentials.Token,
State: state,
Trace: trace,
}
log := config.Log().WithField("job", id)
result, statusText, _ := n.doJSON(&config.RunnerCredentials, "PUT", fmt.Sprintf("jobs/%d", id), http.StatusOK, &request, nil)
switch result {
case http.StatusOK:
log.Debugln("Submitting job to coordinator...", "ok")
return common.UpdateSucceeded
case http.StatusNotFound:
log.Warningln("Submitting job to coordinator...", "aborted")
return common.UpdateAbort
case http.StatusForbidden:
log.WithField("status", statusText).Errorln("Submitting job to coordinator...", "forbidden")
return common.UpdateAbort
case clientError:
log.WithField("status", statusText).Errorln("Submitting job to coordinator...", "error")
return common.UpdateAbort
default:
log.WithField("status", statusText).Warningln("Submitting job to coordinator...", "failed")
return common.UpdateFailed
}
}
func (n *GitLabClient) PatchTrace(config common.RunnerConfig, jobCredentials *common.JobCredentials, tracePatch common.JobTracePatch) common.UpdateState {
id := jobCredentials.ID
contentRange := fmt.Sprintf("%d-%d", tracePatch.Offset(), tracePatch.Limit())
headers := make(http.Header)
headers.Set("Content-Range", contentRange)
headers.Set("JOB-TOKEN", jobCredentials.Token)
uri := fmt.Sprintf("jobs/%d/trace", id)
request := bytes.NewReader(tracePatch.Patch())
response, err := n.doRaw(&config.RunnerCredentials, "PATCH", uri, request, "text/plain", headers)
if err != nil {
config.Log().Errorln("Appending trace to coordinator...", "error", err.Error())
return common.UpdateFailed
}
defer response.Body.Close()
defer io.Copy(ioutil.Discard, response.Body)
tracePatchResponse := NewTracePatchResponse(response)
log := config.Log().WithFields(logrus.Fields{
"job": id,
"sent-log": contentRange,
"job-log": tracePatchResponse.RemoteRange,
"job-status": tracePatchResponse.RemoteState,
"code": response.StatusCode,
"status": response.Status,
})
switch {
case tracePatchResponse.IsAborted():
log.Warningln("Appending trace to coordinator", "aborted")
return common.UpdateAbort
case response.StatusCode == http.StatusAccepted:
log.Debugln("Appending trace to coordinator...", "ok")
return common.UpdateSucceeded
case response.StatusCode == http.StatusNotFound:
log.Warningln("Appending trace to coordinator...", "not-found")
return common.UpdateNotFound
case response.StatusCode == http.StatusRequestedRangeNotSatisfiable:
log.Warningln("Appending trace to coordinator...", "range mismatch")
tracePatch.SetNewOffset(tracePatchResponse.NewOffset())
return common.UpdateRangeMismatch
case response.StatusCode == clientError:
log.Errorln("Appending trace to coordinator...", "error")
return common.UpdateAbort
default:
log.Warningln("Appending trace to coordinator...", "failed")
return common.UpdateFailed
}
}
func (n *GitLabClient) createArtifactsForm(mpw *multipart.Writer, reader io.Reader, baseName string) error {
wr, err := mpw.CreateFormFile("file", baseName)
if err != nil {
return err
}
_, err = io.Copy(wr, reader)
if err != nil {
return err
}
return nil
}
func (n *GitLabClient) UploadRawArtifacts(config common.JobCredentials, reader io.Reader, baseName string, expireIn string) common.UploadState {
pr, pw := io.Pipe()
defer pr.Close()
mpw := multipart.NewWriter(pw)
go func() {
defer pw.Close()
defer mpw.Close()
err := n.createArtifactsForm(mpw, reader, baseName)
if err != nil {
pw.CloseWithError(err)
}
}()
query := url.Values{}
if expireIn != "" {
query.Set("expire_in", expireIn)
}
headers := make(http.Header)
headers.Set("JOB-TOKEN", config.Token)
res, err := n.doRaw(&config, "POST", fmt.Sprintf("jobs/%d/artifacts?%s", config.ID, query.Encode()), pr, mpw.FormDataContentType(), headers)
log := logrus.WithFields(logrus.Fields{
"id": config.ID,
"token": helpers.ShortenToken(config.Token),
})
if res != nil {
log = log.WithField("responseStatus", res.Status)
}
if err != nil {
log.WithError(err).Errorln("Uploading artifacts to coordinator...", "error")
return common.UploadFailed
}
defer res.Body.Close()
defer io.Copy(ioutil.Discard, res.Body)
switch res.StatusCode {
case http.StatusCreated:
log.Println("Uploading artifacts to coordinator...", "ok")
return common.UploadSucceeded
case http.StatusForbidden:
log.WithField("status", res.Status).Errorln("Uploading artifacts to coordinator...", "forbidden")
return common.UploadForbidden
case http.StatusRequestEntityTooLarge:
log.WithField("status", res.Status).Errorln("Uploading artifacts to coordinator...", "too large archive")
return common.UploadTooLarge
default:
log.WithField("status", res.Status).Warningln("Uploading artifacts to coordinator...", "failed")
return common.UploadFailed
}
}
func (n *GitLabClient) UploadArtifacts(config common.JobCredentials, artifactsFile string) common.UploadState {
log := logrus.WithFields(logrus.Fields{
"id": config.ID,
"token": helpers.ShortenToken(config.Token),
})
file, err := os.Open(artifactsFile)
if err != nil {
log.WithError(err).Errorln("Uploading artifacts to coordinator...", "error")
return common.UploadFailed
}
defer file.Close()
fi, err := file.Stat()
if err != nil {
log.WithError(err).Errorln("Uploading artifacts to coordinator...", "error")
return common.UploadFailed
}
if fi.IsDir() {
log.WithField("error", "cannot upload directories").Errorln("Uploading artifacts to coordinator...", "error")
return common.UploadFailed
}
baseName := filepath.Base(artifactsFile)
return n.UploadRawArtifacts(config, file, baseName, "")
}
func (n *GitLabClient) DownloadArtifacts(config common.JobCredentials, artifactsFile string) common.DownloadState {
headers := make(http.Header)
headers.Set("JOB-TOKEN", config.Token)
res, err := n.doRaw(&config, "GET", fmt.Sprintf("jobs/%d/artifacts", config.ID), nil, "", headers)
log := logrus.WithFields(logrus.Fields{
"id": config.ID,
"token": helpers.ShortenToken(config.Token),
})
if res != nil {
log = log.WithField("responseStatus", res.Status)
}
if err != nil {
log.Errorln("Downloading artifacts from coordinator...", "error", err.Error())
return common.DownloadFailed
}
defer res.Body.Close()
defer io.Copy(ioutil.Discard, res.Body)
switch res.StatusCode {
case http.StatusOK:
file, err := os.Create(artifactsFile)
if err == nil {
defer file.Close()
_, err = io.Copy(file, res.Body)
}
if err != nil {
file.Close()
os.Remove(file.Name())
log.WithError(err).Errorln("Downloading artifacts from coordinator...", "error")
return common.DownloadFailed
}
log.Println("Downloading artifacts from coordinator...", "ok")
return common.DownloadSucceeded
case http.StatusForbidden:
log.WithField("status", res.Status).Errorln("Downloading artifacts from coordinator...", "forbidden")
return common.DownloadForbidden
case http.StatusNotFound:
log.Errorln("Downloading artifacts from coordinator...", "not found")
return common.DownloadNotFound
default:
log.WithField("status", res.Status).Warningln("Downloading artifacts from coordinator...", "failed")
return common.DownloadFailed
}
}
func (n *GitLabClient) ProcessJob(config common.RunnerConfig, jobCredentials *common.JobCredentials) common.JobTrace {
trace := newJobTrace(n, config, jobCredentials)
trace.start()
return trace
}
func NewGitLabClient() *GitLabClient {
return &GitLabClient{}
}