executors/docker/services.go (285 lines of code) (raw):
package docker
import (
"bytes"
"context"
"errors"
"fmt"
"io"
"strings"
"sync"
"time"
"github.com/docker/docker/api/types"
"github.com/docker/docker/api/types/container"
"github.com/docker/docker/pkg/stdcopy"
"gitlab.com/gitlab-org/gitlab-runner/common"
"gitlab.com/gitlab-org/gitlab-runner/common/buildlogger"
"gitlab.com/gitlab-org/gitlab-runner/helpers"
"gitlab.com/gitlab-org/gitlab-runner/helpers/container/services"
service_helpers "gitlab.com/gitlab-org/gitlab-runner/helpers/service"
)
// tooManyServicesRequestedError is returned by getServicesDefinitions when the
// combined number of runner-configured and job-requested services exceeds the
// configured services limit.
type tooManyServicesRequestedError struct {
	requested int
	allowed   int
}

// Error implements the error interface.
func (e *tooManyServicesRequestedError) Error() string {
	return fmt.Sprintf("too many services requested: %d, only %d allowed", e.requested, e.allowed)
}

// Is lets errors.Is match a *tooManyServicesRequestedError by value: err
// matches when it is (or wraps) a *tooManyServicesRequestedError with the same
// requested and allowed counts. A nil target never matches.
func (e *tooManyServicesRequestedError) Is(err error) bool {
	var target *tooManyServicesRequestedError
	// errors.As succeeds with a nil pointer when err is a typed nil
	// *tooManyServicesRequestedError; guard so we don't dereference it.
	if !errors.As(err, &target) || target == nil {
		return false
	}
	return e.allowed == target.allowed && e.requested == target.requested
}
// createServices creates a container for every service definition returned by
// getServicesDefinitions, hands the resulting containers to
// captureContainersLogs, and waits for them via waitForServices. When no
// user-defined network is configured and the network mode is bridge (or has no
// network name), it also records the links to the running service containers
// in e.links.
func (e *executor) createServices() error {
	e.SetCurrentStage(ExecutorStageCreatingServices)
	e.BuildLogger.Debugln("Creating services...")

	definitions, err := e.getServicesDefinitions()
	if err != nil {
		return err
	}

	// Every alias (link name) points at the container created for it.
	aliasToContainer := make(map[string]*types.Container)
	for idx := range definitions {
		if err = e.createFromServiceDefinition(idx, definitions[idx], aliasToContainer); err != nil {
			return err
		}
	}

	e.captureContainersLogs(e.Context, aliasToContainer)
	e.waitForServices()

	useLinks := e.networkMode.UserDefined() == "" &&
		(e.networkMode.IsBridge() || e.networkMode.NetworkName() == "")
	if useLinks {
		e.BuildLogger.Debugln("Building service links...")
		e.links = e.buildServiceLinks(aliasToContainer)
	}

	return nil
}
// getServicesDefinitions returns the services to start for this build.
// Runner-configured services (expanded with the build variables) come first,
// followed by the job's own services. Each job service is first checked with
// verifyAllowedImage against AllowedServices and the names of the
// runner-configured services. An error is returned if that check fails or if
// the total exceeds a non-negative services limit.
func (e *executor) getServicesDefinitions() (common.Services, error) {
	definitions := common.Services{}
	var configuredNames []string

	configured := e.Config.Docker.GetExpandedServices(e.Build.GetAllVariables())
	for _, svc := range configured {
		configuredNames = append(configuredNames, svc.Name)
		definitions = append(definitions, svc.ToImageDefinition())
	}

	for _, svc := range e.Build.Services {
		if err := e.verifyAllowedImage(svc.Name, "services", e.Config.Docker.AllowedServices, configuredNames); err != nil {
			return nil, err
		}
		definitions = append(definitions, svc)
	}

	// A negative limit means "unlimited".
	limit := e.Config.Docker.GetServicesLimit()
	if count := len(definitions); limit >= 0 && count > limit {
		return nil, &tooManyServicesRequestedError{requested: count, allowed: limit}
	}

	return definitions, nil
}
// waitForServices runs the health check for every created service concurrently
// and blocks until all of them have finished. Each check is given the
// configured WaitForServicesTimeout (in seconds). Zero means
// common.DefaultWaitForServicesTimeout, and a negative value disables waiting.
func (e *executor) waitForServices() {
	seconds := e.Config.Docker.WaitForServicesTimeout
	if seconds == 0 {
		seconds = common.DefaultWaitForServicesTimeout
	}

	if seconds <= 0 || len(e.services) == 0 {
		return
	}

	e.BuildLogger.Println("Waiting for services to be up and running (timeout", seconds, "seconds)...")

	perService := time.Duration(seconds) * time.Second

	var wg sync.WaitGroup
	wg.Add(len(e.services))
	for _, svc := range e.services {
		go func(svc *types.Container) {
			defer wg.Done()
			// Failures are written to the job log by waitForServiceContainer.
			_ = e.waitForServiceContainer(svc, perService)
		}(svc)
	}
	wg.Wait()
}
// buildServiceLinks returns "<containerID>:<alias>" link entries for every
// alias in linksMap whose container is currently running. Containers that
// cannot be inspected are skipped silently. The result order follows map
// iteration and is therefore not stable.
func (e *executor) buildServiceLinks(linksMap map[string]*types.Container) []string {
	var result []string
	for alias, linked := range linksMap {
		inspected, err := e.client.ContainerInspect(e.Context, linked.ID)
		if err != nil || !inspected.State.Running {
			continue
		}
		result = append(result, linked.ID+":"+alias)
	}
	return result
}
// createFromServiceDefinition creates at most one container for
// serviceDefinition and registers it in linksMap under each of its aliases.
// The aliases come from the image name plus any explicit aliases on the
// definition. An alias already present in linksMap is skipped with a warning.
// The container is created lazily on the first free alias and is recorded in
// e.services and e.temporary.
func (e *executor) createFromServiceDefinition(
	serviceIndex int,
	serviceDefinition common.Image,
	linksMap map[string]*types.Container,
) error {
	meta := services.SplitNameAndVersion(serviceDefinition.Name)
	if extra := serviceDefinition.Aliases(); len(extra) != 0 {
		meta.Aliases = append(meta.Aliases, extra...)
	}

	var created *types.Container
	for _, alias := range meta.Aliases {
		if linksMap[alias] != nil {
			e.BuildLogger.Warningln("Service", serviceDefinition.Name, "is already created. Ignoring.")
			continue
		}

		if created == nil {
			c, err := e.createService(
				serviceIndex,
				meta.Service,
				meta.Version,
				meta.ImageName,
				serviceDefinition,
				meta.Aliases,
			)
			if err != nil {
				return err
			}

			e.BuildLogger.Debugln("Created service", serviceDefinition.Name, "as", c.ID)
			e.services = append(e.services, c)
			e.temporary = append(e.temporary, c.ID)
			created = c
		}

		linksMap[alias] = created
	}

	return nil
}
// serviceHealthCheckError describes a failed service health check. It carries
// the underlying failure together with the logs read from the health-check
// container.
type serviceHealthCheckError struct {
	// Inner is the underlying cause of the failure.
	Inner error
	// Logs holds the output read from the health-check container.
	Logs string
}

// Error returns the inner error's message, or a fixed placeholder when no
// inner error is set.
func (e *serviceHealthCheckError) Error() string {
	if e.Inner == nil {
		return "serviceHealthCheckError"
	}
	return e.Inner.Error()
}

// Unwrap exposes the inner error so errors.Is and errors.As can inspect the
// underlying cause (Inner is typically built with fmt.Errorf and %w).
func (e *serviceHealthCheckError) Unwrap() error {
	return e.Inner
}
// runServiceHealthCheckContainer starts a helper-image container that runs
// `gitlab-runner-helper health-check` against service. It waits up to timeout
// for that container to finish. The health-check container is removed on
// return. A failed or timed-out wait is reported as a *serviceHealthCheckError
// that includes the health-check container's logs. Errors from the setup steps
// are returned as plain errors.
func (e *executor) runServiceHealthCheckContainer(service *types.Container, timeout time.Duration) error {
	helperImage, err := e.getHelperImage()
	if err != nil {
		return fmt.Errorf("getPrebuiltImage: %w", err)
	}

	name := service.Names[0] + "-wait-for-service"

	env, err := e.addServiceHealthCheckEnvironment(service)
	if err != nil {
		return err
	}

	cfg := e.createConfigForServiceHealthCheckContainer(
		service,
		[]string{"gitlab-runner-helper", "health-check"},
		helperImage,
		env,
	)
	hostCfg := e.createHostConfigForServiceHealthCheck(service)

	e.BuildLogger.Debugln(fmt.Sprintf("Creating service healthcheck container %s...", name))
	created, err := e.client.ContainerCreate(e.Context, cfg, hostCfg, nil, nil, name)
	if err != nil {
		return fmt.Errorf("create service container: %w", err)
	}
	// Always clean up the health-check container, whatever the outcome.
	defer func() { _ = e.removeContainer(e.Context, created.ID) }()

	e.BuildLogger.Debugln(fmt.Sprintf("Starting service healthcheck container %s (%s)...", name, created.ID))
	if err := e.client.ContainerStart(e.Context, created.ID, container.StartOptions{}); err != nil {
		return fmt.Errorf("start service container: %w", err)
	}

	waitCtx, cancel := context.WithTimeout(e.Context, timeout)
	defer cancel()

	waitErr := e.waiter.Wait(waitCtx, created.ID)
	switch {
	case waitErr == nil:
		return nil
	case errors.Is(waitErr, context.DeadlineExceeded):
		waitErr = fmt.Errorf("service %q timeout", name)
	default:
		waitErr = fmt.Errorf("service %q health check: %w", name, waitErr)
	}

	return &serviceHealthCheckError{
		Inner: waitErr,
		Logs:  e.readContainerLogs(created.ID),
	}
}
// createConfigForServiceHealthCheckContainer builds the container config for a
// service's health-check container. It runs cmd in waitImage with the given
// environment, and is labelled as a wait container for service.ID.
func (e *executor) createConfigForServiceHealthCheckContainer(
	service *types.Container,
	cmd []string,
	waitImage *types.ImageInspect,
	environment []string,
) *container.Config {
	labels := e.labeler.Labels(map[string]string{
		"type": labelWaitType,
		"wait": service.ID,
	})

	cfg := container.Config{
		Image:  waitImage.ID,
		Cmd:    cmd,
		Env:    environment,
		Labels: labels,
	}
	return &cfg
}
// waitForServiceContainer runs the health check for service, allowing it up to
// timeout. If the check fails, a warning block is written to the job log's
// stderr stream. The block contains the health-check error, the health-check
// container's logs (when available) and the service container's own logs. The
// health-check error is then returned.
func (e *executor) waitForServiceContainer(service *types.Container, timeout time.Duration) error {
	err := e.runServiceHealthCheckContainer(service, timeout)
	if err == nil {
		return nil
	}

	var buffer bytes.Buffer
	buffer.WriteString("\n")
	buffer.WriteString(
		helpers.ANSI_YELLOW + "*** WARNING:" + helpers.ANSI_RESET + " Service " + service.Names[0] +
			" probably didn't start properly.\n")
	buffer.WriteString("\n")
	buffer.WriteString("Health check error:\n")
	buffer.WriteString(strings.TrimSpace(err.Error()))
	buffer.WriteString("\n")

	// errors.As (rather than a concrete type assertion) still finds the
	// health-check logs if the error is ever wrapped on its way here.
	var healthCheckErr *serviceHealthCheckError
	if errors.As(err, &healthCheckErr) {
		buffer.WriteString("\n")
		buffer.WriteString("Health check container logs:\n")
		buffer.WriteString(healthCheckErr.Logs)
		buffer.WriteString("\n")
	}

	buffer.WriteString("\n")
	buffer.WriteString("Service container logs:\n")
	buffer.WriteString(e.readContainerLogs(service.ID))
	buffer.WriteString("\n")

	buffer.WriteString("\n")
	buffer.WriteString(helpers.ANSI_YELLOW + "*********" + helpers.ANSI_RESET + "\n")
	buffer.WriteString("\n")

	wc := e.BuildLogger.Stream(buildlogger.StreamExecutorLevel, buildlogger.Stderr)
	defer wc.Close()
	_, _ = wc.Write(buffer.Bytes())

	return err
}
// captureContainersLogs starts capturing logs for every service container into
// an additional sink. It does nothing unless CI service debugging is enabled
// for the build. Currently the sink is the job's main trace, wrapped in an
// inlineServiceLogWriter labelled with the service's aliases joined by "-" to
// add context to the logs. In the future this could be a separate file.
func (e *executor) captureContainersLogs(ctx context.Context, linksMap map[string]*types.Container) {
	if !e.Build.IsCIDebugServiceEnabled() {
		return
	}

	for _, service := range e.services {
		// Collect every alias pointing at this service. Map iteration order is
		// random, so with several aliases the label order may vary between runs.
		aliases := []string{}
		for alias, linked := range linksMap {
			if linked == service {
				aliases = append(aliases, alias)
			}
		}

		logger := e.BuildLogger.Stream(buildlogger.StreamStartingServiceLevel, buildlogger.Stdout)
		sink := service_helpers.NewInlineServiceLogWriter(strings.Join(aliases, "-"), logger)
		if err := e.captureContainerLogs(ctx, service.ID, service.Names[0], sink); err != nil {
			e.BuildLogger.Warningln(err.Error())
		}

		// Close exactly once per iteration. A defer here would only run at
		// function return and would close every stream a second time.
		// NOTE(review): captureContainerLogs keeps writing to sink (and so to
		// logger) from a background goroutine after this Close; confirm the
		// stream tolerates that, or hand ownership of logger to the goroutine.
		logger.Close()
	}
}
// captureContainerLogs tails (i.e. reads) logs that processes in the specified
// container emit to stdout or stderr, and redirects them to the specified sink.
// The sink can be any io.WriteCloser (e.g. this process's stdout, a file, a log
// aggregator). The logs are streamed as they are emitted, rather than batched
// and written when we disconnect from the container (or it is stopped).
//
// Streaming happens in a background goroutine that runs until the log stream
// is drained or fails (for example because ctx is cancelled). The sink is
// closed when that goroutine finishes. If the log stream cannot be opened, an
// error is returned, no goroutine is started and the sink is left open.
func (e *executor) captureContainerLogs(ctx context.Context, cid, containerName string, sink io.WriteCloser) error {
	source, err := e.client.ContainerLogs(ctx, cid, container.LogsOptions{
		ShowStderr: true,
		ShowStdout: true,
		Timestamps: true,
		Follow:     true,
	})
	if err != nil {
		return fmt.Errorf("failed to open log stream for container %s: %w", containerName, err)
	}

	e.BuildLogger.Debugln("streaming logs for container " + containerName)
	go func() {
		defer source.Close()
		defer sink.Close()

		// Using stdcopy assumes service containers are run with TTY=false. If
		// containers are started with TTY=true, io.Copy should be used instead.
		_, copyErr := stdcopy.StdCopy(sink, sink, source)
		// EOF and cancellation are expected ways for a followed stream to end.
		// errors.Is also matches wrapped EOFs, which == would miss.
		if copyErr != nil && !errors.Is(copyErr, io.EOF) && !errors.Is(copyErr, context.Canceled) {
			e.BuildLogger.Warningln(fmt.Sprintf(
				"error streaming logs for container %s: %s",
				containerName,
				copyErr.Error(),
			))
		}
		e.BuildLogger.Debugln("stopped streaming logs for container " + containerName)
	}()

	return nil
}