func()

in internal/pkg/docker/orchestrator/orchestrator.go [188:259]


// Do makes a.task the orchestrator's current task and starts its containers.
//
// For the first task (task ID 1) it builds and starts the pause container,
// waits for it to start, and sets up proxy connections when a.hosts is
// non-empty. For later tasks it verifies that the pause container's
// environment variables, secrets, and container ports are unchanged, then
// stops the previous task. In both cases the task's containers are then
// started by an upward traversal of their dependency graph.
//
// Do returns nil when the traversal fails only because the context was
// canceled; every other failure is returned wrapped with the step that
// produced it.
func (a *runTaskAction) Do(o *Orchestrator) error {
	// Claiming the next ID supersedes the old task; we no longer care about
	// errors it reports.
	id := o.curTaskID.Add(1)

	ctx, stop := context.WithCancel(context.Background())
	defer stop()

	// Cancel ctx if the orchestrator is stopped (o.stopped) before ctx
	// finishes. The goroutine exits once ctx is done, which happens at the
	// latest when Do returns and the deferred stop runs.
	o.wg.Add(1)
	go func() {
		defer o.wg.Done()
		select {
		case <-o.stopped:
			stop()
		case <-ctx.Done():
		}
	}()

	previous := o.curTask
	o.curTask = a.task

	switch id {
	case 1:
		// First task: the pause container has to be built and brought up.
		if err := o.buildPauseContainer(ctx); err != nil {
			return fmt.Errorf("build pause container: %w", err)
		}

		pauseOpts := o.pauseRunOptions(a.task)
		o.run(pauseCtrTaskID, pauseOpts, true, stop)
		if err := o.waitForContainerToStart(ctx, pauseOpts.ContainerName); err != nil {
			return fmt.Errorf("wait for pause container to start: %w", err)
		}

		if len(a.hosts) > 0 {
			if err := o.setupProxyConnections(ctx, pauseOpts.ContainerName, a); err != nil {
				return fmt.Errorf("setup proxy connections: %w", err)
			}
		}
	default:
		// Later tasks reuse the running pause container, so the new task must
		// not need different pause container options than the previous one.
		before := o.pauseRunOptions(previous)
		after := o.pauseRunOptions(a.task)
		unchanged := maps.Equal(before.EnvVars, after.EnvVars) &&
			maps.Equal(before.Secrets, after.Secrets) &&
			maps.Equal(before.ContainerPorts, after.ContainerPorts)
		if !unchanged {
			return errors.New("new task requires recreating pause container")
		}

		if err := o.stopTask(ctx, previous); err != nil {
			return fmt.Errorf("stop existing task: %w", err)
		}
	}

	graph := buildDependencyGraph(a.task.Containers)
	err := graph.UpwardTraversal(ctx, func(travCtx context.Context, name string) error {
		ctr := a.task.Containers[name]

		// Only containers that declare dependencies need to wait on them.
		if len(ctr.DependsOn) > 0 {
			if err := o.waitForContainerDependencies(travCtx, name, a.task.Containers); err != nil {
				return fmt.Errorf("wait for container %s dependencies: %w", name, err)
			}
		}

		o.run(id, o.containerRunOptions(name, ctr), ctr.IsEssential, stop)

		startErr := o.waitForContainerToStart(travCtx, o.containerID(name))
		if startErr == nil {
			return nil
		}
		// A container that has already exited is not treated as a start
		// failure here.
		var exited *dockerengine.ErrContainerExited
		if errors.As(startErr, &exited) {
			return nil
		}
		return fmt.Errorf("wait for container %s to start: %w", name, startErr)
	})
	// Cancellation during the traversal is not reported as an error.
	if err != nil && !errors.Is(err, context.Canceled) {
		return fmt.Errorf("upward traversal: %w", err)
	}
	return nil
}