func()

in pkg/dev/portforward/pod_forwarder.go [191:282]


// Run sets up port forwarding towards the pod identified by f.podNSN for the port found in f.addr, and returns
// whatever the forwarder's ForwardPorts call returns (or an earlier setup error).
//
// Side effects visible in this method:
//   - f.initChan is closed exactly once: as soon as the forwarder signals readiness, or when Run returns,
//     whichever happens first.
//   - f.viaAddr is set to "127.0.0.1:<local port>" once the forwarder signals readiness.
//   - f.viaErr is set to a "not currently forwarding" error after ForwardPorts has returned.
//
// When f.clientset is non-nil the pod is additionally watched, and the context handed to the forwarder is
// cancelled once the pod is deleted or the watch fails or yields a zero-value event.
func (f *PodForwarder) Run(ctx context.Context) error {
	log.V(2).Info("Running port-forwarder for", "addr", f.addr)
	defer log.V(2).Info("No longer running port-forwarder for", "addr", f.addr)

	// Closing f.initChan twice would panic, and both the readiness goroutine further down and the deferred call
	// right below may attempt it, so every attempt is funnelled through a single sync.Once.
	var initOnce sync.Once
	signalInit := func() {
		initOnce.Do(func() {
			close(f.initChan)
		})
	}
	// make sure initChan ends up closed on return even if the forwarder never became ready
	defer signalInit()

	// The forwarder gets its own cancellable context: it is cancelled when Run returns, and the pod watcher can
	// cancel it as well.
	fwdCtx, stop := context.WithCancel(ctx)
	defer stop()

	if f.clientset != nil {
		log.V(2).Info("Watching pod for changes", "namespace", f.podNSN.Namespace, "pod_name", f.podNSN.Name)

		pods := f.clientset.CoreV1().Pods(f.podNSN.Namespace)
		// restrict the watch to the single pod we forward to
		watchOpts := metav1.ListOptions{
			FieldSelector: fields.OneTermEqualSelector("metadata.name", f.podNSN.Name).String(),
		}
		podWatch, err := pods.Watch(ctx, watchOpts)
		if err != nil {
			return fmt.Errorf("unable to watch pod %s for changes: %w", f.podNSN, err)
		}
		defer podWatch.Stop()

		// stop forwarding as soon as the pod goes away or the watch can no longer be relied upon
		go func() {
			for {
				select {
				case <-fwdCtx.Done():
					return
				case evt := <-podWatch.ResultChan():
					// An empty type is the zero-value event, which is what a receive on a closed result
					// channel yields.
					switch evt.Type {
					case watch.Deleted, watch.Error, "":
						log.V(2).Info(
							"Pod is deleted or watch failed/closed, closing pod forwarder",
							"namespace", f.podNSN.Namespace,
							"pod_name", f.podNSN.Name,
						)
						stop()
						return
					}
				}
			}
		}()
	}

	// only the port part of the address is needed for the forwarding spec
	_, remotePort, err := net.SplitHostPort(f.addr)
	if err != nil {
		return err
	}

	// pick the local side of the forwarding
	ephemeralPort, err := f.ephemeralPortFinder()
	if err != nil {
		return err
	}

	ready := make(chan struct{})
	portSpec := ephemeralPort + ":" + remotePort
	fwd, err := f.portForwarderFactory(fwdCtx, f.podNSN.Namespace, f.podNSN.Name, []string{portSpec}, ready)
	if err != nil {
		return err
	}

	// publish the local address once the forwarder reports readiness, unless the context ends first
	go func() {
		select {
		case <-fwdCtx.Done():
			return
		case <-ready:
		}

		f.viaAddr = "127.0.0.1:" + ephemeralPort

		log.V(2).Info("Ready to redirect connections", "addr", f.addr, "via", f.viaAddr)

		// Run may be returning at this very moment and closing initChan itself, hence the once-guarded close.
		signalInit()
	}()

	forwardErr := fwd.ForwardPorts()
	f.viaErr = errors.New("not currently forwarding")
	return forwardErr
}