executors/internal/autoscaler/acquisition.go (248 lines of code) (raw):
package autoscaler
import (
"context"
"errors"
"fmt"
"io"
"net"
"time"
"github.com/sirupsen/logrus"
"gitlab.com/gitlab-org/fleeting/fleeting/connector"
fleetingprovider "gitlab.com/gitlab-org/fleeting/fleeting/provider"
nestingapi "gitlab.com/gitlab-org/fleeting/nesting/api"
"gitlab.com/gitlab-org/fleeting/nesting/hypervisor"
"gitlab.com/gitlab-org/fleeting/taskscaler"
"gitlab.com/gitlab-org/gitlab-runner/common"
"gitlab.com/gitlab-org/gitlab-runner/common/buildlogger"
"gitlab.com/gitlab-org/gitlab-runner/executors"
)
// Compile-time assertion that *acquisitionRef implements executors.Environment.
var _ executors.Environment = (*acquisitionRef)(nil)
// Sentinel errors returned by the acquisition reference.
var (
// errRefAcqNotSet is returned by Prepare when ref.acq is nil.
errRefAcqNotSet = errors.New("ref.acq is not set")
// errNoNestingImageSpecified is returned by createVMTunnel when the resolved
// nesting VM image name is empty after variable expansion.
errNoNestingImageSpecified = errors.New("no nesting VM image specified to run the job in")
)
// acquisitionRef implements executors.Environment on top of a taskscaler
// acquisition. Prepare dials the acquired instance and, when VM isolation is
// enabled in the autoscaler config, tunnels into a nesting VM created on it.
type acquisitionRef struct {
// key is set by newAcquisitionRef; it is not read in the code visible here.
key string
// acq is the underlying acquisition. Prepare returns errRefAcqNotSet while
// it is nil, and WithContext falls back to a plain cancellable context.
acq taskscaler.Acquisition
// mapJobImageToVMImage lets a non-empty job image name override the
// configured nesting VM image in createVMTunnel.
mapJobImageToVMImage bool
// Test hooks. newAcquisitionRef sets both dial functions to connector.Dial.
// dialAcquisitionInstance dials the acquired instance in Prepare.
dialAcquisitionInstance connector.DialFn
// dialTunnel dials the nesting VM in createTunneledDialer.
dialTunnel connector.DialFn
// connectNestingFn, when non-nil, replaces the built-in logic of
// connectNesting entirely.
connectNestingFn func(
host string,
logger buildlogger.Logger,
fleetingDialer connector.Client,
) (nestingapi.Client, io.Closer, error)
}
// newAcquisitionRef builds an acquisitionRef for the given key. Both dial
// hooks are initialised to connector.Dial; the acquisition itself (acq) and
// the nesting connection hook are left unset.
func newAcquisitionRef(key string, mapJobImageToVMImage bool) *acquisitionRef {
	ref := new(acquisitionRef)
	ref.key = key
	ref.mapJobImageToVMImage = mapJobImageToVMImage
	ref.dialAcquisitionInstance = connector.Dial
	ref.dialTunnel = connector.Dial
	return ref
}
// Prepare connects to the acquired instance and returns a client for running
// the job.
//
// It fetches the instance connect info and dials the instance under a context
// derived from the acquisition (ref.acq.WithContext). If that context carries
// a cause after either step, a *common.BuildError is returned whose failure
// reason is derived from the cause via reasonFromCause.
//
// When VM isolation is disabled (or no autoscaler config is present) the
// returned client talks to the host instance directly. Otherwise a connection
// to the nesting daemon is established through the instance, a nesting VM is
// created, and the returned client is tunnelled into that VM.
//
// Returns errRefAcqNotSet if no acquisition has been attached to ref.
func (ref *acquisitionRef) Prepare(
	ctx context.Context,
	logger buildlogger.Logger,
	options common.ExecutorPrepareOptions,
) (executors.Client, error) {
	if ref.acq == nil {
		return nil, errRefAcqNotSet
	}

	dialCtx, cancel := ref.acq.WithContext(ctx)
	defer cancel()

	info, err := ref.acq.InstanceConnectInfo(dialCtx)
	if cause := context.Cause(dialCtx); cause != nil {
		return nil, &common.BuildError{Inner: cause, FailureReason: reasonFromCause(cause)}
	}
	if err != nil {
		return nil, fmt.Errorf("getting instance connect info: %w", err)
	}

	// The autoscaler config is optional here: every later access must honour
	// this guard, otherwise a nil config panics instead of using defaults.
	hasAutoscalerCfg := options.Config != nil && options.Config.Autoscaler != nil

	useExternalAddr := true
	logFields := logrus.Fields{
		"internal-address": info.InternalAddr,
		"external-address": info.ExternalAddr,
		"instance-id":      info.ID,
	}
	if hasAutoscalerCfg {
		useExternalAddr = options.Config.Autoscaler.ConnectorConfig.UseExternalAddr
		logFields["protocol-port"] = options.Config.Autoscaler.ConnectorConfig.ProtocolPort
	}
	logFields["use-external-address"] = useExternalAddr

	options.Build.Log().WithFields(logFields).Info("Dialing instance")

	fleetingDialOpts := connector.DialOptions{
		UseExternalAddr: useExternalAddr,
	}

	logger.Println(fmt.Sprintf("Dialing instance %s...", info.ID))
	fleetingDialer, err := ref.dialAcquisitionInstance(dialCtx, info, fleetingDialOpts)
	if cause := context.Cause(dialCtx); cause != nil {
		// The dial may have succeeded just as the context ended; do not leak
		// the established connection.
		if err == nil && fleetingDialer != nil {
			fleetingDialer.Close()
		}
		return nil, &common.BuildError{Inner: cause, FailureReason: reasonFromCause(cause)}
	}
	if err != nil {
		return nil, err
	}
	logger.Println(fmt.Sprintf("Instance %s connected", info.ID))

	// If nesting is disabled (or not configured), return a client for the host
	// instance: neither VM isolation nor a VM tunnel is needed.
	if !hasAutoscalerCfg || !options.Config.Autoscaler.VMIsolation.Enabled {
		return &client{client: fleetingDialer, cleanup: nil}, nil
	}

	// Enforce VM isolation by dialing the nesting daemon with gRPC.
	logger.Println("Enforcing VM Isolation")
	nc, conn, err := ref.connectNesting(options.Config.Autoscaler.VMIsolation.NestingHost, logger, fleetingDialer)
	if err != nil {
		// Best-effort teardown; the connect error is what the caller needs.
		fleetingDialer.Close()
		return nil, err
	}

	logger.Println("Creating nesting VM tunnel")
	vmClient, err := ref.createVMTunnel(ctx, logger, nc, fleetingDialer, options)
	if err != nil {
		// Best-effort teardown of everything opened so far.
		nc.Close()
		conn.Close()
		fleetingDialer.Close()
		return nil, fmt.Errorf("creating vm tunnel: %w", err)
	}

	return vmClient, nil
}
// reasonFromCause translates a context cause into a job failure reason:
// deadline expiry maps to JobExecutionTimeout, cancellation to JobCanceled,
// and anything else to RunnerSystemFailure.
func reasonFromCause(cause error) common.JobFailureReason {
	if errors.Is(cause, context.DeadlineExceeded) {
		return common.JobExecutionTimeout
	}
	if errors.Is(cause, context.Canceled) {
		return common.JobCanceled
	}
	return common.RunnerSystemFailure
}
// WithContext derives a context from ctx. With an acquisition attached the
// derivation is delegated to it; without one, a plain cancellable child of
// ctx is returned.
func (ref *acquisitionRef) WithContext(ctx context.Context) (context.Context, context.CancelFunc) {
	if acq := ref.acq; acq != nil {
		return acq.WithContext(ctx)
	}
	return context.WithCancel(ctx)
}
// connectNesting opens a client connection to the nesting daemon at host,
// routing the underlying network dial through fleetingDialer. It returns the
// nesting API client together with the connection as an io.Closer.
//
// If the connectNestingFn test hook is set, it is called instead and its
// results are returned unchanged.
func (ref *acquisitionRef) connectNesting(
	host string,
	logger buildlogger.Logger,
	fleetingDialer connector.Client,
) (nestingapi.Client, io.Closer, error) {
	if hook := ref.connectNestingFn; hook != nil {
		return hook(host, logger, fleetingDialer)
	}

	// Dial callback handed to the nesting client: it ignores the supplied
	// context and dials via the already-connected instance.
	dialDaemon := func(_ context.Context, network, address string) (net.Conn, error) {
		logger.Println("Dialing nesting daemon")
		return fleetingDialer.Dial(network, address)
	}

	daemonConn, err := nestingapi.NewClientConn(host, dialDaemon)
	if err != nil {
		// Could not dial nesting daemon
		return nil, nil, fmt.Errorf("dialing nesting daemon: %w", err)
	}

	return nestingapi.New(daemonConn), daemonConn, nil
}
// createVMTunnel creates a nesting VM for the job and returns a client that
// is tunnelled into it through fleetingDialer.
//
// The VM image is the configured nesting image unless mapJobImageToVMImage is
// set and the job specifies an image name; the chosen value is then expanded
// with the build's variables. errNoNestingImageSpecified is returned when the
// result is empty.
//
// The returned client's cleanup deletes the VM and then closes nc and
// fleetingDialer. If dialing the VM fails, the VM is deleted before the error
// is returned.
func (ref *acquisitionRef) createVMTunnel(
	ctx context.Context,
	logger buildlogger.Logger,
	nc nestingapi.Client,
	fleetingDialer connector.Client,
	options common.ExecutorPrepareOptions,
) (executors.Client, error) {
	// Upper bound for a single VM deletion attempt.
	const vmDeleteTimeout = time.Minute

	nestingCfg := options.Config.Autoscaler.VMIsolation

	// Use the nesting config defined image, unless the executor allows for the
	// job image to override.
	image := nestingCfg.Image
	if options.Build.Image.Name != "" && ref.mapJobImageToVMImage {
		image = options.Build.Image.Name
	}

	image = options.Build.GetAllVariables().ExpandValue(image)
	if image == "" {
		return nil, errNoNestingImageSpecified
	}

	logger.Println("Creating nesting VM", image)

	// Create the VM.
	var vm hypervisor.VirtualMachine
	var stompedVMID *string
	err := withInit(ctx, options.Config, nc, func() error {
		slot := int32(ref.acq.Slot())
		var createErr error
		vm, stompedVMID, createErr = nc.Create(ctx, image, &slot)
		return createErr
	})
	if err != nil {
		return nil, fmt.Errorf("creating nesting vm: %w", err)
	}

	logger.Infoln("Created nesting VM", vm.GetId(), vm.GetAddr())
	if stompedVMID != nil {
		logger.Infoln("Stomped nesting VM: ", *stompedVMID)
	}

	// deleteVM removes the VM using a fresh, bounded context. The caller's ctx
	// must not be used: it may already be cancelled or expired by the time
	// cleanup runs, which would make the delete fail and leak the VM.
	deleteVM := func() error {
		deleteCtx, cancel := context.WithTimeout(context.Background(), vmDeleteTimeout)
		defer cancel()
		return nc.Delete(deleteCtx, vm.GetId())
	}

	dialer, err := ref.createTunneledDialer(ctx, fleetingDialer, nestingCfg, vm)
	if err != nil {
		// Best-effort removal of the VM we just created; the dial error is
		// the one reported to the caller.
		_ = deleteVM()
		return nil, fmt.Errorf("dialing nesting vm: %w", err)
	}

	cl := &client{
		client: dialer,
		cleanup: func() error {
			// Deferred calls run in reverse order: the VM is deleted first,
			// then nc is closed, then the instance connection.
			defer fleetingDialer.Close()
			defer nc.Close()
			return deleteVM()
		},
	}

	return cl, nil
}
// createTunneledDialer dials the nesting VM through an existing connection to
// the host instance.
//
// The connect info is built from the VM isolation connector config and the
// VM's address; every network dial is routed through dialer. Dialing runs
// under a context derived from the acquisition. If that context carries a
// cause once the dial returns, a *common.BuildError derived from the cause is
// returned and any client that was established is closed.
func (ref *acquisitionRef) createTunneledDialer(
	ctx context.Context,
	dialer connector.Client,
	nestingCfg common.VMIsolation,
	vm hypervisor.VirtualMachine,
) (connector.Client, error) {
	info := fleetingprovider.ConnectInfo{
		ConnectorConfig: fleetingprovider.ConnectorConfig{
			OS:                   nestingCfg.ConnectorConfig.OS,
			Arch:                 nestingCfg.ConnectorConfig.Arch,
			Protocol:             fleetingprovider.Protocol(nestingCfg.ConnectorConfig.Protocol),
			ProtocolPort:         nestingCfg.ConnectorConfig.ProtocolPort,
			Username:             nestingCfg.ConnectorConfig.Username,
			Password:             nestingCfg.ConnectorConfig.Password,
			UseStaticCredentials: nestingCfg.ConnectorConfig.UseStaticCredentials,
			Keepalive:            nestingCfg.ConnectorConfig.Keepalive,
			Timeout:              nestingCfg.ConnectorConfig.Timeout,
		},
		InternalAddr: vm.GetAddr(),
	}

	options := connector.DialOptions{
		// Tunnel through the host instance; the per-dial context is not used
		// because dialer.Dial does not accept one.
		DialFn: func(_ context.Context, network, addr string) (net.Conn, error) {
			return dialer.Dial(network, addr)
		},
	}

	dialCtx, cancel := ref.acq.WithContext(ctx)
	defer cancel()

	tunnel, err := ref.dialTunnel(dialCtx, info, options)
	if cause := context.Cause(dialCtx); cause != nil {
		// The dial may have succeeded just as the context ended; close the
		// tunnel rather than dropping it on the floor.
		if err == nil && tunnel != nil {
			tunnel.Close()
		}
		return nil, &common.BuildError{Inner: cause, FailureReason: reasonFromCause(cause)}
	}

	return tunnel, err
}
// client wraps a connector.Client and attaches an optional cleanup step that
// runs when the client is closed.
//
// NOTE: createVMTunnel builds this struct with an unkeyed literal, so the
// field order must not change.
type client struct {
// client is the wrapped connection that Dial, DialRun, Run and Close delegate to.
client connector.Client
// cleanup, when non-nil, is invoked by Close before the wrapped client is closed.
cleanup func() error
}
// Dial forwards n and addr unchanged to the wrapped connector client's Dial
// and returns its result.
func (c *client) Dial(n string, addr string) (net.Conn, error) {
return c.client.Dial(n, addr)
}
// DialRun forwards ctx and command unchanged to the wrapped connector
// client's DialRun and returns its result.
func (c *client) DialRun(ctx context.Context, command string) (net.Conn, error) {
return c.client.DialRun(ctx, command)
}
// Run executes opts on the wrapped connector client. When the resulting error
// chain contains a *connector.ExitError, it is wrapped in a
// *common.BuildError carrying that exit code; any other error (or nil) is
// returned as-is.
func (c *client) Run(ctx context.Context, opts executors.RunOptions) error {
	runErr := c.client.Run(ctx, connector.RunOptions(opts))

	if exitErr := (*connector.ExitError)(nil); errors.As(runErr, &exitErr) {
		return &common.BuildError{
			Inner:    runErr,
			ExitCode: exitErr.ExitCode(),
		}
	}

	return runErr
}
// Close runs the optional cleanup step and then closes the wrapped client.
//
// Both steps are always attempted. If only one fails, that error is returned
// unchanged. If both fail, the errors are combined with errors.Join (the
// wrapped client's close error first) so that neither is lost; errors.Is and
// errors.As still match either one.
func (c *client) Close() error {
	var cleanupErr error
	if c.cleanup != nil {
		cleanupErr = c.cleanup()
	}

	closeErr := c.client.Close()

	switch {
	case cleanupErr != nil && closeErr != nil:
		return errors.Join(closeErr, cleanupErr)
	case closeErr != nil:
		return closeErr
	default:
		return cleanupErr
	}
}