in sources/host-ctr/cmd/host-ctr/main.go [219:469]
// runCtr pulls the image referenced by source, creates (or adopts) the
// container whose ID is cType.Prefix()+containerID in the given containerd
// namespace, and supervises its task until the task exits or this process
// receives SIGINT/SIGTERM.
//
// Parameters:
//   - containerdSocket, namespace: handed to newContainerdClient; namespace is
//     also attached to the context used for containerd calls.
//   - containerID: un-prefixed container name; the prefixed form is used as the
//     containerd ID, the un-prefixed form is passed to withDefaultMounts.
//   - source: image reference; references matching ecrRegex are pulled with
//     pullECRImage, everything else with pullImage.
//   - superpowered: selects the withSuperpowered spec options; rejected when
//     cType is bootstrap.
//   - registryConfigPath: forwarded to the image pull helpers.
//   - cType: container type; must satisfy cType.IsValid().
//
// On SIGINT/SIGTERM the task is sent SIGTERM, given a 20 second grace period,
// and then sent SIGKILL. On the way out the task and then the container (with
// its snapshot) are deleted, each under a 10 second timeout; failures of those
// deletions are only logged.
//
// Returns an error when the arguments are invalid, when any pull/create/start/
// kill step fails, when the exit status cannot be read, or when the task exits
// with a non-zero code; nil otherwise.
func runCtr(containerdSocket string, namespace string, containerID string, source string, superpowered bool, registryConfigPath string, cType containerType) error {
	// Check if the containerType provided is valid
	if !cType.IsValid() {
		return errors.New("Invalid container type")
	}

	// Return error if caller tries to setup bootstrap container as superpowered
	if cType == bootstrap && superpowered {
		return errors.New("Bootstrap containers can't be superpowered")
	}

	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()
	ctx = namespaces.WithNamespace(ctx, namespace)

	// Translate SIGINT/SIGTERM into cancellation of ctx. The channel is never
	// closed in this function, so the goroutine keeps running after runCtr
	// returns.
	go func(ctx context.Context, cancel context.CancelFunc) {
		// Set up channel on which to send signal notifications.
		// We must use a buffered channel or risk missing the signal
		// if we're not ready to receive when the signal is sent.
		c := make(chan os.Signal, 1)
		signal.Notify(c, syscall.SIGINT, syscall.SIGTERM)
		for sigrecv := range c {
			log.G(ctx).Info("received signal: ", sigrecv)
			cancel()
		}
	}(ctx, cancel)

	client, err := newContainerdClient(ctx, containerdSocket, namespace)
	if err != nil {
		return err
	}
	defer client.Close()

	// Parse the source ref if it looks like an ECR ref.
	isECRImage := ecrRegex.MatchString(source)
	var img containerd.Image
	if isECRImage {
		img, err = pullECRImage(ctx, source, client, registryConfigPath)
		if err != nil {
			return err
		}
	} else {
		img, err = pullImage(ctx, source, client, registryConfigPath)
		if err != nil {
			log.G(ctx).WithField("ref", source).Error(err)
			return err
		}
	}

	// Keep the un-prefixed name for mount setup; the prefixed form is the ID
	// used for every containerd lookup and creation below.
	prefix := cType.Prefix()
	containerName := containerID
	containerID = prefix + containerID

	// Check if the target container already exists. If it does, take over the helm to manage it.
	container, err := client.LoadContainer(ctx, containerID)
	if err != nil {
		if errdefs.IsNotFound(err) {
			log.G(ctx).WithField("ctr-id", containerID).Info("Container does not exist, proceeding to create it")
		} else {
			log.G(ctx).WithField("ctr-id", containerID).WithError(err).Error("failed to retrieve list of containers")
			return err
		}
	}

	// If the container doesn't already exist, create it
	if container == nil {
		// Set the destination name for the container persistent storage location
		persistentDir := cType.PersistentDir()

		specOpts := []oci.SpecOpts{
			oci.WithImageConfig(img),
			oci.WithHostNamespace(runtimespec.NetworkNamespace),
			oci.WithHostHostsFile,
			oci.WithHostResolvconf,
			// Pass proxy environment variables to this container
			withProxyEnv(),
			// Add a default set of mounts regardless of the container type
			withDefaultMounts(containerName, persistentDir),
			// Mount the container's rootfs with an SELinux label that makes it writable
			withMountLabel("system_u:object_r:secret_t:s0"),
		}

		// Select the set of specOpts based on the container type.
		// superpowered is checked first; bootstrap+superpowered was already
		// rejected above, so the first two cases cannot both apply.
		switch {
		case superpowered:
			specOpts = append(specOpts, withSuperpowered())
		case cType == bootstrap:
			specOpts = append(specOpts, withBootstrap())
		default:
			specOpts = append(specOpts, withDefault())
		}

		ctrOpts := containerd.WithNewSpec(specOpts...)

		// Create the container.
		container, err = client.NewContainer(
			ctx,
			containerID,
			containerd.WithImage(img),
			containerd.WithNewSnapshot(containerID+"-snapshot", img),
			containerd.WithRuntime("io.containerd.runc.v2", &options.Options{
				Root: "/run/host-containerd/runc",
			}),
			ctrOpts,
		)
		if err != nil {
			// Name is a method on containerd.Image; call it so the log field
			// carries the image reference rather than a func value.
			log.G(ctx).WithError(err).WithField("img", img.Name()).Error("failed to create container")
			return err
		}
	}
	defer func() {
		// Clean up the container as program wraps up. A fresh context is used
		// because ctx may already be cancelled by the time this runs.
		cleanupCtx, cleanupCancel := context.WithTimeout(context.Background(), 10*time.Second)
		defer cleanupCancel()
		err := container.Delete(cleanupCtx, containerd.WithSnapshotCleanup)
		if err != nil {
			log.G(cleanupCtx).WithError(err).Error("failed to cleanup container")
		}
	}()

	// Check if the container task already exists. If it does, try to manage it.
	task, err := container.Task(ctx, cio.NewAttach(cio.WithStdio))
	if err != nil {
		if errdefs.IsNotFound(err) {
			log.G(ctx).WithField("container-id", containerID).Info("container task does not exist, proceeding to create it")
		} else {
			log.G(ctx).WithField("container-id", containerID).WithError(err).Error("failed to retrieve container task")
			return err
		}
	}

	// If the container task doesn't already exist, create it
	taskAlreadyRunning := false
	if task == nil {
		// Create the container task
		task, err = container.NewTask(ctx, cio.NewCreator(cio.WithStdio))
		if err != nil {
			log.G(ctx).WithError(err).Error("failed to create container task")
			return err
		}
	} else {
		// Check the container task process status and see if it's already running.
		taskStatus, err := task.Status(ctx)
		if err != nil {
			log.G(ctx).WithError(err).Error("failed to retrieve container task status")
			return err
		}
		log.G(ctx).WithField("task status", taskStatus.Status).Info("found existing container task")

		// If the task isn't running (it's in some weird state like `Paused`), we should replace it with a new task.
		if taskStatus.Status != containerd.Running {
			_, err := task.Delete(ctx, containerd.WithProcessKill)
			if err != nil {
				log.G(ctx).WithError(err).Error("failed to delete existing container task")
				return err
			}
			log.G(ctx).Info("killed existing container task to replace it with a new task")

			// Recreate the container task
			task, err = container.NewTask(ctx, cio.NewCreator(cio.WithStdio))
			if err != nil {
				log.G(ctx).WithError(err).Error("failed to create container task")
				return err
			}
		} else {
			log.G(ctx).Info("container task is still running, proceeding to monitor it")
			taskAlreadyRunning = true
		}
	}
	// Registered after the container cleanup, so on return the task is deleted
	// first and the container second.
	defer func() {
		// Clean up the container's task as program wraps up.
		cleanupCtx, cleanupCancel := context.WithTimeout(context.Background(), 10*time.Second)
		defer cleanupCancel()
		_, err := task.Delete(cleanupCtx)
		if err != nil {
			log.G(cleanupCtx).WithError(err).Error("failed to delete container task")
		}
	}()

	// Call `Wait` to ensure the task's status notifications are received.
	// NOTE(review): this deliberately does not use ctx, presumably so that
	// cancelling ctx on a signal does not disturb the exit-status channel — confirm.
	exitStatusC, err := task.Wait(context.TODO())
	if err != nil {
		log.G(ctx).WithError(err).Error("unexpected error during container task setup")
		return err
	}

	if !taskAlreadyRunning {
		// Execute the target container's task.
		if err := task.Start(ctx); err != nil {
			log.G(ctx).WithError(err).Error("failed to start container task")
			return err
		}
		log.G(ctx).Info("successfully started container task")
	}

	// Block until an OS signal (e.g. SIGTERM, SIGINT) is received or the
	// container task finishes and exits on its own.

	// Container task's exit status.
	var status containerd.ExitStatus
	// Context used when stopping and cleaning up the container task; ctx itself
	// is already cancelled on the signal path, so it cannot be used there.
	ctrCtx, ctrCancel := context.WithCancel(context.Background())
	defer ctrCancel()

	select {
	case <-ctx.Done():
		// SIGTERM the container task and get its exit status
		if err := task.Kill(ctrCtx, syscall.SIGTERM); err != nil {
			log.G(ctrCtx).WithError(err).Error("failed to send SIGTERM to container")
			return err
		}
		// Wait for 20 seconds and check whether the container task exited
		const gracePeriod = 20 * time.Second
		timeout := time.NewTimer(gracePeriod)

		select {
		case status = <-exitStatusC:
			// Container task was able to exit on its own, stop the timer.
			if !timeout.Stop() {
				<-timeout.C
			}
		case <-timeout.C:
			// Container task still hasn't exited, SIGKILL the container task or
			// timeout and bail.
			const sigkillTimeout = 45 * time.Second
			killCtx, killCancel := context.WithTimeout(ctrCtx, sigkillTimeout)
			err := task.Kill(killCtx, syscall.SIGKILL)
			killCancel()
			if err != nil {
				log.G(ctrCtx).WithError(err).Error("failed to SIGKILL container process, timed out")
				return err
			}
			// No timeout applies to this receive: it blocks until the exit
			// status is delivered.
			status = <-exitStatusC
		}
	case status = <-exitStatusC:
		// Container task exited on its own
	}

	code, _, err := status.Result()
	if err != nil {
		log.G(ctrCtx).WithError(err).Error("failed to get container task exit status")
		return err
	}
	log.G(ctrCtx).WithField("code", code).Info("container task exited")

	// Return error if container exits with non-zero status
	if code != 0 {
		return fmt.Errorf("Container %s exited with non-zero status", containerID)
	}

	return nil
}