pkg/api/client/basic/client.go (117 lines of code) (raw):
// package basic implements a low-level client for the step-runner gRPC service with a (more or less) 1:1 mapping to
// the raw gRPC API. It abstracts all proto types except for StepResult. Use this API if you want full control over
// every stage of running, following and closing a job.
package basic
import (
"context"
"errors"
"fmt"
"io"
"log"
"google.golang.org/grpc"
"gitlab.com/gitlab-org/step-runner/pkg/api/client"
"gitlab.com/gitlab-org/step-runner/proto"
)
type (
StepResultWriter interface {
Write(*proto.StepResult) error
}
StepRunnerClient struct {
client proto.StepRunnerClient
}
)
func toProto(r *client.RunRequest) *proto.RunRequest {
rr := proto.RunRequest{
Id: r.Id,
WorkDir: r.WorkDir,
Env: r.Env,
Steps: r.Steps,
}
rr.Job = &proto.Job{BuildDir: r.WorkDir}
if len(r.Variables) != 0 {
for _, v := range r.Variables {
rr.Job.Variables = append(rr.Job.Variables, &proto.Variable{
Key: v.Key,
Value: v.Value,
File: v.File,
Masked: v.Masked,
})
}
}
return &rr
}
func fromProto(statuses []*proto.Status) []client.Status {
fromProto := func(st *proto.Status) client.Status {
res := client.Status{
Id: st.Id,
Message: st.Message,
State: client.State(st.Status),
StartTime: st.StartTime.AsTime(),
}
if st.EndTime != nil {
res.EndTime = st.EndTime.AsTime()
}
return res
}
result := make([]client.Status, 0, len(statuses))
for _, j := range statuses {
result = append(result, fromProto(j))
}
return result
}
func New(conn *grpc.ClientConn) *StepRunnerClient {
return &StepRunnerClient{
client: proto.NewStepRunnerClient(conn),
}
}
// Run initiates the job defined in runRequest on the connected step-runner service.
func (c *StepRunnerClient) Run(ctx context.Context, runRequest *client.RunRequest) error {
// TODO: compile steps here when we separate step compilation and execution...
if _, err := c.client.Run(ctx, toProto(runRequest), grpc.WaitForReady(true)); err != nil {
return fmt.Errorf("running run request for job %q: %w", runRequest.Id, err)
}
return nil
}
// Close cancelled (if running) the specified job-id, and frees all resources associated with the job.
func (c *StepRunnerClient) Close(ctx context.Context, jobID string) error {
if _, err := c.client.Close(ctx, &proto.CloseRequest{Id: jobID}); err != nil {
return fmt.Errorf("closing job %q: %w", jobID, err)
}
return nil
}
// Status returns the Status of the specified job.
func (c *StepRunnerClient) Status(ctx context.Context, jobID string) (client.Status, error) {
job, err := c.client.Status(context.Background(), &proto.StatusRequest{Id: jobID})
if err != nil {
return client.Status{}, fmt.Errorf("getting status for job %q: %w", jobID, err)
}
return fromProto(job.Jobs)[0], nil
}
// ListJobs returns the Status for all jobs running on the connected step-runner service.
func (c *StepRunnerClient) ListJobs(ctx context.Context) ([]client.Status, error) {
jobs, err := c.client.Status(context.Background(), &proto.StatusRequest{})
if err != nil {
return nil, fmt.Errorf("listing jobs: %w", err)
}
return fromProto(jobs.Jobs), nil
}
// FollowLogs streams logs emitted by the specified job to the specified io.Writer.
func (c *StepRunnerClient) FollowLogs(ctx context.Context, jobID string, offset int64, writer io.Writer) (int64, error) {
if writer == nil {
return -1, errors.New("nil io.Writer")
}
ioStream, err := c.client.FollowLogs(ctx, &proto.FollowLogsRequest{Id: jobID, Offset: offset})
if err != nil {
return -1, fmt.Errorf("following logs for job %q: %w", jobID, err)
}
written := offset
for {
if ctx.Err() != nil {
return -1, ctx.Err()
}
res, err := ioStream.Recv()
if err == io.EOF {
log.Println("logs stream done")
return written, nil
}
if err != nil {
return written, fmt.Errorf("following logs: %w", err)
}
n, err := writer.Write(res.Data)
written += int64(n)
if err != nil {
return written, fmt.Errorf("writing to log sink: %w", err)
}
}
}