pkg/api/client/extended/client.go (77 lines of code) (raw):
// Package extended implements a well-behaved, higher-level client for the step-runner gRPC service. The primary entry
// point, RunAndFollow(), will initiate a steps job, Follow*() the logs to completion, and on
// completion get the job's Status() and Close() the job, releasing all resources.
//
// While it does not do so currently, this client will in the future automatically reconnect and re-initiate
// Following on connection errors using the specified DialFunc.
//
// Callers can use the FollowOutput type to receive streaming logs.
//
// Note that if it is cancelled or times out, the context passed to RunAndFollow will cancel the client AND also call
// Close() on the job, effectively cancelling it on the server side too.
package extended
import (
"context"
"errors"
"fmt"
"io"
"golang.org/x/sync/errgroup"
"google.golang.org/grpc"
"gitlab.com/gitlab-org/step-runner/pkg/api/client"
"gitlab.com/gitlab-org/step-runner/pkg/api/client/basic"
)
// FollowOutput tells Follow where to write streamed job output and remembers how much has been consumed.
type FollowOutput struct {
	// Logs is the sink that receives the job's log stream.
	Logs io.Writer

	// readLogs is handed to FollowLogs on each call and advanced by the count FollowLogs returns
	// (presumably a byte offset into the log stream; confirm against basic.StepRunnerClient.FollowLogs).
	readLogs int64
}

// Dialer produces the gRPC connection the client uses to reach the step-runner service.
type Dialer interface {
	Dial() (*grpc.ClientConn, error)
}

// StepRunnerClient embeds the basic client and adds the higher-level RunAndFollow/Follow helpers. It keeps the
// connection it was built on so that CloseConn can release it, and the Dialer that produced that connection.
type StepRunnerClient struct {
	*basic.StepRunnerClient

	conn   *grpc.ClientConn
	dialer Dialer
}
// New obtains a connection from dialer and returns a StepRunnerClient built on top of it. If dialing fails, the
// error is returned wrapped with a "dialing" prefix and no client is created.
func New(dialer Dialer) (*StepRunnerClient, error) {
	conn, dialErr := dialer.Dial()
	if dialErr != nil {
		return nil, fmt.Errorf("dialing: %w", dialErr)
	}

	sc := &StepRunnerClient{
		StepRunnerClient: basic.New(conn),
		conn:             conn,
	}
	// TODO: this will change when we add reconnection
	sc.dialer = dialer

	return sc, nil
}
// RunAndFollow drives a step run through its whole lifecycle: it submits runRequest, follows the job's output as
// configured by out, retrieves the final job status, and closes the job.
//
// If submitting the request fails, that error is returned as-is and nothing else happens. Once the request has been
// accepted, Close is deferred with a background context, so it runs however Follow ends. That includes ctx being
// cancelled or timing out, which effectively cancels the job on the server too. The error from Close is discarded.
func (c *StepRunnerClient) RunAndFollow(ctx context.Context, runRequest *client.RunRequest, out *FollowOutput) (client.Status, error) {
	if runErr := c.Run(ctx, runRequest); runErr != nil {
		return client.Status{}, runErr
	}

	jobID := runRequest.Id

	// TODO: Do we always want to call Close here? If we don't, we may need to distinguish between recoverable and
	// non-recoverable errors. Do we want to capture and return Close errors too? Do we want to call Close() on context
	// cancellation/timeout (probably yes)?
	//nolint:errcheck
	defer c.Close(context.Background(), jobID)

	return c.Follow(ctx, jobID, out)
}
// Follow streams the logs of job jobID into out.Logs until the log stream ends, then queries and returns the job's
// status.
//
// out and out.Logs must both be non-nil; otherwise an error is returned before the service is contacted.
//
// The status is requested with a background context, so it is still fetched when ctx has been cancelled or has timed
// out. If following the logs failed but the status query succeeded, the status is returned together with the wrapped
// follow error. If the status query failed, a zero Status is returned along with the follow error (if any) joined
// with the status error.
func (c *StepRunnerClient) Follow(ctx context.Context, jobID string, out *FollowOutput) (client.Status, error) {
	// A nil out carries no sink either, so it is reported the same way as a missing Logs writer instead of panicking.
	if out == nil || out.Logs == nil {
		return client.Status{}, errors.New("at least one stream sink must be specified")
	}

	// The group's context is cancelled as soon as a follower returns an error and again once Wait returns, so no
	// separate cancel function is needed.
	eg, ctx := errgroup.WithContext(ctx)
	eg.Go(func() error {
		// TODO: add reconnection
		n, err := c.FollowLogs(ctx, jobID, out.readLogs, out.Logs)
		// Record progress even on error so that a later Follow call resumes from where this one stopped.
		out.readLogs += n
		return err
	})

	var followErr error
	if err := eg.Wait(); err != nil {
		followErr = fmt.Errorf("following job %q: %w", jobID, err)
	}

	// Deliberately not using ctx: it may already be cancelled, and the final status is still wanted.
	status, statErr := c.Status(context.Background(), jobID)
	if statErr != nil {
		return client.Status{}, errors.Join(followErr, statErr)
	}
	return status, followErr
}
// CloseConn releases the client's connection to the step-runner service and reports the result of closing it. When
// the client holds no connection it does nothing and returns nil.
func (c *StepRunnerClient) CloseConn() error {
	if c.conn == nil {
		return nil
	}
	return c.conn.Close()
}