in azext_edge/edge/providers/base.py [0:0]
def portforward_http(namespace: str, pod_name: str, pod_port: str, **kwargs) -> Iterator[PodRequest]:
    """Yield a ``PodRequest`` while ``socket.create_connection`` is redirected.

    For the duration of the ``yield`` the process-wide
    ``socket.create_connection`` is replaced by a wrapper. Connections whose
    host has exactly three dot-separated labels ending in ``kubernetes`` are
    opened through a Kubernetes port-forward; every other connection is passed
    to the original ``socket.create_connection`` untouched. The original
    function is reinstated when the generator is resumed or closed, including
    when an exception propagates.

    NOTE(review): this is a plain generator in the visible code; it is
    presumably wrapped by ``contextlib.contextmanager`` above this view --
    confirm.

    Args:
        namespace: Forwarded to ``PodRequest`` as ``namespace``.
        pod_name: Forwarded to ``PodRequest`` as ``pod_name``.
        pod_port: Forwarded to ``PodRequest`` as ``pod_port``.
        **kwargs: Accepted but not used by this function.

    Yields:
        The ``PodRequest`` built from the three arguments above.
    """
    from kubernetes.stream import portforward

    core_api = client.CoreV1Api()
    # Keep a handle on the real connector: the wrapper delegates to it and the
    # ``finally`` clause below puts it back.
    original_create_connection = socket.create_connection

    def kubernetes_create_connection(address, *conn_args, **conn_kwargs):
        """Open a port-forward socket for ``*.*.kubernetes`` hosts, else delegate."""
        host = address[0]
        if isinstance(host, bytes):
            host = host.decode()
        labels = host.split(".")

        targets_cluster = len(labels) == 3 and labels[2] == "kubernetes"
        if not targets_cluster:
            return original_create_connection(address, *conn_args, **conn_kwargs)

        # The first two labels are handed positionally to the port-forward API
        # call (presumably pod name, then namespace -- verify against the
        # kubernetes client signature).
        first_label, second_label = labels[0], labels[1]
        forward = portforward(
            core_api.connect_get_namespaced_pod_portforward,
            first_label,
            second_label,
            ports=str(address[1]),
        )
        return forward.socket(address[1])

    try:
        socket.create_connection = kubernetes_create_connection
        yield PodRequest(namespace=namespace, pod_name=pod_name, pod_port=pod_port)
    finally:
        socket.create_connection = original_create_connection