def listening_ports()

in AWSIoTDeviceDefenderAgentSDK/collector.py [0:0]


    def listening_ports(self, metrics):
        """
        Iterate over all inet connections in the LISTEN state and extract port and interface.
        """
        udp_ports = []
        tcp_ports = []
        for conn in ps.net_connections(kind='inet'):
            iface = Collector.__get_interface_name(conn.laddr.ip)
            if conn.status == "LISTEN" and conn.type == socket.SOCK_STREAM:
                if iface:
                    tcp_ports.append({'port': conn.laddr.port, 'interface': iface})
                else:
                    tcp_ports.append({'port': conn.laddr.port})
            if conn.type == socket.SOCK_DGRAM:  # on Linux, udp socket status is always "NONE"
                if iface:
                    udp_ports.append({'port': conn.laddr.port, 'interface': iface})
                else:
                    udp_ports.append({'port': conn.laddr.port})

        metrics.add_listening_ports("UDP", udp_ports)
        metrics.add_listening_ports("TCP", tcp_ports)