command-runner/pkg/features/depends_on.go (108 lines of code) (raw):
package features
import (
"context"
"errors"
"fmt"
"slices"
"strings"
"sync"
"github.com/aws/codecatalyst-runner-cli/command-runner/pkg/common"
"github.com/aws/codecatalyst-runner-cli/command-runner/pkg/runner"
"github.com/rs/zerolog/log"
)
// ProgressHandle receives the outcome of a single plan execution and answers
// whether that plan's dependencies allow it to run yet.
type ProgressHandle interface {
// Success is called by DependsOn after the plan's executor returned without
// an error.
Success()
// Failure is called by DependsOn when the plan is aborted: either IsReady
// returned an error, or the plan's executor returned an error other than
// common.ErrDefer. err is the error that caused the abort.
Failure(err error)
// IsReady reports whether the named dependencies allow the plan to run.
// DependsOn defers the plan when the result is false and aborts it when the
// returned error is non-nil.
IsReady(dependsOn ...string) (bool, error)
}
// DependsOn returns a [runner.Feature] that only runs a plan's executor once
// progressHandle reports every entry of plan.DependsOn() as ready.
//
// For each invocation of the returned feature:
//   - if IsReady returns an error, the error is logged, reported through
//     progressHandle.Failure and returned;
//   - if a dependency is not ready yet, common.ErrDefer is returned without
//     running the executor (presumably so the caller retries later);
//   - otherwise the executor runs; a nil result is reported through
//     progressHandle.Success, and any error other than common.ErrDefer is
//     reported through progressHandle.Failure. The executor's result is
//     returned unchanged.
func DependsOn(progressHandle ProgressHandle) runner.Feature {
	// Dependencies already announced as "WAITING". The slice lives in the
	// closure, so each dependency is announced once across repeated
	// invocations of the returned feature.
	var announced []string

	return func(ctx context.Context, plan runner.Plan, e runner.PlanExecutor) error {
		logger := log.Ctx(ctx)
		logger.Debug().Msg("ENTER DependsOn")

		for _, dep := range plan.DependsOn() {
			ready, err := progressHandle.IsReady(dep)
			if err != nil {
				logger.Err(err).Msgf("❌ aborted while waiting for %s", dep)
				progressHandle.Failure(err)
				return err
			}
			if ready {
				continue
			}
			if !slices.Contains(announced, dep) {
				logger.Info().Msgf("⏳ WAITING for %s to succeed", dep)
				announced = append(announced, dep)
			}
			return common.ErrDefer
		}

		err := e(ctx)
		switch {
		case err == nil:
			progressHandle.Success()
			logger.Debug().Msg("EXIT DependsOn")
		case !errors.Is(err, common.ErrDefer):
			// A deferred executor is neither a success nor a failure.
			progressHandle.Failure(err)
		}
		return err
	}
}
// PlanTracker provides [ProgressHandle] for each plan and tracks progress across all plans.
// The zero value is usable: handles are created with the ProgressHandle method.
type PlanTracker struct {
// pending holds the IDs of plans registered through ProgressHandle that have
// not yet been reported through Success or Failure.
pending []string
// failed holds the IDs of plans reported through Failure.
failed []string
// mu is held by Success and Failure while they update pending and failed.
mu sync.Mutex
}
// progressHandle is the ProgressHandle implementation returned by
// PlanTracker.ProgressHandle. It binds one plan ID to the tracker that
// records that plan's state.
type progressHandle struct {
// pt is the tracker whose pending and failed lists this handle reads and updates.
pt *PlanTracker
// planID identifies the plan this handle reports on. When it contains "@",
// IsReady treats the part before the first "@" as the plan's group.
planID string
}
// ProgressHandle registers planID as pending and returns a [ProgressHandle]
// for it. The plan stays pending until Success or Failure is called on the
// returned handle.
//
// The pending list is updated while holding the tracker's mutex, the same
// lock Success and Failure take, so handles may be created while other plans
// are already reporting their results.
func (pt *PlanTracker) ProgressHandle(planID string) ProgressHandle {
	pt.mu.Lock()
	pt.pending = append(pt.pending, planID)
	pt.mu.Unlock()

	return &progressHandle{
		pt:     pt,
		planID: planID,
	}
}
// Success marks the plan as finished by removing every occurrence of its ID
// from the tracker's pending list. The update happens under the tracker's
// mutex and installs a freshly allocated slice rather than editing the old
// one in place.
func (ph *progressHandle) Success() {
	tracker := ph.pt
	tracker.mu.Lock()
	defer tracker.mu.Unlock()

	remaining := []string{}
	for i := range tracker.pending {
		if id := tracker.pending[i]; id != ph.planID {
			remaining = append(remaining, id)
		}
	}
	tracker.pending = remaining
}
// Failure records the plan's ID in the tracker's failed list and removes
// every occurrence of it from the pending list. The error argument is
// ignored. Both updates happen under the tracker's mutex, and the pending
// list is replaced by a freshly allocated slice.
func (ph *progressHandle) Failure(_ error) {
	tracker := ph.pt
	tracker.mu.Lock()
	defer tracker.mu.Unlock()

	tracker.failed = append(tracker.failed, ph.planID)

	stillPending := []string{}
	for _, id := range tracker.pending {
		if id == ph.planID {
			continue
		}
		stillPending = append(stillPending, id)
	}
	tracker.pending = stillPending
}
// IsReady reports whether none of the plans named in dependsOn is still
// pending.
//
// A tracked plan ID matches a dependency when it
//   - equals the dependency,
//   - equals "<group>@<dependency>", where group is the part of this handle's
//     plan ID before the first "@" (empty when the ID has no "@"), or
//   - starts with "<dependency>@".
//
// Dependencies are examined in order. If a dependency matches a failed plan,
// IsReady returns false together with a warning created by common.NewWarning.
// If it matches a pending plan, the result becomes false, but the remaining
// dependencies are still examined so that a later failed dependency is still
// reported as an error.
//
// The tracker's mutex is held for the whole scan because Success and Failure
// modify the pending and failed lists under that same lock.
func (ph *progressHandle) IsReady(dependsOn ...string) (bool, error) {
	var group string
	if before, _, found := strings.Cut(ph.planID, "@"); found {
		group = before
	}

	ph.pt.mu.Lock()
	defer ph.pt.mu.Unlock()

	ready := true
	for _, dependency := range dependsOn {
		// Both candidate forms depend only on the dependency, so build them
		// once instead of once per tracked plan.
		grouped := fmt.Sprintf("%s@%s", group, dependency)
		prefix := fmt.Sprintf("%s@", dependency)
		matches := func(planID string) bool {
			return planID == dependency || planID == grouped || strings.HasPrefix(planID, prefix)
		}

		if slices.ContainsFunc(ph.pt.failed, matches) {
			return false, common.NewWarning("cancelled %s: dependency %s failed", ph.planID, dependency)
		}
		if i := slices.IndexFunc(ph.pt.pending, matches); i >= 0 {
			ready = false
			log.Debug().Msgf("DEFER [%s] for dependency [%s]", ph.planID, ph.pt.pending[i])
		}
	}
	return ready, nil
}