def portforward_socket()

in azext_edge/edge/providers/base.py [0:0]


def portforward_socket(namespace: str, pod_name: str, pod_port: str) -> Iterator[socket.socket]:
    """Yield a socket port-forwarded to ``pod_port`` on a pod, then close it.

    A Kubernetes port-forward is opened to the pod and its underlying socket
    is retrieved. If the first broker resource found in ``namespace`` has
    ``spec.advanced.encryptInternalTraffic`` enabled, the socket is wrapped in
    a TLS (>= 1.2) client socket before being handed out.

    NOTE(review): this generator is presumably wrapped by
    ``contextlib.contextmanager`` (decorator not visible here) -- confirm.

    Args:
        namespace: Namespace of the target pod; also used for broker lookup.
        pod_name: Name of the pod to forward to.
        pod_port: Port on the pod to forward (converted with ``int``/``str``).

    Yields:
        A connected socket with a 10 second timeout. It is shut down and
        closed when the generator resumes, is closed, or receives an
        exception from the consumer.
    """
    from kubernetes.stream import portforward
    from .edge_api import MqResourceKinds, MQ_ACTIVE_API

    api = client.CoreV1Api()
    pf = portforward(
        api.connect_get_namespaced_pod_portforward,
        pod_name,
        namespace,
        ports=str(pod_port),
    )
    target_socket: socket.socket = pf.socket(int(pod_port))._socket

    try:
        # Only the first broker in the namespace is consulted.
        namespaced_brokers: dict = MQ_ACTIVE_API.get_resources(MqResourceKinds.BROKER, namespace=namespace)
        broker_items = (namespaced_brokers or {}).get("items") or []
        broker: Dict[str, Union[str, dict]] = broker_items[0] if broker_items else {}

        internal_tls = False
        broker_spec = broker.get("spec") if broker else None
        if broker_spec:
            # `advanced` may be missing or explicitly null; treat both as empty.
            advanced = broker_spec.get("advanced") or {}
            internal_tls = bool(is_enabled_str(advanced.get("encryptInternalTraffic")))

        if internal_tls:
            import ssl

            context = ssl.create_default_context()
            context.minimum_version = ssl.TLSVersion.TLSv1_2
            # Server certificate and hostname are intentionally not verified.
            context.check_hostname = False
            context.verify_mode = ssl.CERT_NONE
            target_socket = context.wrap_socket(sock=target_socket)

        target_socket.settimeout(10.0)
    except BaseException:
        # Setup failed before the socket reached the caller: release it here,
        # since nobody else will. close() is safe even on a detached socket.
        target_socket.close()
        raise

    try:
        yield target_socket
    finally:
        # Always release the socket, even if the consumer raised; close() must
        # still run when shutdown() fails (e.g. peer already disconnected).
        try:
            target_socket.shutdown(socket.SHUT_RDWR)
        finally:
            target_socket.close()