executors/docker/internal/exec/dialer.go (86 lines of code) (raw):

package exec import ( "context" "fmt" "io" "net" "github.com/docker/docker/api/types/container" "github.com/docker/docker/pkg/stdcopy" "github.com/sirupsen/logrus" "gitlab.com/gitlab-org/gitlab-runner/helpers/docker" "gitlab.com/gitlab-org/step-runner/pkg/api" "golang.org/x/sync/errgroup" "google.golang.org/grpc" "google.golang.org/grpc/credentials/insecure" ) type dialerFactory func(io.ReadCloser, io.Writer) func(context.Context) error // proxyConn dials and provides a io.Reader and io.Writer to dialerFn and returns // a connected net.Conn implementation. // // It can be used to connect code expecting a net.Conn with code expecting an // io.Reader and io.Writer. func proxyConn(ctx context.Context, ctxDialerFactory dialerFactory) (net.Conn, error) { connReader, w := io.Pipe() r, connWriter := io.Pipe() go func() { err := ctxDialerFactory(r, w)(ctx) if err != nil { err = fmt.Errorf("dialing step-runner client dialer: %w", err) r.CloseWithError(err) w.CloseWithError(err) } }() return &rwConn{WriteCloser: connWriter, ReadCloser: connReader}, nil } type tunnelingDialer struct { containerID string client docker.Client logger logrus.FieldLogger } func (td *tunnelingDialer) Dial() (*grpc.ClientConn, error) { ctxDialer := func(ctx context.Context, _ string) (net.Conn, error) { dialerFactory := func(source io.ReadCloser, sink io.Writer) func(context.Context) error { return func(ctx context.Context) error { return td.containerExec(ctx, source, sink) } } return proxyConn(ctx, dialerFactory) } conn, err := grpc.NewClient("unix:"+api.DefaultSocketPath(), grpc.WithTransportCredentials(insecure.NewCredentials()), grpc.WithContextDialer(ctxDialer)) if err != nil { return nil, fmt.Errorf("creating gRPC client: %w", err) } return conn, nil } func (td *tunnelingDialer) containerExec(ctx context.Context, source io.ReadCloser, sink io.Writer) error { execCreateResp, err := td.client.ContainerExecCreate(ctx, td.containerID, container.ExecOptions{ Cmd: []string{"step-runner", "proxy"}, AttachStdin: true, AttachStderr: true, AttachStdout: true, }) if err != nil { return fmt.Errorf("creating container exec for %q: %w", td.containerID, err) } hijacked, err := td.client.ContainerExecAttach(ctx, execCreateResp.ID, container.ExecStartOptions{}) if err != nil { return fmt.Errorf("attaching to container exec for %q: %w", td.containerID, err) } td.logger.Debugln("exec attached to container", td.containerID) eg := errgroup.Group{} eg.Go(func() error { //nolint:errcheck defer hijacked.Close() _, err := io.Copy(hijacked.Conn, source) if err != nil { return fmt.Errorf("streaming into container %q: %w", td.containerID, err) } return nil }) eg.Go(func() error { //nolint:errcheck defer hijacked.Close() stderr := newOmitWriter() _, err = stdcopy.StdCopy(sink, stderr, hijacked.Reader) if err != nil { return fmt.Errorf("streaming out of container %q: %w (%w)", td.containerID, err, stderr.Error()) } return nil }) return eg.Wait() }