in internal/pkg/docker/orchestrator/orchestrator.go [188:259]
// Do makes a.task the orchestrator's current task and starts its containers.
//
// For the very first task (task ID 1) the pause container is built and
// started, and proxy connections are set up when a.hosts is non-empty. For
// any later task, the pause container options derived from the previous and
// the new task must have equal EnvVars, Secrets and ContainerPorts; if they
// differ an error is returned before the previous task is stopped. When they
// match, the previous task is stopped before the new one is started.
//
// The task's containers are then started through an upward traversal of the
// dependency graph built from a.task.Containers. A context.Canceled error
// from that traversal is treated as success and nil is returned.
func (a *runTaskAction) Do(o *Orchestrator) error {
	// Bumping the task ID means we no longer care about errors from the old task.
	id := o.curTaskID.Add(1)

	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	// Watcher goroutine: cancel ctx if o.stopped fires before ctx finishes.
	// It returns as soon as either channel is ready and is tracked by o.wg.
	o.wg.Add(1)
	go func() {
		defer o.wg.Done()
		select {
		case <-o.stopped:
			cancel()
		case <-ctx.Done():
		}
	}()

	// Swap in the new task, remembering the one it replaces.
	previous := o.curTask
	o.curTask = a.task

	switch id {
	case 1:
		// First task: the pause container has to be built and started.
		if err := o.buildPauseContainer(ctx); err != nil {
			return fmt.Errorf("build pause container: %w", err)
		}

		pauseOpts := o.pauseRunOptions(a.task)
		o.run(pauseCtrTaskID, pauseOpts, true, cancel)
		if err := o.waitForContainerToStart(ctx, pauseOpts.ContainerName); err != nil {
			return fmt.Errorf("wait for pause container to start: %w", err)
		}

		if len(a.hosts) > 0 {
			if err := o.setupProxyConnections(ctx, pauseOpts.ContainerName, a); err != nil {
				return fmt.Errorf("setup proxy connections: %w", err)
			}
		}
	default:
		// Subsequent task: the pause container is kept as-is, so the options
		// it was started with must still match what the new task needs.
		before := o.pauseRunOptions(previous)
		after := o.pauseRunOptions(a.task)
		unchanged := maps.Equal(before.EnvVars, after.EnvVars) &&
			maps.Equal(before.Secrets, after.Secrets) &&
			maps.Equal(before.ContainerPorts, after.ContainerPorts)
		if !unchanged {
			return errors.New("new task requires recreating pause container")
		}

		if err := o.stopTask(ctx, previous); err != nil {
			return fmt.Errorf("stop existing task: %w", err)
		}
	}

	// Start each container of the new task while walking the dependency graph.
	startContainer := func(ctx context.Context, name string) error {
		ctr := a.task.Containers[name]

		if len(ctr.DependsOn) > 0 {
			if err := o.waitForContainerDependencies(ctx, name, a.task.Containers); err != nil {
				return fmt.Errorf("wait for container %s dependencies: %w", name, err)
			}
		}

		o.run(id, o.containerRunOptions(name, ctr), ctr.IsEssential, cancel)

		err := o.waitForContainerToStart(ctx, o.containerID(name))
		if err == nil {
			return nil
		}
		// An ErrContainerExited from the wait is deliberately not reported
		// as a start failure.
		var exited *dockerengine.ErrContainerExited
		if errors.As(err, &exited) {
			return nil
		}
		return fmt.Errorf("wait for container %s to start: %w", name, err)
	}

	err := buildDependencyGraph(a.task.Containers).UpwardTraversal(ctx, startContainer)
	if err == nil || errors.Is(err, context.Canceled) {
		// Cancellation is not surfaced to the caller as an error.
		return nil
	}
	return fmt.Errorf("upward traversal: %w", err)
}