in pkg/dev/portforward/service_forwarder.go [94:185]
func (f *ServiceForwarder) DialContext(ctx context.Context) (net.Conn, error) {
_, servicePortStr, err := net.SplitHostPort(f.addr)
if err != nil {
return nil, err
}
servicePort, err := netutils.ParsePort(servicePortStr, false)
if err != nil {
return nil, err
}
service := corev1.Service{}
if err := f.client.Get(ctx, f.serviceNSN, &service); err != nil {
return nil, err
}
// TODO: support named ports? how it's supposed to work is not quite clear atm, and we don't use it ourselves
// so this is deferred to later
targetPort := intstr.FromInt(0)
for _, port := range service.Spec.Ports {
if port.Port == int32(servicePort) {
// default to using the same port between the service and the target
targetPort = intstr.FromInt(int(port.Port))
// if .TargetPort is non-0, we use that
if port.TargetPort.IntValue() != 0 {
targetPort = port.TargetPort
}
break
}
}
if targetPort.IntValue() == 0 {
return nil, fmt.Errorf("service is not listening on port: %d", servicePort)
}
endpoints := discoveryv1.EndpointSliceList{}
listOps := &client.ListOptions{
LabelSelector: labels.SelectorFromSet(labels.Set{discoveryv1.LabelServiceName: service.Name}),
Namespace: service.Namespace,
}
if err := f.client.List(ctx, &endpoints, listOps); err != nil {
return nil, err
}
var podTargets []*corev1.ObjectReference
for _, endpointSlice := range endpoints.Items {
foundPort := false
for _, port := range endpointSlice.Ports {
if port.Port == nil {
continue
}
foundPort = *port.Port == int32(targetPort.IntValue())
if foundPort {
break
}
}
if !foundPort {
// Port is not found in the EndpointSlice, try the next one.
continue
}
for _, endpoint := range endpointSlice.Endpoints {
if endpoint.Conditions.Ready == nil || !*endpoint.Conditions.Ready {
// Do not forward to a pod that is not ready.
// Note that if spec.publishNotReadyAddresses is set to "true", then `ready` is always true:
// https://kubernetes.io/docs/concepts/services-networking/endpoint-slices/#ready
continue
}
if endpoint.TargetRef.Kind != "Pod" {
continue
}
podTargets = append(podTargets, endpoint.TargetRef)
}
}
if len(podTargets) == 0 {
return nil, errors.New("no pod addresses found in service endpoints")
}
pod := podTargets[rand.Intn(len(podTargets))] //nolint:gosec
// this should match a supported format of parsePodAddr(addr string)
podAddr := fmt.Sprintf("%s.%s.%s:%s", pod.Name, pod.Namespace, syntheticDNSSegment, targetPort.String())
forwarder, err := f.store.GetOrCreateForwarder(f.network, podAddr, f.podForwarderFactory)
if err != nil {
return nil, err
}
return forwarder.DialContext(ctx)
}