in pkg/dev/portforward/pod_forwarder.go [191:282]
// Run sets up port-forwarding to the pod and blocks until ForwardPorts returns.
//
// A child context of ctx is handed to the port forwarder factory and is cancelled when Run returns.
// When f.clientset is set, the pod is also watched and that child context is cancelled as soon as
// the pod is deleted or the watch fails or is closed.
//
// f.initChan is closed exactly once: either when the forwarder signals readiness (after f.viaAddr
// has been set) or when Run returns, whichever happens first. f.viaErr is set on every return path,
// including setup failures, before that final close.
//
// The returned error is the first setup error encountered (watch creation, address parsing,
// ephemeral port lookup, forwarder creation) or otherwise whatever ForwardPorts returned.
func (f *PodForwarder) Run(ctx context.Context) error {
	log.V(2).Info("Running port-forwarder for", "addr", f.addr)
	defer log.V(2).Info("No longer running port-forwarder for", "addr", f.addr)

	// used as a safeguard to ensure we only close the init channel once; closing it twice would panic
	initCloser := sync.Once{}

	// On every exit path (including early setup failures) record that we are not forwarding and only
	// then make sure initChan is closed, even if we were never ready.
	// NOTE(review): presumably whoever waits on initChan inspects viaErr afterwards; setting it before
	// the close keeps a setup failure from looking like a successful init with an empty viaAddr.
	defer func() {
		f.viaErr = errors.New("not currently forwarding")
		initCloser.Do(func() {
			close(f.initChan)
		})
	}()

	// derive a new context so we can ensure the port-forwarding is stopped before we return and that we return as
	// soon as the port-forwarding stops, whichever occurs first
	runCtx, runCtxCancel := context.WithCancel(ctx)
	defer runCtxCancel()

	if f.clientset != nil {
		log.V(2).Info("Watching pod for changes", "namespace", f.podNSN.Namespace, "pod_name", f.podNSN.Name)
		w, err := f.clientset.CoreV1().Pods(f.podNSN.Namespace).Watch(ctx, metav1.ListOptions{
			FieldSelector: fields.OneTermEqualSelector("metadata.name", f.podNSN.Name).String(),
		})
		if err != nil {
			return fmt.Errorf("unable to watch pod %s for changes: %w", f.podNSN, err)
		}
		defer w.Stop()

		// cancel the run context when the pod goes away; exits on its own once runCtx is done
		go func() {
			for {
				select {
				case evt := <-w.ResultChan():
					// a receive from a closed result channel yields the zero-value event, whose Type is ""
					if evt.Type == watch.Deleted || evt.Type == watch.Error || evt.Type == "" {
						log.V(2).Info(
							"Pod is deleted or watch failed/closed, closing pod forwarder",
							"namespace", f.podNSN.Namespace,
							"pod_name", f.podNSN.Name,
						)
						runCtxCancel()
						return
					}
				case <-runCtx.Done():
					return
				}
			}
		}()
	}

	// only the port part of the target address is needed for the forwarding spec
	_, port, err := net.SplitHostPort(f.addr)
	if err != nil {
		return err
	}

	// find an available local ephemeral port
	localPort, err := f.ephemeralPortFinder()
	if err != nil {
		return err
	}

	readyChan := make(chan struct{})
	fwd, err := f.portForwarderFactory(
		runCtx,
		f.podNSN.Namespace,
		f.podNSN.Name,
		[]string{localPort + ":" + port},
		readyChan,
	)
	if err != nil {
		return err
	}

	// wait for our context to be done or the port forwarder to become ready
	go func() {
		select {
		case <-runCtx.Done():
		case <-readyChan:
			// publish the local address before closing initChan
			f.viaAddr = "127.0.0.1:" + localPort
			log.V(2).Info("Ready to redirect connections", "addr", f.addr, "via", f.viaAddr)

			// wrap this in a sync.Once because it will panic if it happens more than once, which it may if our
			// outer function returned just as readyChan was closed.
			initCloser.Do(func() {
				close(f.initChan)
			})
		}
	}()

	// viaErr is recorded by the deferred function above once this returns
	return fwd.ForwardPorts()
}