helpers/runner_wrapper/api/client/client.go (125 lines of code) (raw):

// Package client provides a gRPC client for the runner process wrapper API.
package client

import (
	"context"
	"fmt"
	"log/slog"
	"net"
	"time"

	"google.golang.org/grpc"
	"google.golang.org/grpc/connectivity"
	"google.golang.org/grpc/credentials/insecure"

	"gitlab.com/gitlab-org/gitlab-runner/helpers/runner_wrapper/api"
	pb "gitlab.com/gitlab-org/gitlab-runner/helpers/runner_wrapper/api/proto"
)

const (
	// DefaultConnectTimeout is the timeout Connect passes to ConnectWithTimeout.
	DefaultConnectTimeout = 5 * time.Second
)

// Dialer opens a connection to address on the given network.
type Dialer func(network string, address string) (net.Conn, error)

// Client bundles a gRPC connection with the ProcessWrapper client built on
// top of it, plus the logger used to report connection and request events.
type Client struct {
	logger *slog.Logger

	grpcConn   *grpc.ClientConn
	grpcClient pb.ProcessWrapperClient
}

// New creates a Client for target.
//
// The target is first passed through formatGRPCCompatible, and opts are
// resolved with setupOptions (both defined elsewhere in this package). The
// gRPC channel uses insecure transport credentials and a context dialer that
// delegates to the configured dialer, using the network and address returned
// by parseDialTarget.
//
// An error is returned when grpc.NewClient fails.
func New(target string, opts ...Option) (*Client, error) {
	target = formatGRPCCompatible(target)

	o := setupOptions(opts)
	logger := o.logger.WithGroup("client").With("target", target)

	grpcOpts := []grpc.DialOption{
		grpc.WithTransportCredentials(insecure.NewCredentials()),
		grpc.WithContextDialer(func(_ context.Context, addr string) (net.Conn, error) {
			network, address := parseDialTarget(addr)

			log := logger.With("network", network, "address", address)
			log.Debug("dialing gRPC server")

			conn, err := o.dialer(network, address)
			if err != nil {
				log.Error("gRPC dial failure", "error", err)
				// Return here so the success message below is never
				// logged for a failed dial.
				return nil, err
			}

			log.Debug("dialed gRPC server")

			return conn, nil
		}),
	}

	conn, err := grpc.NewClient(target, grpcOpts...)
	if err != nil {
		return nil, fmt.Errorf("creating gRPC client: %w", err)
	}

	c := &Client{
		logger:     logger,
		grpcConn:   conn,
		grpcClient: pb.NewProcessWrapperClient(conn),
	}

	return c, nil
}

// Connect is ConnectWithTimeout with DefaultConnectTimeout.
func (c *Client) Connect(ctx context.Context) error {
	return c.ConnectWithTimeout(ctx, DefaultConnectTimeout)
}

// ConnectWithTimeout asks the gRPC connection to connect and then checks,
// through RetryWithBackoff, whether its state has become Ready.
//
// The returned error is whatever RetryWithBackoff reports; a failure is also
// logged at warn level.
// NOTE(review): RetryWithBackoff is defined elsewhere in this package;
// presumably it retries the check until it succeeds, timeout elapses or ctx
// is done — confirm against its implementation.
func (c *Client) ConnectWithTimeout(ctx context.Context, timeout time.Duration) error {
	c.logger.Debug("connecting to gRPC server")

	c.grpcConn.Connect()

	err := RetryWithBackoff(ctx, timeout, func() error {
		state := c.grpcConn.GetState()
		if state != connectivity.Ready {
			return fmt.Errorf("gRPC connection is not ready: %s", state)
		}

		return nil
	})
	if err != nil {
		c.logger.Warn("gRPC connection failure", "error", err)
		return err
	}

	c.logger.Debug("gRPC connection succeeded")

	return nil
}

// CheckStatusResponse is the result returned by the Client's RPC methods.
type CheckStatusResponse struct {
	// Status is the status from the server's response, converted with
	// api.Statuses.Reverse.
	Status api.Status
	// FailureReason is copied verbatim from the server's response.
	FailureReason string
}

// CheckStatus sends a CheckStatus request and returns the reported status and
// failure reason. On a request failure it logs a warning and returns the zero
// CheckStatusResponse together with the error.
func (c *Client) CheckStatus(ctx context.Context) (CheckStatusResponse, error) {
	c.logger.Info("Checking status")

	var resp CheckStatusResponse

	s, err := c.grpcClient.CheckStatus(ctx, new(pb.CheckStatusRequest))
	if err != nil {
		c.logger.Warn("gRPC request failure", "error", err)
		return resp, err
	}

	c.logger.Debug("gRPC request succeeded")

	resp.Status = api.Statuses.Reverse(s.Status)
	resp.FailureReason = s.FailureReason

	return resp, nil
}

// InitGracefulShutdown sends an InitGracefulShutdown request and returns the
// reported status and failure reason.
//
// When req is non-nil and provides a shutdown callback definition, its URL,
// method and headers are sent along as the request's ShutdownCallback;
// otherwise the ShutdownCallback field is left nil. On a request failure it
// logs a warning and returns the zero CheckStatusResponse together with the
// error.
func (c *Client) InitGracefulShutdown(
	ctx context.Context,
	req api.InitGracefulShutdownRequest,
) (CheckStatusResponse, error) {
	c.logger.Info("Initializing graceful shutdown")

	var resp CheckStatusResponse

	var shutdownCallback *pb.ShutdownCallback
	if req != nil {
		shutdownCallbackDef := req.ShutdownCallbackDef()
		if shutdownCallbackDef != nil {
			// Allocate the message before filling it in: assigning fields
			// through the nil pointer would panic.
			shutdownCallback = &pb.ShutdownCallback{
				Url:     shutdownCallbackDef.URL(),
				Method:  shutdownCallbackDef.Method(),
				Headers: shutdownCallbackDef.Headers(),
			}
		}
	}

	s, err := c.grpcClient.InitGracefulShutdown(ctx, &pb.InitGracefulShutdownRequest{
		ShutdownCallback: shutdownCallback,
	})
	if err != nil {
		c.logger.Warn("gRPC request failure", "error", err)
		return resp, err
	}

	c.logger.Debug("gRPC request succeeded")

	resp.Status = api.Statuses.Reverse(s.Status)
	resp.FailureReason = s.FailureReason

	return resp, nil
}

// InitForcefulShutdown sends an InitForcefulShutdown request and returns the
// reported status and failure reason. On a request failure it logs a warning
// and returns the zero CheckStatusResponse together with the error.
func (c *Client) InitForcefulShutdown(ctx context.Context) (CheckStatusResponse, error) {
	c.logger.Info("Initializing forceful shutdown")

	var resp CheckStatusResponse

	s, err := c.grpcClient.InitForcefulShutdown(ctx, new(pb.InitForcefulShutdownRequest))
	if err != nil {
		c.logger.Warn("gRPC request failure", "error", err)
		return resp, err
	}

	c.logger.Debug("gRPC request succeeded")

	resp.Status = api.Statuses.Reverse(s.Status)
	resp.FailureReason = s.FailureReason

	return resp, nil
}