network/gitlab.go (968 lines of code) (raw):
package network
import (
"bufio"
"bytes"
"context"
"fmt"
"io"
"mime/multipart"
"net/http"
"net/url"
"os"
"runtime"
"strconv"
"strings"
"sync"
"time"
"github.com/sirupsen/logrus"
"gitlab.com/gitlab-org/gitlab-runner/common"
"gitlab.com/gitlab-org/gitlab-runner/helpers"
"gitlab.com/gitlab-org/gitlab-runner/helpers/featureflags"
)
const (
// createdRunnerTokenPrefix is the token prefix used for GitLab UI-created runner authentication tokens
createdRunnerTokenPrefix = "glrt-"
clientError = -100
retryAfterHeader = "Retry-After"
responseBodyPeekMax = 512
)
func TokenIsCreatedRunnerToken(token string) bool {
return strings.HasPrefix(token, createdRunnerTokenPrefix)
}
type GitLabClient struct {
clients map[string]*client
lock sync.Mutex
apiRequestsCollector *APIRequestsCollector
connectionMaxAge time.Duration
}
func (n *GitLabClient) getClient(credentials requestCredentials) (c *client, err error) {
n.lock.Lock()
defer n.lock.Unlock()
if n.clients == nil {
n.clients = make(map[string]*client)
}
key := fmt.Sprintf(
"%s_%s_%s_%s",
credentials.GetURL(),
credentials.GetToken(),
credentials.GetTLSCAFile(),
credentials.GetTLSCertFile(),
)
c = n.clients[key]
if c == nil {
c, err = newClient(
credentials,
WithMaxAge(n.connectionMaxAge))
if err != nil {
return
}
n.clients[key] = c
}
return
}
func (n *GitLabClient) getLastUpdate(credentials requestCredentials) (lu string) {
cli, err := n.getClient(credentials)
if err != nil {
return ""
}
return cli.getLastUpdate()
}
// getFeatures enables features that are properties of networking client
func (n *GitLabClient) getFeatures(features *common.FeaturesInfo) {
features.TraceReset = true
features.TraceChecksum = true
features.TraceSize = true
features.Cancelable = true
features.CancelGracefully = true
}
func (n *GitLabClient) ExecutorSupportsNativeSteps(config common.RunnerConfig) bool {
return n.getRunnerVersion(config).Features.NativeStepsIntegration
}
func (n *GitLabClient) getRunnerVersion(config common.RunnerConfig) common.VersionInfo {
info := common.VersionInfo{
Name: common.AppVersion.Name,
Version: common.AppVersion.Version,
Revision: common.AppVersion.Revision,
Platform: runtime.GOOS,
Architecture: runtime.GOARCH,
Executor: config.Executor,
Shell: config.Shell,
}
n.getFeatures(&info.Features)
if executorProvider := common.GetExecutorProvider(config.Executor); executorProvider != nil {
_ = executorProvider.GetFeatures(&info.Features)
if info.Shell == "" {
info.Shell = executorProvider.GetDefaultShell()
}
executorProvider.GetConfigInfo(&config, &info.Config)
}
if shell := common.GetShell(info.Shell); shell != nil {
shell.GetFeatures(&info.Features)
}
return info
}
type doRawParams struct {
credentials requestCredentials
method string
uri string
request common.ContentProvider
requestType string
headers http.Header
}
// doMeasuredRaw is a decorator that adds metrics measurements through
// n.apiRequestsCollector to the doRaw() call
func (n *GitLabClient) doMeasuredRaw(
ctx context.Context,
log logrus.FieldLogger,
runnerID string,
systemID string,
endpoint apiEndpoint,
params doRawParams,
) (*http.Response, error) {
var response *http.Response
var err error
fn := func() int {
// Response body is handled after doMeasuredJSON() decorator call
// Linting violation here is a false-positive.
// nolint:bodyclose
response, err = n.doRaw(
ctx,
params.credentials,
params.method,
params.uri,
params.request,
params.requestType,
params.headers,
)
if err != nil {
return clientError
}
return response.StatusCode
}
n.apiRequestsCollector.Observe(
log,
runnerID,
systemID,
endpoint,
fn,
)
return response, err
}
func (n *GitLabClient) doRaw(
ctx context.Context,
credentials requestCredentials,
method, uri string,
bodyProvider common.ContentProvider,
requestType string,
headers http.Header,
) (res *http.Response, err error) {
c, err := n.getClient(credentials)
if err != nil {
return nil, err
}
return c.do(ctx, uri, method, bodyProvider, requestType, headers)
}
type doJSONParams struct {
credentials requestCredentials
method string
uri string
statusCode int
headers http.Header
request interface{}
response interface{}
}
// doMeasuredJSON is a decorator that adds metrics measurements through
// n.apiRequestsCollector to the doJSON() call
func (n *GitLabClient) doMeasuredJSON(
ctx context.Context,
log logrus.FieldLogger,
runnerID string,
systemID string,
endpoint apiEndpoint,
params doJSONParams,
) (int, string, *http.Response) {
var result int
var statusText string
var httpResponse *http.Response
fn := func() int {
// Response body is handled after doMeasuredJSON() decorator call
// Linting violation here is a false-positive.
// nolint:bodyclose
result, statusText, httpResponse = n.doJSON(
ctx,
params.credentials,
params.method,
params.uri,
params.statusCode,
params.headers,
params.request,
params.response,
)
return result
}
n.apiRequestsCollector.Observe(
log,
runnerID,
systemID,
endpoint,
fn,
)
return result, statusText, httpResponse
}
// Create a PRIVATE-TOKEN http header for the specified private access token (pat).
func PrivateTokenHeader(pat string) http.Header {
headers := http.Header{}
if pat != "" {
headers.Set(common.PrivateToken, pat)
}
return headers
}
// Create a JOB-TOKEN http header for the specified job token.
func JobTokenHeader(jobToken string) http.Header {
headers := http.Header{}
if jobToken != "" {
headers.Set(common.JobToken, jobToken)
}
return headers
}
// Create a RUNNER-TOKEN http header for the specified job token.
func RunnerTokenHeader(runnerToken string) http.Header {
headers := http.Header{}
if runnerToken != "" {
headers.Set(common.RunnerToken, runnerToken)
}
return headers
}
func (n *GitLabClient) doJSON(
ctx context.Context,
credentials requestCredentials,
method, uri string,
statusCode int,
headers http.Header,
request interface{},
response interface{},
) (int, string, *http.Response) {
c, err := n.getClient(credentials)
if err != nil {
return clientError, err.Error(), nil
}
return c.doJSON(ctx, uri, method, statusCode, headers, request, response)
}
func (n *GitLabClient) getResponseTLSData(
credentials requestCredentials,
resolveFullChain bool,
response *http.Response,
) (ResponseTLSData, error) {
c, err := n.getClient(credentials)
if err != nil {
return ResponseTLSData{}, fmt.Errorf("couldn't get client: %w", err)
}
return c.getResponseTLSData(response.TLS, resolveFullChain)
}
func (n *GitLabClient) SetConnectionMaxAge(age time.Duration) {
n.connectionMaxAge = age
}
func (n *GitLabClient) RegisterRunner(
runner common.RunnerCredentials,
parameters common.RegisterRunnerParameters,
) *common.RegisterRunnerResponse {
// TODO: pass executor
request := common.RegisterRunnerRequest{
RegisterRunnerParameters: parameters,
Token: runner.Token,
Info: n.getRunnerVersion(common.RunnerConfig{}),
}
var response common.RegisterRunnerResponse
result, statusText, resp := n.doJSON(
context.Background(),
&runner,
http.MethodPost,
"runners",
http.StatusCreated,
RunnerTokenHeader(runner.Token),
&request,
&response,
)
defer func() { n.handleResponse(context.TODO(), resp, false) }()
switch result {
case http.StatusCreated:
runner.Log().Println("Registering runner...", "succeeded")
return &response
case http.StatusForbidden:
runner.Log().WithField("status", statusText).Errorln("Registering runner...", "forbidden (check registration token)")
return nil
case clientError:
runner.Log().WithField("status", statusText).Errorln("Registering runner...", "client error")
return nil
default:
runner.Log().WithField("status", statusText).Errorln("Registering runner...", "failed")
return nil
}
}
func (n *GitLabClient) VerifyRunner(runner common.RunnerCredentials, systemID string) *common.VerifyRunnerResponse {
request := common.VerifyRunnerRequest{
Token: runner.Token,
SystemID: systemID,
}
var response common.VerifyRunnerResponse
result, statusText, resp := n.doJSON(
context.Background(),
&runner,
http.MethodPost,
"runners/verify",
http.StatusOK,
RunnerTokenHeader(runner.Token),
&request,
&response,
)
if result == -1 {
// if server is not able to return JSON, let's try plain text (the legacy response format)
result, statusText, resp = n.doJSON(
context.Background(),
&runner,
http.MethodPost,
"runners/verify",
http.StatusOK,
RunnerTokenHeader(runner.Token),
&request,
nil,
)
}
defer func() { n.handleResponse(context.TODO(), resp, false) }()
switch result {
case http.StatusOK:
// this is expected due to fact that we ask for non-existing job
if TokenIsCreatedRunnerToken(runner.Token) {
runner.Log().Println("Verifying runner...", "is valid")
} else {
runner.Log().Println("Verifying runner...", "is alive")
}
return &response
case http.StatusForbidden:
if TokenIsCreatedRunnerToken(runner.Token) {
runner.Log().Println("Verifying runner...", "is not valid")
} else {
runner.Log().WithField("status", statusText).Errorln("Verifying runner...", "is removed")
}
return nil
case clientError:
runner.Log().WithField("status", statusText).Errorln("Verifying runner...", "client error")
return &response
default:
runner.Log().WithField("status", statusText).Errorln("Verifying runner...", "failed")
return &response
}
}
func (n *GitLabClient) UnregisterRunner(runner common.RunnerCredentials) bool {
request := common.UnregisterRunnerRequest{
Token: runner.Token,
}
result, statusText, resp := n.doJSON(
context.Background(),
&runner,
http.MethodDelete,
"runners",
http.StatusNoContent,
RunnerTokenHeader(runner.Token),
&request,
nil,
)
defer func() { n.handleResponse(context.TODO(), resp, false) }()
const baseLogText = "Unregistering runner from GitLab"
switch result {
case http.StatusNoContent:
runner.Log().Println(baseLogText, "succeeded")
return true
case http.StatusForbidden:
runner.Log().WithField("status", statusText).Errorln(baseLogText, "forbidden")
return false
case clientError:
runner.Log().WithField("status", statusText).Errorln(baseLogText, "client error")
return false
default:
runner.Log().WithField("status", statusText).Errorln(baseLogText, "failed")
return false
}
}
func (n *GitLabClient) UnregisterRunnerManager(runner common.RunnerCredentials, systemID string) bool {
request := common.UnregisterRunnerManagerRequest{
Token: runner.Token,
SystemID: systemID,
}
result, statusText, resp := n.doJSON(
context.Background(),
&runner,
http.MethodDelete,
"runners/managers",
http.StatusNoContent,
RunnerTokenHeader(runner.Token),
&request,
nil,
)
defer func() { n.handleResponse(context.TODO(), resp, false) }()
const baseLogText = "Unregistering runner manager from GitLab"
switch result {
case http.StatusNoContent:
runner.Log().Println(baseLogText, "succeeded")
return true
case http.StatusForbidden:
runner.Log().WithField("status", statusText).Errorln(baseLogText, "forbidden")
return false
case clientError:
runner.Log().WithField("status", statusText).Errorln(baseLogText, "client error")
return false
default:
runner.Log().WithField("status", statusText).Errorln(baseLogText, "failed")
return false
}
}
func (n *GitLabClient) ResetToken(runner common.RunnerCredentials, systemID string) *common.ResetTokenResponse {
return n.resetToken(runner, systemID, "runners/reset_authentication_token", "")
}
func (n *GitLabClient) ResetTokenWithPAT(
runner common.RunnerCredentials,
systemID string,
pat string,
) *common.ResetTokenResponse {
return n.resetToken(runner, systemID, fmt.Sprintf("runners/%d/reset_authentication_token", runner.ID), pat)
}
func (n *GitLabClient) resetToken(
runner common.RunnerCredentials,
systemID string,
uri string,
pat string,
) *common.ResetTokenResponse {
var request *common.ResetTokenRequest
if pat == "" {
request = &common.ResetTokenRequest{
Token: runner.Token,
}
}
var response common.ResetTokenResponse
result, statusText, resp := n.doMeasuredJSON(
context.Background(),
runner.Log(),
runner.ShortDescription(),
systemID,
apiEndpointResetToken,
doJSONParams{
credentials: &runner,
method: http.MethodPost,
uri: uri,
statusCode: http.StatusCreated,
headers: PrivateTokenHeader(pat),
request: request,
response: &response,
},
)
defer func() { n.handleResponse(context.TODO(), resp, false) }()
const baseLogText = "Resetting runner authentication token..."
switch result {
case http.StatusCreated:
runner.Log().Println(baseLogText, "succeeded")
response.TokenObtainedAt = time.Now().UTC()
return &response
case http.StatusForbidden:
runner.Log().WithField("status", statusText).Errorln(baseLogText, "failed (check used token)")
return nil
case clientError:
runner.Log().WithField("status", statusText).Errorln(baseLogText, "client error")
return nil
default:
runner.Log().WithField("status", statusText).Errorln(baseLogText, "failed")
return nil
}
}
func addTLSData(response *common.JobResponse, tlsData ResponseTLSData) {
if tlsData.CAChain != "" {
response.TLSCAChain = tlsData.CAChain
}
if tlsData.CertFile != "" && tlsData.KeyFile != "" {
data, err := os.ReadFile(tlsData.CertFile)
if err == nil {
response.TLSAuthCert = string(data)
}
data, err = os.ReadFile(tlsData.KeyFile)
if err == nil {
response.TLSAuthKey = string(data)
}
}
}
func (n *GitLabClient) RequestJob(
ctx context.Context,
config common.RunnerConfig,
sessionInfo *common.SessionInfo,
) (*common.JobResponse, bool) {
request := common.JobRequest{
Info: n.getRunnerVersion(config),
Token: config.Token,
SystemID: config.SystemIDState.GetSystemID(),
LastUpdate: n.getLastUpdate(&config.RunnerCredentials),
Session: sessionInfo,
}
var response common.JobResponse
//nolint:bodyclose
result, statusText, httpResponse := n.doMeasuredJSON(
ctx,
config.Log(),
config.RunnerCredentials.ShortDescription(),
config.SystemIDState.GetSystemID(),
apiEndpointRequestJob,
doJSONParams{
credentials: &config.RunnerCredentials,
method: http.MethodPost,
uri: "jobs/request",
statusCode: http.StatusCreated,
headers: RunnerTokenHeader(config.Token),
request: &request, response: &response,
},
)
defer func() { n.handleResponse(ctx, httpResponse, false) }()
switch result {
case http.StatusCreated:
config.Log().WithFields(logrus.Fields{
"job": response.ID,
"repo_url": response.RepoCleanURL(),
}).Println("Checking for jobs...", "received")
resolveFullChain := config.IsFeatureFlagOn(featureflags.ResolveFullTLSChain)
tlsData, err := n.getResponseTLSData(&config.RunnerCredentials, resolveFullChain, httpResponse)
if err != nil {
config.Log().
WithError(err).Errorln("Error on fetching TLS Data from API response...", "error")
}
addTLSData(&response, tlsData)
return &response, true
case http.StatusForbidden:
config.Log().WithField("status", statusText).Errorln("Checking for jobs...", "forbidden")
return nil, false
case http.StatusNoContent:
config.Log().WithField("status", statusText).Debug("Checking for jobs...", "no content")
return nil, true
case clientError:
config.Log().WithField("status", statusText).Errorln("Checking for jobs...", "client error")
return nil, false
default:
config.Log().WithField("status", statusText).Warningln("Checking for jobs...", "failed")
return nil, true
}
}
func (n *GitLabClient) UpdateJob(
config common.RunnerConfig,
jobCredentials *common.JobCredentials,
jobInfo common.UpdateJobInfo,
) common.UpdateJobResult {
request := common.UpdateJobRequest{
Info: n.getRunnerVersion(config),
Token: jobCredentials.Token,
State: jobInfo.State,
FailureReason: jobInfo.FailureReason,
Checksum: jobInfo.Output.Checksum, // deprecated
Output: jobInfo.Output,
ExitCode: jobInfo.ExitCode,
}
log := config.Log().
WithField("job", jobInfo.ID).
WithField("checksum", request.Output.Checksum).
WithField("bytesize", request.Output.Bytesize)
log.Info("Updating job...")
//nolint:bodyclose
statusCode, statusText, response := n.doMeasuredJSON(
context.Background(),
config.Log(),
config.RunnerCredentials.ShortDescription(),
config.SystemIDState.GetSystemID(),
apiEndpointUpdateJob,
doJSONParams{
credentials: &config.RunnerCredentials,
method: http.MethodPut,
uri: fmt.Sprintf("jobs/%d", jobInfo.ID),
statusCode: http.StatusOK,
headers: JobTokenHeader(jobCredentials.Token),
request: &request,
response: nil,
},
)
return n.createUpdateJobResult(log, statusCode, statusText, response)
}
func (n *GitLabClient) createUpdateJobResult(
log *logrus.Entry,
statusCode int,
statusText string,
response *http.Response,
) common.UpdateJobResult {
defer func() { n.handleResponse(context.TODO(), response, false) }()
remoteJobStateResponse := NewRemoteJobStateResponse(response, log)
result := common.UpdateJobResult{
NewUpdateInterval: remoteJobStateResponse.RemoteUpdateInterval,
CancelRequested: remoteJobStateResponse.IsCanceled(),
}
log = log.WithFields(logrus.Fields{
"code": statusCode,
"job-status": remoteJobStateResponse.RemoteState,
"update-interval": remoteJobStateResponse.RemoteUpdateInterval,
})
switch {
case remoteJobStateResponse.IsFailed():
log.WithField("status", statusText).Warningln("Submitting job to coordinator...", "job failed")
result.State = common.UpdateAbort
case statusCode == http.StatusOK:
log.Info("Submitting job to coordinator...", "ok")
result.State = common.UpdateSucceeded
case statusCode == http.StatusAccepted:
log.Info("Submitting job to coordinator...", "accepted, but not yet completed")
result.State = common.UpdateAcceptedButNotCompleted
case statusCode == http.StatusPreconditionFailed:
log.Info("Submitting job to coordinator...", "trace validation failed")
result.State = common.UpdateTraceValidationFailed
case statusCode == http.StatusNotFound:
log.WithField("status", statusText).Warningln("Submitting job to coordinator...", "not found")
result.State = common.UpdateAbort
case statusCode == http.StatusForbidden:
log.WithField("status", statusText).Errorln("Submitting job to coordinator...", "forbidden")
result.State = common.UpdateAbort
case statusCode == clientError:
log.WithField("status", statusText).Errorln("Submitting job to coordinator...", "client error")
result.State = common.UpdateAbort
default:
log.WithField("status", statusText).Warningln("Submitting job to coordinator...", "failed")
result.State = common.UpdateFailed
}
return result
}
func (n *GitLabClient) PatchTrace(
config common.RunnerConfig,
jobCredentials *common.JobCredentials,
content []byte,
startOffset int,
debugTraceEnabled bool,
) common.PatchTraceResult {
id := jobCredentials.ID
baseLog := config.Log().WithField("job", id)
if len(content) == 0 {
baseLog.Info("Appending trace to coordinator...", "skipped due to empty patch")
return common.NewPatchTraceResult(startOffset, common.PatchSucceeded, 0)
}
endOffset := startOffset + len(content)
contentRange := fmt.Sprintf("%d-%d", startOffset, endOffset-1)
headers := JobTokenHeader(jobCredentials.Token)
headers.Set("Content-Range", contentRange)
bodyProvider := common.BytesProvider{Data: content}
response, err := n.doMeasuredRaw(
context.Background(),
config.Log(),
config.RunnerCredentials.ShortDescription(),
config.SystemIDState.GetSystemID(),
apiEndpointPatchTrace,
doRawParams{
credentials: &config.RunnerCredentials,
method: "PATCH",
uri: fmt.Sprintf("jobs/%d/trace?%s", id, patchTraceQuery(debugTraceEnabled)),
request: bodyProvider,
requestType: "text/plain",
headers: headers,
},
)
if err != nil {
config.Log().Errorln("Appending trace to coordinator...", "error", err.Error())
return common.NewPatchTraceResult(startOffset, common.PatchFailed, 0)
}
defer func() { n.handleResponse(context.TODO(), response, true) }()
tracePatchResponse := NewTracePatchResponse(response, baseLog)
log := baseLog.WithFields(logrus.Fields{
"sent-log": contentRange,
"job-log": tracePatchResponse.RemoteRange,
"job-status": tracePatchResponse.RemoteState,
"code": response.StatusCode,
"status": response.Status,
"update-interval": tracePatchResponse.RemoteUpdateInterval,
})
return n.createPatchTraceResult(startOffset, tracePatchResponse, response, endOffset, log)
}
func patchTraceQuery(debugTraceEnabled bool) string {
query := url.Values{}
query.Set("debug_trace", strconv.FormatBool(debugTraceEnabled))
return query.Encode()
}
func (n *GitLabClient) createPatchTraceResult(
startOffset int,
tracePatchResponse *TracePatchResponse,
response *http.Response,
endOffset int,
log *logrus.Entry,
) common.PatchTraceResult {
result := common.PatchTraceResult{
SentOffset: startOffset,
NewUpdateInterval: tracePatchResponse.RemoteUpdateInterval,
CancelRequested: tracePatchResponse.IsCanceled(),
}
switch {
case tracePatchResponse.IsFailed():
log.Warningln("Appending trace to coordinator...", "job failed")
result.State = common.PatchAbort
return result
case response.StatusCode == http.StatusAccepted:
log.Info("Appending trace to coordinator...", "ok")
result.SentOffset = endOffset
result.State = common.PatchSucceeded
return result
case response.StatusCode == http.StatusNotFound:
log.Warningln("Appending trace to coordinator...", "not-found")
result.State = common.PatchNotFound
return result
case response.StatusCode == http.StatusRequestedRangeNotSatisfiable:
log.Warningln("Appending trace to coordinator...", "range mismatch")
result.SentOffset = tracePatchResponse.NewOffset()
result.State = common.PatchRangeMismatch
return result
case response.StatusCode == clientError:
log.Errorln("Appending trace to coordinator...", "client error")
result.State = common.PatchAbort
return result
default:
log.Warningln("Appending trace to coordinator...", "failed")
result.State = common.PatchFailed
return result
}
}
func (n *GitLabClient) createArtifactsContentProvider(originalContentProvider common.ContentProvider, baseName string) (common.ContentProvider, string) {
// Create an initial multipart writer with a buffer to get its boundary
var buf bytes.Buffer
mpw := multipart.NewWriter(&buf)
boundary := mpw.Boundary()
contentType := mpw.FormDataContentType()
mpw.Close()
// Return a body provider function that creates a new pipe each time
bodyProvider := common.StreamProvider{
ReaderFactory: func() (io.ReadCloser, error) {
// Get a fresh reader from the original provider
originalBody, err := originalContentProvider.GetReader()
if err != nil {
return nil, fmt.Errorf("couldn't get original body: %w", err)
}
pr, pw := io.Pipe()
mpw := multipart.NewWriter(pw)
// Use the same boundary to ensure consistent content type
err = mpw.SetBoundary(boundary)
if err != nil {
originalBody.Close()
pr.Close()
pw.Close()
return nil, fmt.Errorf("couldn't set form boundary: %w", err)
}
// Use goroutine to write to the pipe
go func() {
defer func() {
originalBody.Close()
mpw.Close()
pw.Close()
}()
wr, err := mpw.CreateFormFile("file", baseName)
if err != nil {
_ = pw.CloseWithError(fmt.Errorf("failed to create form file: %w", err))
return
}
// Copy from the fresh reader to the multipart form
_, err = io.Copy(wr, originalBody)
if err != nil {
_ = pw.CloseWithError(fmt.Errorf("failed to copy content to form: %w", err))
}
}()
return pr, nil
},
}
return bodyProvider, contentType
}
func uploadRawArtifactsQuery(options common.ArtifactsOptions) url.Values {
q := url.Values{}
if options.ExpireIn != "" {
q.Set("expire_in", options.ExpireIn)
}
if options.Format != "" {
q.Set("artifact_format", string(options.Format))
}
if options.Type != "" {
q.Set("artifact_type", options.Type)
}
return q
}
func (n *GitLabClient) UploadRawArtifacts(
config common.JobCredentials,
originalContentProvider common.ContentProvider,
options common.ArtifactsOptions,
) (common.UploadState, string) {
bodyProvider, contentType := n.createArtifactsContentProvider(originalContentProvider, options.BaseName)
query := uploadRawArtifactsQuery(options)
res, err := n.doRaw(
context.Background(),
&config,
http.MethodPost,
fmt.Sprintf("jobs/%d/artifacts?%s", config.ID, query.Encode()),
bodyProvider,
contentType,
JobTokenHeader(config.Token))
defer func() { n.handleResponse(context.TODO(), res, true) }()
log := logrus.WithFields(logrus.Fields{
"id": config.ID,
"token": helpers.ShortenToken(config.Token),
})
if options.LogResponseDetails {
logResponseDetails(log, res, true)
}
if res != nil {
log = log.WithField("responseStatus", res.Status)
}
messagePrefix := "Uploading artifacts to coordinator..."
if options.Type != "" {
messagePrefix = fmt.Sprintf("Uploading artifacts as %q to coordinator...", options.Type)
}
if err != nil {
log.WithError(err).Errorln(messagePrefix, "error")
return common.UploadFailed, ""
}
return n.determineUploadState(res, log, messagePrefix)
}
func logResponseDetails(logger *logrus.Entry, res *http.Response, withBody bool) {
if res == nil {
return
}
fields := logrus.Fields{"body": "<nil>"}
for k, vs := range res.Header {
fields["header["+k+"]"] = vs
}
if withBody && res.Body != nil {
body := bufio.NewReader(res.Body)
res.Body = struct {
io.Reader
io.Closer
}{body, res.Body}
// We ignore the error here, and let other body consumers handle it, if it persists.
b, _ := body.Peek(responseBodyPeekMax)
if res.ContentLength > int64(len(b)) {
b = append(b, "..."...)
}
fields["body"] = string(b)
}
logger.WithFields(fields).Warn("received response")
}
func closeWithLogging(log logrus.FieldLogger, c io.Closer, name string) {
err := c.Close()
if err != nil {
log.WithError(err).Warningf("Error while closing the %s", name)
}
}
func (n *GitLabClient) determineUploadState(
resp *http.Response,
log *logrus.Entry,
messagePrefix string,
) (common.UploadState, string) {
statusText := getMessageFromJSONResponse(resp)
switch resp.StatusCode {
case http.StatusCreated:
log.Println(messagePrefix, statusText)
return common.UploadSucceeded, ""
case http.StatusTemporaryRedirect:
return handleUploadRedirectionState(resp, log, messagePrefix, statusText)
case http.StatusForbidden:
log.WithField("status", resp.StatusCode).Errorln(messagePrefix, statusText)
return common.UploadForbidden, ""
case http.StatusRequestEntityTooLarge:
log.WithField("status", resp.StatusCode).Errorln(messagePrefix, statusText)
return common.UploadTooLarge, ""
case http.StatusServiceUnavailable:
log.WithField("status", resp.StatusCode).Errorln(messagePrefix, statusText)
return common.UploadServiceUnavailable, ""
default:
log.WithField("status", resp.StatusCode).Warningln(messagePrefix, statusText)
return common.UploadFailed, ""
}
}
func handleUploadRedirectionState(
resp *http.Response,
log *logrus.Entry,
messagePrefix string,
statusText string,
) (common.UploadState, string) {
location := resp.Header.Get("Location")
if location == "" {
log.WithField("status", resp.StatusCode).Errorln(messagePrefix, statusText, "empty location")
return common.UploadFailed, ""
}
return common.UploadRedirected, location
}
func (n *GitLabClient) DownloadArtifacts(
config common.JobCredentials,
artifactsFile io.WriteCloser,
directDownload *bool,
) common.DownloadState {
query := url.Values{}
if directDownload != nil {
query.Set("direct_download", strconv.FormatBool(*directDownload))
}
uri := fmt.Sprintf("jobs/%d/artifacts?%s", config.ID, query.Encode())
res, err := n.doRaw(
context.Background(),
&config,
http.MethodGet,
uri,
nil,
"",
JobTokenHeader(config.Token))
log := logrus.WithFields(logrus.Fields{
"id": config.ID,
"token": helpers.ShortenToken(config.Token),
})
if res != nil {
log = log.WithField("responseStatus", res.Status)
if res.Request != nil && res.Request.URL != nil {
log = log.WithField("host", res.Request.URL.Host)
}
}
if err != nil {
log.Errorln("Downloading artifacts from coordinator...", "error", err.Error())
return common.DownloadFailed
}
defer func() { n.handleResponse(context.TODO(), res, true) }()
switch res.StatusCode {
case http.StatusOK:
return n.downloadArtifactFile(log, artifactsFile, res)
case http.StatusForbidden:
// We generally expect JSON responses from the GitLab API, but a
// 302 redirection to object storage may result in an XML
// response that might include important details why the request
// was rejected (e.g. Google VPC Service Controls).
statusText := getMessageFromJSONOrXMLResponse(res)
log.WithField("status", statusText).Errorln("Downloading artifacts from coordinator...", "forbidden")
return common.DownloadForbidden
case http.StatusUnauthorized:
log.WithField("status", res.Status).Errorln("Downloading artifacts from coordinator...", "unauthorized")
return common.DownloadUnauthorized
case http.StatusNotFound:
log.Errorln("Downloading artifacts from coordinator...", "not found")
return common.DownloadNotFound
default:
log.WithField("status", res.Status).Warningln("Downloading artifacts from coordinator...", "failed")
return common.DownloadFailed
}
}
func (n *GitLabClient) downloadArtifactFile(
log logrus.FieldLogger,
file io.WriteCloser,
res *http.Response,
) common.DownloadState {
_, err := io.Copy(file, res.Body)
closeWithLogging(log, file, "file writer")
if err != nil {
log.WithError(err).Errorln("Downloading artifacts from coordinator...", "error")
return common.DownloadFailed
}
log.Println("Downloading artifacts from coordinator...", "ok")
return common.DownloadSucceeded
}
func (n *GitLabClient) ProcessJob(
config common.RunnerConfig,
jobCredentials *common.JobCredentials,
) (common.JobTrace, error) {
trace, err := newJobTrace(n, config, jobCredentials)
if err != nil {
return nil, err
}
trace.start()
return trace, nil
}
func (n *GitLabClient) handleResponse(ctx context.Context, res *http.Response, discardBody bool) {
if res == nil {
return
}
defer func() {
if discardBody {
_, _ = io.Copy(io.Discard, io.LimitReader(res.Body, 1025*1025))
}
_ = res.Body.Close()
}()
if res.StatusCode != http.StatusTooManyRequests {
return
}
if retryAfter, err := strconv.Atoi(res.Header.Get(retryAfterHeader)); err == nil {
select {
case <-ctx.Done():
case <-time.After(time.Duration(retryAfter) * time.Second):
}
}
}
func NewGitLabClientWithAPIRequestsCollector(c *APIRequestsCollector) *GitLabClient {
return &GitLabClient{
apiRequestsCollector: c,
}
}
func NewGitLabClient() *GitLabClient {
return NewGitLabClientWithAPIRequestsCollector(NewAPIRequestsCollector())
}