func PortForward()

in pkg/util/kubernetes/portforward.go [38:119]


func PortForward(ctx context.Context, c client.Client, ns, labelSelector string, localPort, remotePort uint, stdOut, stdErr io.Writer) error {
	list, err := c.CoreV1().Pods(ns).List(ctx, metav1.ListOptions{
		LabelSelector: labelSelector,
	})
	if err != nil {
		return err
	}

	var forwardPod *corev1.Pod
	var forwardCtx context.Context
	var forwardCtxCancel context.CancelFunc

	setupPortForward := func(pod *corev1.Pod) error {
		if forwardPod == nil && podReady(pod) {
			forwardPod = pod
			forwardCtx, forwardCtxCancel = context.WithCancel(ctx)
			if _, err := portFowardPod(forwardCtx, c.GetConfig(), ns, forwardPod.Name, localPort, remotePort, stdOut, stdErr); err != nil {
				return err
			}
		}
		return nil
	}

	if len(list.Items) > 0 {
		if err := setupPortForward(&list.Items[0]); err != nil {
			return err
		}
	}

	watcher, err := c.CoreV1().Pods(ns).Watch(ctx, metav1.ListOptions{
		LabelSelector:   labelSelector,
		ResourceVersion: list.ResourceVersion,
	})
	if err != nil {
		return err
	}

	events := watcher.ResultChan()

	for {
		select {
		case <-ctx.Done():
			return nil
		case e, ok := <-events:
			if !ok {
				return nil
			}

			switch e.Type {
			case watch.Added:
				pod, ok := e.Object.(*corev1.Pod)
				if !ok {
					return fmt.Errorf("type assertion failed: %v", e.Object)
				}
				if err := setupPortForward(pod); err != nil {
					return err
				}
			case watch.Modified:
				pod, ok := e.Object.(*corev1.Pod)
				if !ok {
					return fmt.Errorf("type assertion failed: %v", e.Object)
				}
				if err := setupPortForward(pod); err != nil {
					return err
				}
			case watch.Deleted:
				if forwardPod != nil && e.Object != nil {
					deletedPod, ok := e.Object.(*corev1.Pod)
					if !ok {
						return fmt.Errorf("type assertion failed: %v", e.Object)
					}
					if deletedPod.Name == forwardPod.Name {
						forwardCtxCancel()
						forwardPod = nil
						forwardCtx = nil
						forwardCtxCancel = nil
					}
				}
			}
		}
	}
}