in azext_edge/edge/providers/base.py [0:0]
def portforward_socket(namespace: str, pod_name: str, pod_port: str) -> Iterator[socket.socket]:
from kubernetes.stream import portforward
from .edge_api import MqResourceKinds, MQ_ACTIVE_API
api = client.CoreV1Api()
pf = portforward(
api.connect_get_namespaced_pod_portforward,
pod_name,
namespace,
ports=str(pod_port),
)
target_socket: socket.socket = pf.socket(int(pod_port))._socket
internal_tls = False
namespaced_brokers: dict = MQ_ACTIVE_API.get_resources(MqResourceKinds.BROKER, namespace=namespace)
broker = None
if namespaced_brokers and namespaced_brokers["items"]:
broker: Dict[str, Union[str, dict]] = namespaced_brokers["items"][0]
if broker and broker["spec"]:
encrypt_internal_traffic = broker["spec"].get("advanced", {}).get("encryptInternalTraffic")
if is_enabled_str(encrypt_internal_traffic):
internal_tls = True
if internal_tls:
import ssl
context = ssl.create_default_context()
context.minimum_version = ssl.TLSVersion.TLSv1_2
context.check_hostname = False
context.verify_mode = ssl.CERT_NONE
target_socket = context.wrap_socket(sock=target_socket)
target_socket.settimeout(10.0)
yield target_socket
target_socket.shutdown(socket.SHUT_RDWR)
target_socket.close()