func()

in pkg/dev/portforward/service_forwarder.go [94:185]


func (f *ServiceForwarder) DialContext(ctx context.Context) (net.Conn, error) {
	_, servicePortStr, err := net.SplitHostPort(f.addr)
	if err != nil {
		return nil, err
	}
	servicePort, err := netutils.ParsePort(servicePortStr, false)
	if err != nil {
		return nil, err
	}

	service := corev1.Service{}

	if err := f.client.Get(ctx, f.serviceNSN, &service); err != nil {
		return nil, err
	}

	// TODO: support named ports? how it's supposed to work is not quite clear atm, and we don't use it ourselves
	// so this is deferred to later

	targetPort := intstr.FromInt(0)
	for _, port := range service.Spec.Ports {
		if port.Port == int32(servicePort) {
			// default to using the same port between the service and the target
			targetPort = intstr.FromInt(int(port.Port))

			// if .TargetPort is non-0, we use that
			if port.TargetPort.IntValue() != 0 {
				targetPort = port.TargetPort
			}
			break
		}
	}

	if targetPort.IntValue() == 0 {
		return nil, fmt.Errorf("service is not listening on port: %d", servicePort)
	}

	endpoints := discoveryv1.EndpointSliceList{}
	listOps := &client.ListOptions{
		LabelSelector: labels.SelectorFromSet(labels.Set{discoveryv1.LabelServiceName: service.Name}),
		Namespace:     service.Namespace,
	}

	if err := f.client.List(ctx, &endpoints, listOps); err != nil {
		return nil, err
	}

	var podTargets []*corev1.ObjectReference
	for _, endpointSlice := range endpoints.Items {
		foundPort := false
		for _, port := range endpointSlice.Ports {
			if port.Port == nil {
				continue
			}
			foundPort = *port.Port == int32(targetPort.IntValue())
			if foundPort {
				break
			}
		}
		if !foundPort {
			// Port is not found in the EndpointSlice, try the next one.
			continue
		}
		for _, endpoint := range endpointSlice.Endpoints {
			if endpoint.Conditions.Ready == nil || !*endpoint.Conditions.Ready {
				// Do not forward to a pod that is not ready.
				// Note that if spec.publishNotReadyAddresses is set to "true", then `ready` is always true:
				//   https://kubernetes.io/docs/concepts/services-networking/endpoint-slices/#ready
				continue
			}
			if endpoint.TargetRef.Kind != "Pod" {
				continue
			}
			podTargets = append(podTargets, endpoint.TargetRef)
		}
	}

	if len(podTargets) == 0 {
		return nil, errors.New("no pod addresses found in service endpoints")
	}

	pod := podTargets[rand.Intn(len(podTargets))] //nolint:gosec

	// this should match a supported format of parsePodAddr(addr string)
	podAddr := fmt.Sprintf("%s.%s.%s:%s", pod.Name, pod.Namespace, syntheticDNSSegment, targetPort.String())
	forwarder, err := f.store.GetOrCreateForwarder(f.network, podAddr, f.podForwarderFactory)
	if err != nil {
		return nil, err
	}

	return forwarder.DialContext(ctx)
}