in pkg/util/kubernetes/portforward.go [38:119]
// PortForward sets up a port forward from localPort to remotePort on a pod in
// namespace ns that matches labelSelector, then watches the matching pods so
// the forward can be re-established on another pod when the forwarded one is
// deleted.
//
// At most one forward is active at a time. A pod is chosen when podReady
// reports true for it: first among the pods returned by the initial list, and
// afterwards from Added/Modified watch events. The output of the forwarder is
// written to stdOut and stdErr.
//
// The call blocks until ctx is done or the watch channel is closed, in which
// case it returns nil. It returns an error when listing or watching pods
// fails, when the port forward cannot be set up, or when a watch event carries
// an object that is not a *corev1.Pod.
func PortForward(ctx context.Context, c client.Client, ns, labelSelector string, localPort, remotePort uint, stdOut, stdErr io.Writer) error {
	list, err := c.CoreV1().Pods(ns).List(ctx, metav1.ListOptions{
		LabelSelector: labelSelector,
	})
	if err != nil {
		return err
	}

	// State of the currently active forward; both are nil when none is active.
	var forwardPod *corev1.Pod
	var forwardCtxCancel context.CancelFunc

	// setupPortForward starts forwarding to pod unless a forward is already
	// active or the pod is not ready. The pod is only recorded as the active
	// target once the forward has been set up successfully.
	setupPortForward := func(pod *corev1.Pod) error {
		if forwardPod != nil || !podReady(pod) {
			return nil
		}
		// A dedicated child context lets this forward be torn down on its own
		// when the pod is deleted, without cancelling the caller's ctx.
		forwardCtx, cancel := context.WithCancel(ctx)
		if _, err := portFowardPod(forwardCtx, c.GetConfig(), ns, pod.Name, localPort, remotePort, stdOut, stdErr); err != nil {
			// Release the child context instead of leaking it until ctx ends.
			cancel()
			return err
		}
		forwardPod = pod
		forwardCtxCancel = cancel
		return nil
	}

	// Try every listed pod, not just the first one: the first match may not be
	// ready while a later one is.
	for i := range list.Items {
		if err := setupPortForward(&list.Items[i]); err != nil {
			return err
		}
		if forwardPod != nil {
			break
		}
	}

	// Start watching from the list's resource version so that no change
	// between the list and the watch is missed.
	watcher, err := c.CoreV1().Pods(ns).Watch(ctx, metav1.ListOptions{
		LabelSelector:   labelSelector,
		ResourceVersion: list.ResourceVersion,
	})
	if err != nil {
		return err
	}
	// Release the watch on every return path.
	defer watcher.Stop()

	events := watcher.ResultChan()
	for {
		select {
		case <-ctx.Done():
			return nil
		case e, ok := <-events:
			if !ok {
				return nil
			}

			switch e.Type {
			case watch.Added, watch.Modified:
				pod, ok := e.Object.(*corev1.Pod)
				if !ok {
					return fmt.Errorf("type assertion failed: %v", e.Object)
				}
				if err := setupPortForward(pod); err != nil {
					return err
				}
			case watch.Deleted:
				if forwardPod == nil || e.Object == nil {
					continue
				}
				deletedPod, ok := e.Object.(*corev1.Pod)
				if !ok {
					return fmt.Errorf("type assertion failed: %v", e.Object)
				}
				if deletedPod.Name == forwardPod.Name {
					// The forwarded pod is gone: stop its forward and clear the
					// state so the next ready pod can take over.
					forwardCtxCancel()
					forwardPod = nil
					forwardCtxCancel = nil
				}
			}
		}
	}
}