pkg/task/task.go (223 lines of code) (raw):

package task

import (
	"context"
	"fmt"
	"os"
	"path/filepath"
	"time"

	"sigs.k8s.io/yaml"

	"github.com/aws/eks-anywhere/pkg/cluster"
	"github.com/aws/eks-anywhere/pkg/filewriter"
	"github.com/aws/eks-anywhere/pkg/logger"
	"github.com/aws/eks-anywhere/pkg/providers"
	"github.com/aws/eks-anywhere/pkg/types"
	"github.com/aws/eks-anywhere/pkg/workflows/interfaces"
)

// Task is a logical unit of work - meant to be implemented by each Task.
type Task interface {
	Run(ctx context.Context, commandContext *CommandContext) Task
	Name() string
	Checkpoint() *CompletedTask
	Restore(ctx context.Context, commandContext *CommandContext, completedTask *CompletedTask) (Task, error)
}

// CommandContext maintains the mutable and shared entities.
type CommandContext struct {
	ClientFactory         interfaces.ClientFactory
	Bootstrapper          interfaces.Bootstrapper
	Provider              providers.Provider
	ClusterManager        interfaces.ClusterManager
	GitOpsManager         interfaces.GitOpsManager
	Validations           interfaces.Validator
	Writer                filewriter.FileWriter
	EksdInstaller         interfaces.EksdInstaller
	EksaInstaller         interfaces.EksaInstaller
	PackageManager        interfaces.PackageManager
	EksdUpgrader          interfaces.EksdUpgrader
	ClusterUpgrader       interfaces.ClusterUpgrader
	ClusterCreator        interfaces.ClusterCreator
	ClusterDeleter        interfaces.ClusterDeleter
	CAPIManager           interfaces.CAPIManager
	ClusterSpec           *cluster.Spec
	CurrentClusterSpec    *cluster.Spec
	UpgradeChangeDiff     *types.ChangeDiff
	BootstrapCluster      *types.Cluster
	ManagementCluster     *types.Cluster
	WorkloadCluster       *types.Cluster
	Profiler              *Profiler
	OriginalError         error
	BackupClusterStateDir string
	ForceCleanup          bool
	ClusterMover          interfaces.ClusterMover
	IamAuth               interfaces.AwsIamAuth
}

// SetError records the first error encountered; later errors are ignored.
func (c *CommandContext) SetError(err error) {
	if c.OriginalError == nil {
		c.OriginalError = err
	}
}

// Profiler collects start times and durations for tasks and their sub-tasks.
type Profiler struct {
	metrics map[string]map[string]time.Duration
	starts  map[string]map[string]time.Time
}

// SetStartTask starts the profiler for a Task.
func (pp *Profiler) SetStartTask(taskName string) {
	pp.SetStart(taskName, taskName)
}

// SetStart records a start time; this can be used to profile sub-tasks.
func (pp *Profiler) SetStart(taskName string, msg string) {
	if _, ok := pp.starts[taskName]; !ok {
		pp.starts[taskName] = map[string]time.Time{}
	}
	pp.starts[taskName][msg] = time.Now()
}

// MarkDoneTask records the duration for a Task; needs to be called after SetStart.
func (pp *Profiler) MarkDoneTask(taskName string) {
	pp.MarkDone(taskName, taskName)
}

// MarkDone records the duration since the matching SetStart; this can be used to profile sub-tasks.
func (pp *Profiler) MarkDone(taskName string, msg string) {
	if _, ok := pp.metrics[taskName]; !ok {
		pp.metrics[taskName] = map[string]time.Duration{}
	}
	if start, ok := pp.starts[taskName][msg]; ok {
		pp.metrics[taskName][msg] = time.Since(start)
	}
}

// Metrics returns the collected task and sub-task durations.
func (pp *Profiler) Metrics() map[string]map[string]time.Duration {
	return pp.metrics
}

// logProfileSummary writes debug logs for task metrics.
func (pp *Profiler) logProfileSummary(taskName string) {
	if durationMap, ok := pp.metrics[taskName]; ok {
		for k, v := range durationMap {
			if k != taskName {
				logger.V(4).Info("Subtask finished", "task_name", taskName, "subtask_name", k, "duration", v)
			}
		}
		if totalTaskDuration, ok := durationMap[taskName]; ok {
			logger.V(4).Info("Task finished", "task_name", taskName, "duration", totalTaskDuration)
			logger.V(4).Info("----------------------------------")
		}
	}
}
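
// The SetStart/MarkDone pair brackets a timed region keyed by task name and a
// sub-task label. A minimal sketch of how a task might profile one of its own
// sub-steps (the task and sub-step names here are hypothetical, not part of
// this package):
//
//	commandContext.Profiler.SetStart("upgrade-cluster", "preflight-validations")
//	// ... run the sub-step ...
//	commandContext.Profiler.MarkDone("upgrade-cluster", "preflight-validations")
//	// The recorded duration is available from Metrics() and is logged by
//	// logProfileSummary once the task finishes.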
// taskRunner manages Task execution.
type taskRunner struct {
	task           Task
	writer         filewriter.FileWriter
	withCheckpoint bool
}

// TaskRunnerOpt configures a taskRunner.
type TaskRunnerOpt func(*taskRunner)

// WithCheckpointFile enables writing and restoring checkpoint files.
func WithCheckpointFile() TaskRunnerOpt {
	return func(t *taskRunner) {
		logger.V(4).Info("Checkpoint feature enabled")
		t.withCheckpoint = true
	}
}

// RunTask runs the task chain starting at the runner's root task, restoring any
// tasks already recorded in a checkpoint file when the checkpoint feature is enabled.
func (tr *taskRunner) RunTask(ctx context.Context, commandContext *CommandContext) error {
	checkpointFileName := fmt.Sprintf("%s-checkpoint.yaml", commandContext.ClusterSpec.Cluster.Name)
	var checkpointInfo CheckpointInfo
	var err error

	commandContext.BackupClusterStateDir = fmt.Sprintf("%s-backup-%s", commandContext.ClusterSpec.Cluster.Name, time.Now().Format("2006-01-02T15_04_05"))
	commandContext.Profiler = &Profiler{
		metrics: make(map[string]map[string]time.Duration),
		starts:  make(map[string]map[string]time.Time),
	}
	task := tr.task
	start := time.Now()
	defer taskRunnerFinalBlock(start)

	checkpointInfo, err = tr.setupCheckpointInfo(commandContext, checkpointFileName)
	if err != nil {
		return err
	}

	for task != nil {
		if completedTask, ok := checkpointInfo.CompletedTasks[task.Name()]; ok {
			logger.V(4).Info("Restoring task", "task_name", task.Name())
			nextTask, err := task.Restore(ctx, commandContext, completedTask)
			if err != nil {
				return fmt.Errorf("restoring checkpoint info: %v", err)
			}
			task = nextTask
			continue
		}
		logger.V(4).Info("Task start", "task_name", task.Name())
		commandContext.Profiler.SetStartTask(task.Name())
		nextTask := task.Run(ctx, commandContext)
		commandContext.Profiler.MarkDoneTask(task.Name())
		commandContext.Profiler.logProfileSummary(task.Name())
		if commandContext.OriginalError == nil {
			checkpointInfo.taskCompleted(task.Name(), task.Checkpoint())
		}
		task = nextTask
	}

	if commandContext.OriginalError != nil {
		if err := tr.saveCheckpoint(checkpointInfo, checkpointFileName); err != nil {
			return err
		}
	}

	return commandContext.OriginalError
}

func taskRunnerFinalBlock(startTime time.Time) {
	logger.V(4).Info("Tasks completed", "duration", time.Since(startTime))
}

// NewTaskRunner creates a taskRunner for the given root task and applies any options.
func NewTaskRunner(task Task, writer filewriter.FileWriter, opts ...TaskRunnerOpt) *taskRunner {
	t := &taskRunner{
		task:   task,
		writer: writer,
	}
	for _, o := range opts {
		o(t)
	}
	return t
}

func (tr *taskRunner) saveCheckpoint(checkpointInfo CheckpointInfo, filename string) error {
	logger.V(4).Info("Saving checkpoint", "file", filename)
	content, err := yaml.Marshal(checkpointInfo)
	if err != nil {
		return fmt.Errorf("saving task runner checkpoint: %v", err)
	}

	if _, err = tr.writer.Write(filename, content); err != nil {
		return fmt.Errorf("saving task runner checkpoint: %v", err)
	}

	return nil
}

func (tr *taskRunner) setupCheckpointInfo(commandContext *CommandContext, checkpointFileName string) (CheckpointInfo, error) {
	checkpointInfo := newCheckpointInfo()
	if tr.withCheckpoint {
		checkpointFilePath := filepath.Join(commandContext.Writer.TempDir(), checkpointFileName)
		if _, err := os.Stat(checkpointFilePath); err == nil {
			checkpointFile, err := readCheckpointFile(checkpointFilePath)
			if err != nil {
				return checkpointInfo, err
			}
			checkpointInfo.CompletedTasks = checkpointFile.CompletedTasks
		}
	}
	return checkpointInfo, nil
}

// TaskCheckpoint holds an arbitrary, task-specific checkpoint payload.
type TaskCheckpoint interface{}

// CheckpointInfo maps task names to their completed checkpoints.
type CheckpointInfo struct {
	CompletedTasks map[string]*CompletedTask `json:"completedTasks"`
}

// CompletedTask wraps the checkpoint recorded by a finished task.
type CompletedTask struct {
	Checkpoint TaskCheckpoint `json:"checkpoint"`
}

func newCheckpointInfo() CheckpointInfo {
	return CheckpointInfo{
		CompletedTasks: make(map[string]*CompletedTask),
	}
}

func (c CheckpointInfo) taskCompleted(name string, completedTask *CompletedTask) {
	c.CompletedTasks[name] = completedTask
}
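
// Putting the pieces together, a caller builds the first task of a chain, wraps
// it in a runner, and executes it. A minimal sketch, assuming a hypothetical
// firstTask implementing Task and an already-populated CommandContext (the names
// below are illustrative, not part of this package):
//
//	runner := NewTaskRunner(firstTask, commandContext.Writer, WithCheckpointFile())
//	if err := runner.RunTask(ctx, commandContext); err != nil {
//		// On failure, RunTask has already saved <cluster-name>-checkpoint.yaml,
//		// so a re-run can restore the tasks that completed successfully.
//		return err
//	}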
func readCheckpointFile(file string) (*CheckpointInfo, error) {
	logger.V(4).Info("Reading checkpoint", "file", file)
	content, err := os.ReadFile(file)
	if err != nil {
		return nil, fmt.Errorf("failed reading checkpoint file: %v", err)
	}

	checkpointInfo := &CheckpointInfo{}
	err = yaml.Unmarshal(content, checkpointInfo)
	if err != nil {
		return nil, fmt.Errorf("failed unmarshalling checkpoint: %v", err)
	}

	return checkpointInfo, nil
}

/*
UnmarshalTaskCheckpoint marshals the received task checkpoint (type interface{}) and then
unmarshals it into the desired type specified in the Restore() method.

When reading from a YAML file, there isn't a direct way in Go to do a type conversion from
interface{} to the desired type. We use interface{} because the TaskCheckpoint type will vary
depending on what's needed for a specific task. The known workaround for this is to marshal &
unmarshal it into the checkpoint type.
*/
func UnmarshalTaskCheckpoint(taskCheckpoint TaskCheckpoint, config TaskCheckpoint) error {
	checkpointYaml, err := yaml.Marshal(taskCheckpoint)
	if err != nil {
		return err
	}
	return yaml.Unmarshal(checkpointYaml, config)
}
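
// A typical use of UnmarshalTaskCheckpoint is inside a task's Restore method,
// converting the generic checkpoint payload back into the task's own type. A
// minimal sketch (the checkpoint struct, task type, and fields below are
// hypothetical, not part of this package):
//
//	type createWorkloadCheckpoint struct {
//		ClusterName string `json:"clusterName"`
//	}
//
//	func (t *createWorkloadTask) Restore(ctx context.Context, commandContext *CommandContext, completedTask *CompletedTask) (Task, error) {
//		checkpoint := &createWorkloadCheckpoint{}
//		if err := UnmarshalTaskCheckpoint(completedTask.Checkpoint, checkpoint); err != nil {
//			return nil, err
//		}
//		// Use checkpoint.ClusterName to rebuild the task's state, then hand off
//		// to whatever task should run next in the chain.
//		return t.nextTask(), nil
//	}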