ptf/saidebugcounters.py (1,857 lines of code) (raw):

# Copyright 2021-present Intel Corporation. # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. ''' Thrift SAI interface DebugCounters tests ''' from sai_thrift.sai_headers import * from sai_base_test import * TEST_GET_STATS_DELAY = 3 def map_drop_reason_to_string(drop_reason_list): ''' Maps DebugCounter drop reason to a string. Args: drop_reason_list (list): Drop reasons list Returns: string: drop reasons list names ''' if not isinstance(drop_reason_list, list): drop_reason_list = [drop_reason_list] names = [] drop_reason_map = {} drop_reason_map[SAI_IN_DROP_REASON_INGRESS_VLAN_FILTER] = "VLAN_FILTER" drop_reason_map[SAI_IN_DROP_REASON_L2_ANY] = "L2_ANY" drop_reason_map[SAI_IN_DROP_REASON_SMAC_MULTICAST] = "SMAC_MC" drop_reason_map[SAI_IN_DROP_REASON_SMAC_EQUALS_DMAC] = "SMAC_EQUALS_DMAC" drop_reason_map[SAI_IN_DROP_REASON_TTL] = "TTL" drop_reason_map[SAI_IN_DROP_REASON_IP_HEADER_ERROR] = "IP_HEADER_ERROR" drop_reason_map[SAI_IN_DROP_REASON_SIP_MC] = "SIP_MC" drop_reason_map[SAI_IN_DROP_REASON_L3_ANY] = "L3_ANY" drop_reason_map[SAI_IN_DROP_REASON_IRIF_DISABLED] = "IRIF_DISABLED" drop_reason_map[SAI_IN_DROP_REASON_ACL_ANY] = "ACL_ANY" drop_reason_map[SAI_IN_DROP_REASON_DIP_LOOPBACK] = "DIP_LOOPBACK" drop_reason_map[SAI_IN_DROP_REASON_SIP_LOOPBACK] = "SIP_LOOPBACK" drop_reason_map[SAI_IN_DROP_REASON_SIP_CLASS_E] = "SIP_CLASS_E" drop_reason_map[SAI_IN_DROP_REASON_DIP_LINK_LOCAL] = "DIP_LINK_LOCAL" drop_reason_map[SAI_IN_DROP_REASON_SIP_LINK_LOCAL] = "SIP_LINK_LOCAL" drop_reason_map[SAI_IN_DROP_REASON_SIP_UNSPECIFIED] = "SIP_UNSPECIFIED" drop_reason_map[SAI_IN_DROP_REASON_UC_DIP_MC_DMAC] = "UC_DIP_MC_DMAC" drop_reason_map[SAI_IN_DROP_REASON_NON_ROUTABLE] = "IGMP_NON_ROUTABLE" drop_reason_map[SAI_IN_DROP_REASON_MPLS_MISS] = "MPLS_MISS" drop_reason_map[SAI_IN_DROP_REASON_SRV6_LOCAL_SID_DROP] = \ "SRV6_LOCAL_SID_DROP" drop_reason_map[SAI_IN_DROP_REASON_LPM4_MISS] = "LPM4_MISS" drop_reason_map[SAI_IN_DROP_REASON_LPM6_MISS] = "LPM6_MISS" drop_reason_map[SAI_IN_DROP_REASON_BLACKHOLE_ROUTE] = "BLACKHOLE_ROUTE" for drop_reason in drop_reason_list: names.append(drop_reason_map[drop_reason]) names = ",".join(names) return names def debug_counter_type_to_index_base(dc_type): ''' Converts the DebugCounter type into its index base Args: dc_type (sai_debug_counter_type_t): debug counter type. Returns: string: drop reasons list names ''' index_base = 0 if dc_type == SAI_DEBUG_COUNTER_TYPE_PORT_IN_DROP_REASONS: index_base = SAI_PORT_STAT_IN_DROP_REASON_RANGE_BASE elif dc_type == SAI_DEBUG_COUNTER_TYPE_PORT_OUT_DROP_REASONS: index_base = SAI_PORT_STAT_OUT_DROP_REASON_RANGE_BASE elif dc_type == SAI_DEBUG_COUNTER_TYPE_SWITCH_IN_DROP_REASONS: index_base = SAI_SWITCH_STAT_IN_DROP_REASON_RANGE_BASE elif dc_type == SAI_DEBUG_COUNTER_TYPE_SWITCH_OUT_DROP_REASONS: index_base = SAI_SWITCH_STAT_OUT_DROP_REASON_RANGE_BASE return index_base def get_port_stats(port, counters): ''' Returns the port stats counters. Args: port (sai_object_id_t): port object id counters (list) : requested counters list Returns: list: counter stats for requested counters list. ''' if not isinstance(counters, list): counters = [counters] time.sleep(TEST_GET_STATS_DELAY) stats = sai_thrift_get_debug_counter_port_stats(port, counters) return stats def get_switch_stats(counters): ''' Returns the switch stats counters. Args: counters (list) : requested counters list Returns: list: stats counter stats for requested counters list. ''' if not isinstance(counters, list): counters = [counters] time.sleep(TEST_GET_STATS_DELAY) stats = sai_thrift_get_debug_counter_switch_stats(counters) return stats @group("draft") class BaseDebugCounterClass(SaiHelperBase): ''' SAI DebugCounters test class. ''' default_mac_1 = '00:11:11:11:11:11' default_mac_2 = '00:22:22:22:22:22' neighbor_mac = '00:11:22:33:44:55' neighbor_mac_v6 = '00:11:22:33:44:66' reserved_mac = '01:80:C2:00:00:01' mc_mac_1 = '01:00:5E:AA:AA:AA' mc_mac_2 = '33:33:5E:AA:AA:AA' neighbor_ip = '10.10.10.1' neighbor_ip_prefix = '10.10.10.0/24' neighbor_ip_2 = '10.10.10.2' loopback_ip = '127.0.0.1' unknown_neighbor_ipv4 = '12.12.12.1' blackhole_ip = '13.13.13.1/32' mc_ip = '224.0.0.1' class_e_ip = '240.0.0.1' unspec_ipv4 = '0.0.0.0' unspec_ipv6 = '::0' bc_ip = '255.255.255.255' link_local_ip = '169.254.0.1' link_local_ip_prefix = '169.254.0.0/16' neighbor_ipv6 = '1234:5678:9abc:def0:4422:1133:5577:99aa' neighbor_ipv6_prefix = '1234:5678:9abc:def0:4422:1133:5577:99aa/64' unknown_neighbor_ipv6 = '4444:5678:9abc:def0:4422:1133:5577:3333' mc_scope_0_ipv6 = 'FF00:0:0:0:0:0:0:1' mc_scope_1_ipv6 = 'FF01:0:0:0:0:0:0:1' addr_family = SAI_IP_ADDR_FAMILY_IPV4 ip_addr_subnet = '10.10.10.0' ip_mask = '255.255.255.0' addr_family_v6 = SAI_IP_ADDR_FAMILY_IPV6 ip_addr_subnet_v6 = '1234:5678:9abc:def0:4422:1133:5577:0' ip_mask_v6 = 'ffff:ffff:ffff:ffff:ffff:ffff:ffff:0' v4_enabled = 1 v6_enabled = 1 igmp_type_query = 0x11 igmp_type_leave = 0x17 igmp_type_v1_report = 0x12 igmp_type_v2_report = 0x16 igmp_type_v3_report = 0x22 dc_oid = None second_dc_oid = None vr_ids = [] rif_ids = [] neighbors = [] routes = [] fdbs = [] def setUp(self): super(BaseDebugCounterClass, self).setUp() self.cleanup() self.test_dev_ports = [ self.dev_port0, self.dev_port1, self.dev_port2, self.dev_port3, self.dev_port4, self.dev_port5, self.dev_port6, self.dev_port7] self.test_ports = [ self.port0, self.port1, self.port2, self.port3, self.port4, self.port5, self.port6, self.port7] self.ingress_port = self.dev_port0 self.egress_port = self.dev_port1 self.ihl_pkt = simple_tcp_packet( eth_dst=ROUTER_MAC, eth_src=self.default_mac_1, ip_ihl=1) self.src_ip_mc_pkt = simple_tcp_packet( eth_dst=ROUTER_MAC, eth_src=self.default_mac_1, ip_src=self.mc_ip) self.smac_equals_dmac_pkt = simple_tcp_packet( eth_dst=self.default_mac_1, eth_src=self.default_mac_1) self.ttl_zero_pkt = simple_tcp_packet( eth_dst=ROUTER_MAC, eth_src=self.default_mac_1, ip_dst=self.neighbor_ip, ip_ttl=0) self.mc_smac_pkt = simple_tcp_packet( eth_dst=self.default_mac_1, eth_src=self.mc_mac_1) self.zero_smac_pkt = simple_tcp_packet( eth_dst=self.default_mac_1, eth_src='00:00:00:00:00:00') self.reserved_dmac_pkt = simple_tcp_packet( eth_dst='01:80:C2:00:00:00', eth_src=self.default_mac_1,) self.vlan_discard_pkt = simple_tcp_packet( eth_dst=self.default_mac_2, eth_src=self.default_mac_1, vlan_vid=42, dl_vlan_enable=True) self.src_ip_class_e_pkt = simple_tcp_packet( eth_dst=ROUTER_MAC, eth_src=self.default_mac_1, ip_src=self.class_e_ip) self.ip_dst_loopback_pkt = simple_tcp_packet( eth_dst=ROUTER_MAC, eth_src=self.default_mac_1, ip_dst=self.loopback_ip) self.ip_src_loopback_pkt = simple_tcp_packet( eth_dst=ROUTER_MAC, eth_src=self.default_mac_1, ip_src=self.loopback_ip) self.dst_ip_link_local_pkt = simple_tcp_packet( eth_dst=ROUTER_MAC, eth_src=self.default_mac_1, ip_dst=self.link_local_ip) self.src_ip_link_local_pkt = simple_tcp_packet( eth_dst=ROUTER_MAC, eth_src=self.default_mac_1, ip_dst=self.neighbor_ip, ip_src=self.link_local_ip) self.dst_ipv4_unspec_pkt = simple_tcp_packet( eth_dst=ROUTER_MAC, eth_src=self.default_mac_1, ip_dst=self.unspec_ipv4) self.dst_ipv6_unspec_pkt = simple_tcpv6_packet( eth_dst=ROUTER_MAC, eth_src=self.default_mac_1, ipv6_dst=self.unspec_ipv6) self.src_ipv4_unspec_pkt = simple_tcp_packet( eth_dst=ROUTER_MAC, eth_src=self.default_mac_1, ip_dst=self.neighbor_ip, ip_src=self.unspec_ipv4) self.uc_dipv4_mc_dmac_pkt = simple_tcp_packet( eth_dst=self.mc_mac_1, eth_src=self.default_mac_1) self.uc_dipv6_mc_dmac_pkt = simple_tcpv6_packet( eth_dst=self.mc_mac_2, eth_src=self.default_mac_1) self.lpm4_miss_pkt = simple_tcp_packet( eth_dst=ROUTER_MAC, eth_src=self.default_mac_1, ip_dst=self.unknown_neighbor_ipv4) self.lpm6_miss_pkt = simple_tcpv6_packet( eth_dst=ROUTER_MAC, eth_src=self.default_mac_1, ipv6_dst=self.unknown_neighbor_ipv6) self.blackhole_route_pkt = simple_tcp_packet( eth_dst=ROUTER_MAC, eth_src=self.default_mac_1, ip_dst=self.blackhole_ip) self.map_drop_reasons = [] self.map_drop_reasons.append( [SAI_IN_DROP_REASON_INGRESS_VLAN_FILTER, [self.vlan_discard_pkt]]) self.map_drop_reasons.append( [SAI_IN_DROP_REASON_IP_HEADER_ERROR, [self.ihl_pkt]]) self.map_drop_reasons.append( [SAI_IN_DROP_REASON_SIP_MC, [self.src_ip_mc_pkt]]) self.map_drop_reasons.append( [SAI_IN_DROP_REASON_SMAC_MULTICAST, [self.mc_smac_pkt]]) self.map_drop_reasons.append( [SAI_IN_DROP_REASON_TTL, [self.ttl_zero_pkt]]) self.map_drop_reasons.append( [SAI_IN_DROP_REASON_LPM4_MISS, [self.lpm4_miss_pkt]]) self.map_drop_reasons.append( [SAI_IN_DROP_REASON_LPM6_MISS, [self.lpm6_miss_pkt]]) self.map_drop_reasons.append( [SAI_IN_DROP_REASON_BLACKHOLE_ROUTE, [self.blackhole_route_pkt]]) self.map_drop_reasons.append( [SAI_IN_DROP_REASON_L3_ANY, [self.ttl_zero_pkt, self.ihl_pkt, self.src_ip_mc_pkt, self.ip_dst_loopback_pkt, self.ip_src_loopback_pkt, self.src_ip_class_e_pkt, self.lpm4_miss_pkt, self.lpm6_miss_pkt, self.blackhole_route_pkt, self.dst_ipv4_unspec_pkt, self.dst_ipv6_unspec_pkt]]) self.map_drop_reasons.append( [SAI_IN_DROP_REASON_SIP_CLASS_E, [self.src_ip_class_e_pkt]]) self.map_drop_reasons.append( [SAI_IN_DROP_REASON_SIP_UNSPECIFIED, [self.src_ipv4_unspec_pkt]]) if (self.isInDropReasonSupported( [SAI_IN_DROP_REASON_SMAC_EQUALS_DMAC])): self.map_drop_reasons.append( [SAI_IN_DROP_REASON_SMAC_EQUALS_DMAC, [self.smac_equals_dmac_pkt]]) self.map_drop_reasons.append( [SAI_IN_DROP_REASON_L2_ANY, [self.mc_smac_pkt, self.zero_smac_pkt, self.vlan_discard_pkt]]) else: self.map_drop_reasons.append( [SAI_IN_DROP_REASON_L2_ANY, [self.mc_smac_pkt, self.vlan_discard_pkt]]) def setDebugCounterDropReasons(self, dc_oid, in_drop_reasons): ''' Sets the drop reason list for a given DebugCounter oid. Args: dc_oid (sai_object_id_t): DebugCounter object id in_drop_reasons (list): drop reason list ''' if not isinstance(in_drop_reasons, list): in_drop_reasons = [in_drop_reasons] in_drop_reason_list = sai_thrift_u32_list_t( count=len(in_drop_reasons), uint32list=in_drop_reasons) sai_thrift_set_debug_counter_attribute( self.client, dc_oid, in_drop_reason_list=in_drop_reason_list) def createDebugCounter(self, dc_type, drop_reasons): ''' Creates DebugCounter for a given dc_type with drop_reason list Args: dc_type (sai_debug_counter_type_t): debug counter type. drop_reasons (list): drop reason list Returns: dc_oid(sai_object_id_t): debug counter object id ''' if not isinstance(drop_reasons, list): drop_reasons = [drop_reasons] dc_oid = sai_thrift_create_debug_counter( self.client, type=dc_type, bind_method=None, in_drop_reason_list=sai_thrift_s32_list_t( count=len(drop_reasons), int32list=drop_reasons), out_drop_reason_list=None) if dc_oid: self.assertTrue(self.verifyDebugCounterDropList( dc_oid, drop_reasons), "Failed to verify debug counter") return dc_oid def createRouter(self): ''' Creates a router ''' self.rif_ids.append(sai_thrift_create_router_interface( self.client, virtual_router_id=self.default_vrf, type=SAI_ROUTER_INTERFACE_TYPE_PORT, port_id=self.port0, admin_v4_state=self.v4_enabled, admin_v6_state=self.v6_enabled)) self.rif_ids.append(sai_thrift_create_router_interface( self.client, virtual_router_id=self.default_vrf, type=SAI_ROUTER_INTERFACE_TYPE_PORT, port_id=self.port1, admin_v4_state=self.v4_enabled, admin_v6_state=self.v6_enabled)) def setRifAttribute(self, rif_id, attr_id, attr_value): ''' Sets rif attribure Args: rif_id (sai_object_id_t): rif object id attr_id (sai_attribute_t): attribute id attr_value (sai_attribute_value_t): attribute value ''' rif_attribute_value = None if attr_id == SAI_ROUTER_INTERFACE_ATTR_MTU: rif_attribute_value = sai_thrift_attribute_value_t(u32=attr_value) elif attr_id == SAI_ROUTER_INTERFACE_ATTR_ADMIN_V4_STATE: rif_attribute_value = sai_thrift_attribute_value_t( booldata=attr_value) elif attr_id == SAI_ROUTER_INTERFACE_ATTR_LOOPBACK_PACKET_ACTION: rif_attribute_value = sai_thrift_attribute_value_t(s32=attr_value) else: return rif_attribute = sai_thrift_attribute_t(id=attr_id, value=rif_attribute_value) self.client.sai_thrift_set_router_interface_attribute( rif_id, rif_attribute) def createRoute(self, packet_action=None, nbr_ip_pfx=neighbor_ip_prefix): ''' Creates a single route Args: packet_action (enum): route packer action nbr_ip_pfx (str): neighbor IP prefix ''' route_entry = sai_thrift_route_entry_t( vr_id=self.default_vrf, destination=sai_ipprefix(nbr_ip_pfx)) status = sai_thrift_create_route_entry( self.client, route_entry, packet_action=packet_action) self.assertEqual(status, SAI_STATUS_SUCCESS, "Failed to create route") self.routes.append(route_entry) def createRouteV6(self): ''' Creates a single IPv6 route. ''' route_entry = sai_thrift_route_entry_t( vr_id=self.default_vrf, destination=sai_ipprefix(self.neighbor_ipv6_prefix)) status = sai_thrift_create_route_entry(self.client, route_entry) self.assertEqual(status, SAI_STATUS_SUCCESS, "Failed to create route") self.routes.append(route_entry) def createNeighbor(self, nbr_ip=neighbor_ip): ''' Creates a neighbor entry. Args: nbr_ip (str): neighbor IP address ''' neighbor_entry1 = sai_thrift_neighbor_entry_t( rif_id=self.rif_ids[-1], ip_address=sai_ipaddress(nbr_ip)) self.neighbors.append(neighbor_entry1) sai_thrift_create_neighbor_entry( self.client, neighbor_entry1, dst_mac_address=self.neighbor_mac) def createNeighborV6(self): ''' Creates an IPv6 neighbor entry. ''' neighbor_entry1 = sai_thrift_neighbor_entry_t( rif_id=self.rif_ids[-1], ip_address=sai_ipaddress(self.neighbor_ipv6)) self.neighbors.append(neighbor_entry1) sai_thrift_create_neighbor_entry( self.client, neighbor_entry1, dst_mac_address=self.neighbor_mac_v6) def cleanup(self): ''' Cleanups the class parameters. ''' self.ingress_port = 0 self.egress_port = 1 del self.fdbs[:] del self.vr_ids[:] del self.rif_ids[:] del self.neighbors[:] del self.routes[:] self.dc_oid = None self.second_dc_oid = None def isInDropReasonSupported(self, drop_reason_list): ''' Verifies if drop reasons from the list are supported. Args: drop_reason_list (list): list of in drop reasons Returns: boolean: True if supported ''' # get the supported IN drop reason capabilities caps_list = sai_thrift_query_attribute_enum_values_capability( self.client, SAI_OBJECT_TYPE_DEBUG_COUNTER, SAI_DEBUG_COUNTER_ATTR_IN_DROP_REASON_LIST) for drop_reason in drop_reason_list: found = False for in_drop_reason in caps_list: if drop_reason == in_drop_reason: found = True if found is False: return False return True def tearDown(self): if self.dc_oid is not None: sai_thrift_remove_debug_counter(self.client, self.dc_oid) if self.second_dc_oid is not None: sai_thrift_remove_debug_counter(self.client, self.second_dc_oid) for route_data in self.routes: sai_thrift_remove_route_entry(self.client, route_data) for neighbor_data in self.neighbors: sai_thrift_remove_neighbor_entry(self.client, neighbor_data) for rif in self.rif_ids: sai_thrift_remove_router_interface(self.client, rif) for vr_id in self.vr_ids: self.client.sai_thrift_remove_virtual_router(vr_id) self.cleanup() super(BaseDebugCounterClass, self).tearDown() def testPortDebugCounter(self, port, dc_oid, in_drop_reasons, pkts, drop_expected=True): ''' Executes port debug counter test. Args: port (sai_object_id_t): port object id dc_oid (sai_object_id_t): debugc counter object id in_drop_reasons (list): in drop reasons list pkts (list): list of test packets drop_expected (bool): if test drops expected ''' if not isinstance(pkts, list): pkts = [pkts] if not isinstance(dc_oid, list): dc_oid = [dc_oid] print("Test port debug counter for drop_reasons=%s" % (map_drop_reason_to_string(in_drop_reasons))) dc_counter_before = self.getDebugDounterPortStatsByOid(port, dc_oid) stats = get_port_stats(port, SAI_PORT_STAT_IF_IN_DISCARDS) port_drop_counter_before = stats[SAI_PORT_STAT_IF_IN_DISCARDS] packets_requested = len(pkts) expected_drops = 0 print("Request %d packet(s)" % (len(pkts))) for pkt in pkts: send_packet(self, self.ingress_port, pkt) if drop_expected: verify_no_packet(self, pkt, self.egress_port) if drop_expected: expected_drops = packets_requested dc_counter_after = self.getDebugDounterPortStatsByOid(port, dc_oid) stats = get_port_stats(port, SAI_PORT_STAT_IF_IN_DISCARDS) port_drop_counter_after = stats[SAI_PORT_STAT_IF_IN_DISCARDS] print("Debug Counter before test=%d, DC counter after test=%d " "packets requested=%d expected_drops =%d" % (dc_counter_before, dc_counter_after, packets_requested, expected_drops)) print("Port drop counter before=%d, Port drop counter after=%d, " "packets requested=%d" % (port_drop_counter_before, port_drop_counter_after, packets_requested)) self.assertEqual(dc_counter_after, dc_counter_before + expected_drops) if drop_expected: self.assertEqual(port_drop_counter_after, port_drop_counter_before + expected_drops) print("\tok") def testSwitchDebugCounter(self, dc_oid, in_drop_reasons, pkts, drop_expected=True): ''' Executes switch debug counter test. Args: dc_oid (sai_object_id_t): debugc counter object id in_drop_reasons (list): in drop reasons list pkts (list): list of test packets drop_expected (bool): if test drops expected ''' if not isinstance(pkts, list): pkts = [pkts] if not isinstance(dc_oid, list): dc_oid = [dc_oid] print("Test switch debug counter for drop_reasons=%s" % (map_drop_reason_to_string(in_drop_reasons))) dc_counter_before = self.getDebugCounterSwitchStatsByOid(dc_oid) print("dc counter_before=%d" % dc_counter_before) stats = get_switch_stats(SAI_PORT_STAT_IF_IN_DISCARDS) switch_counter_before = stats[SAI_PORT_STAT_IF_IN_DISCARDS] packets_requested = 0 expected_drops = 0 try: for test_port in self.test_dev_ports: for pkt in pkts: packets_requested += 1 send_packet(self, test_port, pkt) if drop_expected: verify_no_packet(self, pkt, self.egress_port) if drop_expected: expected_drops = packets_requested finally: dc_counter_after = self.getDebugCounterSwitchStatsByOid(dc_oid) stats = get_switch_stats(SAI_PORT_STAT_IF_IN_DISCARDS) switch_counter_after = stats[SAI_PORT_STAT_IF_IN_DISCARDS] print("Debug Counter before test=%d, DC counter after test=%d " "packets requested=%d, drop_expected =%d" % (dc_counter_before, dc_counter_after, packets_requested, drop_expected)) print("Switch drop counter before=%d, " "Switch drop counter after=%d, packets requested=%d" % (switch_counter_before, switch_counter_after, packets_requested)) self.assertEqual(dc_counter_after, dc_counter_before + expected_drops) if drop_expected: self.assertEqual(switch_counter_after, switch_counter_before + expected_drops) def verifyPortDebugCounterDropPackets( self, port, dc_oid, drop_reason_list): ''' Verifies the port debug counters packets drops for a given drop reason list. Args: port (sai_object_id_t): port object id dc_oid (sai_object_id_t): debugc counter object id drop_reason_list (list): in drop reasons list Returns: boolean: if succesfully verified ''' dc_pkt_cnt = 0 found_pkt = False print("Verify drop reasons = %s" % (map_drop_reason_to_string(drop_reason_list))) counter = self.getDebugDounterPortStatsByOid( port, dc_oid) for dc_drop_reason in drop_reason_list: pkts = [] for map_drop_reason, list_of_packets in self.map_drop_reasons: if map_drop_reason == dc_drop_reason: # found corresponding test packet(s) found_pkt = True pkts = list_of_packets break if found_pkt: dc_pkt_cnt += len(pkts) print("Sending %d packets for %s" % (len(pkts), map_drop_reason_to_string(dc_drop_reason))) for pkt in pkts: send_packet(self, self.ingress_port, pkt) verify_no_packet(self, pkt, self.egress_port) else: # skip check as there is no packet sent continue counter_after = self.getDebugDounterPortStatsByOid( port, dc_oid) print("counter=%d, counter_after=%d, dc_pkt_cnt=%d" % (counter, counter_after, dc_pkt_cnt)) if (counter + dc_pkt_cnt) != counter_after: return False return True def verifySwitchDebugCounterDropPackets(self, dc_oid, drop_reason_list): ''' Verifies the switch debug counters packets drops for a given drop reason list. Args: dc_oid (sai_object_id_t): debugc counter object id drop_reason_list (list): in drop reasons list Returns: boolean: if succesfully verified ''' dc_pkt_cnt = 0 found_pkt = False print("Verify drop reasons = %s" % (map_drop_reason_to_string(drop_reason_list))) counter = self.getDebugCounterSwitchStatsByOid(dc_oid) for dc_drop_reason in drop_reason_list: pkts = [] for map_drop_reason, list_of_packets in self.map_drop_reasons: if map_drop_reason == dc_drop_reason: # Found corresponding test packet(s) found_pkt = True pkts = list_of_packets break if found_pkt: print("Sending %d packets for %s" % (len(pkts), map_drop_reason_to_string(dc_drop_reason))) for port in self.test_dev_ports: dc_pkt_cnt += len(pkts) for pkt in pkts: send_packet(self, port, pkt) verify_no_packet(self, pkt, self.egress_port) else: # Skip check as there is no packet sent continue counter_after = self.getDebugCounterSwitchStatsByOid(dc_oid) print("counter=%d, counter_after=%d, dc_pkt_cnt=%d" % (counter, counter_after, dc_pkt_cnt)) if (counter + dc_pkt_cnt) != counter_after: return False return True def verifyDebugCounterDropList(self, dc_oid, drop_reasons): ''' Verifies the debug counters drop list for given debug counter. Args: dc_oid (sai_object_id_t): debugc counter object id drop_reasons (list): in drop reasons list Returns: boolean: if successfully verified ''' attr = sai_thrift_get_debug_counter_attribute( self.client, dc_oid, in_drop_reason_list=sai_thrift_u32_list_t(count=10, uint32list=[])) if len(drop_reasons) != attr['in_drop_reason_list'].count: print( "SAI_DEBUG_COUNTER_ATTR_IN_DROP_REASON_LIST incorrect count=%d" % attr['in_drop_reason_list'].count) return False # Verify if IN list is correct found = False for drop_reason in drop_reasons: found = False for i in range(0, attr['in_drop_reason_list'].count): if attr['in_drop_reason_list'].int32list[i] == drop_reason: found = True if found is not True: return False return True def getDebugCounterSwitchStatsByOid(self, dc_oid): ''' Returns the switch stats for given DebugCounter. Args: dc_oid (sai_object_id_t): debugc counter object id Returns: uint: debug counter stats value. ''' if not isinstance(dc_oid, list): dc_oid = [dc_oid] counters = [] for debug_counter in dc_oid: attr = sai_thrift_get_debug_counter_attribute( self.client, debug_counter, index=True, type=True) dc_index = attr['index'] + \ debug_counter_type_to_index_base(attr['type']) counters.append(dc_index) stats = sai_thrift_get_debug_counter_switch_stats(counters) return stats[dc_index] def getDebugDounterPortStatsByOid(self, port, dc_oid): ''' Returns the port stats for given DebugCounter. Args: port (sai_object_id_t): port object id dc_oid (sai_object_id_t): debugc counter object id Returns: uint: debug counter stats value. ''' if not isinstance(dc_oid, list): dc_oid = [dc_oid] counters = [] for debug_counter in dc_oid: attr = sai_thrift_get_debug_counter_attribute( self.client, debug_counter, index=True, type=True) dc_index = attr['index'] + \ debug_counter_type_to_index_base(attr['type']) counters.append(dc_index) stats = sai_thrift_get_debug_counter_port_stats(port, counters) # Returns the stats for the first DebugCounter in the list return stats[dc_index] @group("draft") class PortDebugCounterRemoveDropReason(BaseDebugCounterClass): ''' Port debug counter - remove drop reasons test. ''' def runTest(self): port_dc_oid = 0 try: drop_reason_cap = [ SAI_IN_DROP_REASON_INGRESS_VLAN_FILTER, SAI_IN_DROP_REASON_L2_ANY, SAI_IN_DROP_REASON_L3_ANY, SAI_IN_DROP_REASON_SMAC_MULTICAST, SAI_IN_DROP_REASON_TTL, SAI_IN_DROP_REASON_IP_HEADER_ERROR, SAI_IN_DROP_REASON_SIP_MC] if (self.isInDropReasonSupported( [SAI_IN_DROP_REASON_SMAC_EQUALS_DMAC])): drop_reason_cap.append(SAI_IN_DROP_REASON_SMAC_EQUALS_DMAC) drop_reason_list = [SAI_IN_DROP_REASON_INGRESS_VLAN_FILTER] port_dc_oid = self.createDebugCounter( SAI_DEBUG_COUNTER_TYPE_PORT_IN_DROP_REASONS, drop_reason_list) self.assertTrue(port_dc_oid != 0, "Failed to Create DebugCounter") self.setDebugCounterDropReasons(port_dc_oid, []) self.assertTrue(self.verifyDebugCounterDropList( port_dc_oid, [])) drop_reason_list = drop_reason_cap self.setDebugCounterDropReasons(port_dc_oid, drop_reason_list) self.assertTrue(self.verifyDebugCounterDropList( port_dc_oid, drop_reason_list)) for i in range(0, len(drop_reason_cap)): if i == 0: print("Verify initial drop_reason_list with traffic: %s" % (map_drop_reason_to_string(drop_reason_list))) self.assertTrue(self.verifyPortDebugCounterDropPackets( self.port0, port_dc_oid, drop_reason_list)) # Remove drop reason drop_reason_list.pop(0) print("Setting DebugCounter drop_reason_list to: %s" % (map_drop_reason_to_string(drop_reason_list))) self.setDebugCounterDropReasons( port_dc_oid, drop_reason_list) print("Verify updated drop_reason_list") self.assertTrue(self.verifyDebugCounterDropList( port_dc_oid, drop_reason_list)) print("\tok") print("Verify updated drop_reason_list with traffic") # Verify packets after removing the drop_reason self.assertTrue(self.verifyPortDebugCounterDropPackets( self.port0, port_dc_oid, drop_reason_list)) print("\tok") finally: if port_dc_oid != 0: sai_thrift_remove_debug_counter(self.client, port_dc_oid) @group("draft") class SwitchDebugCounterRemoveDropReason(BaseDebugCounterClass): ''' Switch Debug Counter Remove drop reasons test. ''' def runTest(self): dc_oid = 0 try: drop_reason_cap = [ SAI_IN_DROP_REASON_INGRESS_VLAN_FILTER, SAI_IN_DROP_REASON_L2_ANY, SAI_IN_DROP_REASON_L3_ANY, SAI_IN_DROP_REASON_SMAC_MULTICAST, SAI_IN_DROP_REASON_TTL, SAI_IN_DROP_REASON_IP_HEADER_ERROR, SAI_IN_DROP_REASON_SIP_MC] if (self.isInDropReasonSupported( [SAI_IN_DROP_REASON_SMAC_EQUALS_DMAC])): drop_reason_cap.append(SAI_IN_DROP_REASON_SMAC_EQUALS_DMAC) drop_reason_list = [SAI_IN_DROP_REASON_INGRESS_VLAN_FILTER] dc_oid = self.createDebugCounter( SAI_DEBUG_COUNTER_TYPE_PORT_IN_DROP_REASONS, drop_reason_list) self.assertTrue(dc_oid != 0, "Failed to Create DebugCounter") self.setDebugCounterDropReasons(dc_oid, []) self.assertTrue(self.verifyDebugCounterDropList( dc_oid, [])) drop_reason_list = drop_reason_cap self.setDebugCounterDropReasons(dc_oid, drop_reason_list) self.assertTrue(self.verifyDebugCounterDropList( dc_oid, drop_reason_list)) for i in range(0, len(drop_reason_cap)): if i == 0: print( "Verify initial drop_reason_list with traffic: %s" % (map_drop_reason_to_string(drop_reason_list))) self.assertTrue(self.verifySwitchDebugCounterDropPackets( dc_oid, drop_reason_list)) # Remove drop reason drop_reason_list.pop(0) print("Setting DebugCounter drop_reason_list to: %s" % (map_drop_reason_to_string(drop_reason_list))) self.setDebugCounterDropReasons(dc_oid, drop_reason_list) print("Verify updated drop_reason_list") self.assertTrue(self.verifyDebugCounterDropList( dc_oid, drop_reason_list)) print("\tok") print("Verify updated drop_reason_list with traffic") # Verify packets after removing the drop_reason self.assertTrue(self.verifySwitchDebugCounterDropPackets( dc_oid, drop_reason_list)) print("\tok") finally: if dc_oid != 0: sai_thrift_remove_debug_counter(self.client, dc_oid) @group("draft") class PortDebugCounterAddDropReason(BaseDebugCounterClass): ''' Port Debug Counter add drop reasons test. ''' def runTest(self): port_dc_oid = 0 try: port_dc_oid = self.createDebugCounter( SAI_DEBUG_COUNTER_TYPE_PORT_IN_DROP_REASONS, [SAI_IN_DROP_REASON_INGRESS_VLAN_FILTER]) self.assertTrue(port_dc_oid != 0, "Failed to Create DebugCounter") drop_reason_cap = [ SAI_IN_DROP_REASON_INGRESS_VLAN_FILTER, SAI_IN_DROP_REASON_L2_ANY, SAI_IN_DROP_REASON_L3_ANY, SAI_IN_DROP_REASON_SMAC_MULTICAST, SAI_IN_DROP_REASON_TTL, SAI_IN_DROP_REASON_IP_HEADER_ERROR, SAI_IN_DROP_REASON_SIP_MC] if (self.isInDropReasonSupported( [SAI_IN_DROP_REASON_SMAC_EQUALS_DMAC])): drop_reason_cap.append(SAI_IN_DROP_REASON_SMAC_EQUALS_DMAC) drop_reason_list = [] for drop_reason in drop_reason_cap: drop_reason_list.append(drop_reason) print("Setting DebugCounter drop_reason_list to: %s" % (map_drop_reason_to_string(drop_reason_list))) self.setDebugCounterDropReasons( port_dc_oid, drop_reason_list) print("\tok") print("Verify updated drop_reason_list") self.assertTrue(self.verifyDebugCounterDropList( port_dc_oid, drop_reason_list)) print("\tok") print("Verify updated drop_reason_list with traffic") self.assertTrue(self.verifyPortDebugCounterDropPackets( self.port0, port_dc_oid, drop_reason_list)) print("\tok") finally: if port_dc_oid != 0: sai_thrift_remove_debug_counter(self.client, port_dc_oid) @group("draft") class SwitchDebugCounterAddDropReason(BaseDebugCounterClass): ''' Switch Debug Counter add drop reasons test. ''' def runTest(self): dc_oid = 0 try: dc_oid = self.createDebugCounter( SAI_DEBUG_COUNTER_TYPE_SWITCH_IN_DROP_REASONS, [SAI_IN_DROP_REASON_INGRESS_VLAN_FILTER]) self.assertTrue(dc_oid != 0, "Failed to Create DebugCounter") drop_reason_cap = [ SAI_IN_DROP_REASON_INGRESS_VLAN_FILTER, SAI_IN_DROP_REASON_L2_ANY, SAI_IN_DROP_REASON_L3_ANY, SAI_IN_DROP_REASON_SMAC_MULTICAST, SAI_IN_DROP_REASON_TTL, SAI_IN_DROP_REASON_IP_HEADER_ERROR, SAI_IN_DROP_REASON_SIP_MC, SAI_IN_DROP_REASON_SIP_CLASS_E] if (self.isInDropReasonSupported( [SAI_IN_DROP_REASON_SMAC_EQUALS_DMAC])): drop_reason_cap.append(SAI_IN_DROP_REASON_SMAC_EQUALS_DMAC) drop_reason_list = [] for drop_reason in drop_reason_cap: drop_reason_list.append(drop_reason) print("Setting DebugCounter drop_reason_list to: %s" % (map_drop_reason_to_string(drop_reason_list))) self.setDebugCounterDropReasons(dc_oid, drop_reason_list) print("\tok") print("Verify updated drop_reason_list") self.assertTrue(self.verifyDebugCounterDropList( dc_oid, drop_reason_list)) print("\tok") print("Verify updated drop_reason_list with traffic") self.assertTrue(self.verifySwitchDebugCounterDropPackets( dc_oid, drop_reason_list)) print("\tok") finally: if dc_oid != 0: sai_thrift_remove_debug_counter(self.client, dc_oid) @group("draft") class PortDropMCSMAC(BaseDebugCounterClass): ''' Port Debug Counter for SMAC Multicast test. ''' def runTest(self): dc_oid = 0 try: dc_oid = self.createDebugCounter( SAI_DEBUG_COUNTER_TYPE_PORT_IN_DROP_REASONS, [SAI_IN_DROP_REASON_SMAC_MULTICAST]) self.assertTrue(dc_oid != 0, "Failed to Create DebugCounter") self.testPortDebugCounter( self.port0, dc_oid, [SAI_IN_DROP_REASON_SMAC_MULTICAST], [self.mc_smac_pkt]) finally: if dc_oid != 0: sai_thrift_remove_debug_counter(self.client, dc_oid) class SwitchDropMCSMAC(BaseDebugCounterClass): ''' Switch Debug Counter for SMAC Multicast test. ''' def runTest(self): dc_oid = 0 try: dc_oid = self.createDebugCounter( SAI_DEBUG_COUNTER_TYPE_SWITCH_IN_DROP_REASONS, [SAI_IN_DROP_REASON_SMAC_MULTICAST]) self.assertTrue(dc_oid != 0, "Failed to Create DebugCounter") self.testSwitchDebugCounter( dc_oid, [SAI_IN_DROP_REASON_SMAC_MULTICAST], [self.mc_smac_pkt]) finally: if dc_oid != 0: sai_thrift_remove_debug_counter(self.client, dc_oid) @group("draft") class PortDropL2Any(BaseDebugCounterClass): ''' Port Debug Counter for L2 any drop reason test. ''' def runTest(self): pkt_list = [self.zero_smac_pkt, self.vlan_discard_pkt] if (self.isInDropReasonSupported( [SAI_IN_DROP_REASON_SMAC_EQUALS_DMAC])): pkt_list.append(self.mc_smac_pkt) dc_oid = 0 try: dc_oid = self.createDebugCounter( SAI_DEBUG_COUNTER_TYPE_PORT_IN_DROP_REASONS, [SAI_IN_DROP_REASON_L2_ANY]) self.assertTrue(dc_oid != 0, "Failed to Create DebugCounter") self.testPortDebugCounter( self.port0, dc_oid, [SAI_IN_DROP_REASON_L2_ANY], pkt_list) finally: if dc_oid != 0: sai_thrift_remove_debug_counter(self.client, dc_oid) @group("draft") class SwitchDropL2Any(BaseDebugCounterClass): ''' Switch Debug Counter for L2 any drop reason test. ''' def runTest(self): drop_list = [SAI_IN_DROP_REASON_L2_ANY] pkt_list = [self.zero_smac_pkt, self.vlan_discard_pkt] if (self.isInDropReasonSupported( [SAI_IN_DROP_REASON_SMAC_EQUALS_DMAC])): pkt_list.append(self.mc_smac_pkt) dc_oid = 0 try: dc_oid = self.createDebugCounter( SAI_DEBUG_COUNTER_TYPE_SWITCH_IN_DROP_REASONS, drop_list) self.assertTrue(dc_oid != 0, "Failed to Create DebugCounter") self.testSwitchDebugCounter(dc_oid, drop_list, pkt_list) finally: if dc_oid != 0: sai_thrift_remove_debug_counter(self.client, dc_oid) @group("draft") class PortDropSMACequalsDMAC(BaseDebugCounterClass): ''' Port Debug Counter for SMAC equals DMAC drop reason test. ''' def runTest(self): dc_oid = 0 try: drop_list = [SAI_IN_DROP_REASON_SMAC_EQUALS_DMAC] if (self.isInDropReasonSupported( [SAI_IN_DROP_REASON_SMAC_EQUALS_DMAC])): dc_oid = self.createDebugCounter( SAI_DEBUG_COUNTER_TYPE_PORT_IN_DROP_REASONS, drop_list) self.assertTrue(dc_oid != 0, "Failed to Create DebugCounter") self.testPortDebugCounter( self.port0, dc_oid, [SAI_IN_DROP_REASON_SMAC_EQUALS_DMAC], [self.smac_equals_dmac_pkt]) finally: if dc_oid != 0: sai_thrift_remove_debug_counter(self.client, dc_oid) @group("draft") class SwitchDropSMACequalsDMAC(BaseDebugCounterClass): ''' Switch Debug Counter for SMAC equals DMAC drop reason test. ''' def runTest(self): dc_oid = 0 try: drop_list = [SAI_IN_DROP_REASON_SMAC_EQUALS_DMAC] if (self.isInDropReasonSupported( [SAI_IN_DROP_REASON_SMAC_EQUALS_DMAC])): dc_oid = self.createDebugCounter( SAI_DEBUG_COUNTER_TYPE_SWITCH_IN_DROP_REASONS, drop_list) self.assertTrue(dc_oid != 0, "Failed to Create DebugCounter") self.testSwitchDebugCounter( dc_oid, drop_list, [self.smac_equals_dmac_pkt]) finally: if dc_oid != 0: sai_thrift_remove_debug_counter(self.client, dc_oid) @group("draft") class PortDropIngressVLANFilter(BaseDebugCounterClass): ''' Port Debug Counter for Ingress VLAN filter drop reason test. ''' def runTest(self): dc_oid = 0 try: drop_list = [SAI_IN_DROP_REASON_INGRESS_VLAN_FILTER] dc_oid = self.createDebugCounter( SAI_DEBUG_COUNTER_TYPE_PORT_IN_DROP_REASONS, drop_list) self.assertTrue(dc_oid != 0, "Failed to Create DebugCounter") self.testPortDebugCounter( self.port0, dc_oid, [SAI_IN_DROP_REASON_INGRESS_VLAN_FILTER], [self.vlan_discard_pkt]) finally: if dc_oid != 0: sai_thrift_remove_debug_counter(self.client, dc_oid) @group("draft") class SwitchDropIngressVLANFilter(BaseDebugCounterClass): ''' Switch Debug Counter for Ingress VLAN filter drop reason test. ''' def runTest(self): dc_oid = 0 try: drop_list = [SAI_IN_DROP_REASON_INGRESS_VLAN_FILTER] dc_oid = self.createDebugCounter( SAI_DEBUG_COUNTER_TYPE_SWITCH_IN_DROP_REASONS, drop_list) self.assertTrue(dc_oid != 0, "Failed to Create DebugCounter") self.testSwitchDebugCounter( dc_oid, drop_list, [self.vlan_discard_pkt]) finally: if dc_oid != 0: sai_thrift_remove_debug_counter(self.client, dc_oid) @group("draft") class PortDropSIPMCTest(BaseDebugCounterClass): ''' Port Debug Counter for source IP Multicast drop reason test. ''' def runTest(self): dc_oid = 0 self.createRouter() try: drop_list = [SAI_IN_DROP_REASON_SIP_MC] dc_oid = self.createDebugCounter( SAI_DEBUG_COUNTER_TYPE_PORT_IN_DROP_REASONS, drop_list) self.assertTrue(dc_oid != 0, "Failed to Create DebugCounter") self.testPortDebugCounter( self.port0, dc_oid, drop_list, [self.src_ip_mc_pkt]) finally: if dc_oid != 0: sai_thrift_remove_debug_counter(self.client, dc_oid) @group("draft") class SwitchDropSIPMCTest(BaseDebugCounterClass): ''' Switch Debug Counter for source IP Multicast drop reason test. ''' def runTest(self): dc_oid = 0 self.createRouter() try: drop_list = [SAI_IN_DROP_REASON_SIP_MC] dc_oid = self.createDebugCounter( SAI_DEBUG_COUNTER_TYPE_SWITCH_IN_DROP_REASONS, drop_list) self.assertTrue(dc_oid != 0, "Failed to Create DebugCounter") self.testSwitchDebugCounter( dc_oid, drop_list, [self.src_ip_mc_pkt]) finally: if dc_oid != 0: sai_thrift_remove_debug_counter(self.client, dc_oid) @group("draft") class PortDropReasonTTLTest(BaseDebugCounterClass): ''' Port Debug Counter for TTL zero drop reason test. ''' def runTest(self): dc_oid = 0 self.createRouter() try: drop_list = [SAI_IN_DROP_REASON_TTL] dc_oid = self.createDebugCounter( SAI_DEBUG_COUNTER_TYPE_PORT_IN_DROP_REASONS, drop_list) self.assertTrue(dc_oid != 0, "Failed to Create DebugCounter") self.testPortDebugCounter( self.port0, dc_oid, drop_list, [self.ttl_zero_pkt]) finally: if dc_oid != 0: sai_thrift_remove_debug_counter(self.client, dc_oid) @group("draft") class SwitchDropReasonTTLTest(BaseDebugCounterClass): ''' Switch Debug Counter for TTL zero drop reason test. ''' def runTest(self): dc_oid = 0 self.createRouter() try: drop_list = [SAI_IN_DROP_REASON_TTL] dc_oid = self.createDebugCounter( SAI_DEBUG_COUNTER_TYPE_SWITCH_IN_DROP_REASONS, drop_list) self.assertTrue(dc_oid != 0, "Failed to Create DebugCounter") self.testSwitchDebugCounter( dc_oid, drop_list, [self.ttl_zero_pkt]) finally: if dc_oid != 0: sai_thrift_remove_debug_counter(self.client, dc_oid) @group("draft") class PortDropSIPClassETest(BaseDebugCounterClass): ''' Port Debug Counter for SIP Class E drop reason test. ''' def runTest(self): dc_oid = 0 for test_port in self.test_ports: self.rif_ids.append(sai_thrift_create_router_interface( self.client, virtual_router_id=self.default_vrf, type=SAI_ROUTER_INTERFACE_TYPE_PORT, port_id=test_port, admin_v4_state=self.v4_enabled, admin_v6_state=self.v6_enabled)) try: drop_list = [SAI_IN_DROP_REASON_SIP_CLASS_E] dc_oid = self.createDebugCounter( SAI_DEBUG_COUNTER_TYPE_PORT_IN_DROP_REASONS, drop_list) self.assertTrue(dc_oid != 0, "Failed to Create DebugCounter") self.testPortDebugCounter( self.port0, dc_oid, drop_list, [self.src_ip_class_e_pkt]) finally: if dc_oid != 0: sai_thrift_remove_debug_counter(self.client, dc_oid) @group("draft") class SwitchDropSIPClassETest(BaseDebugCounterClass): ''' Switch Debug Counter for SIP Class E drop reason test. ''' def runTest(self): dc_oid = 0 for test_port in self.test_ports: self.rif_ids.append(sai_thrift_create_router_interface( self.client, virtual_router_id=self.default_vrf, type=SAI_ROUTER_INTERFACE_TYPE_PORT, port_id=test_port, admin_v4_state=self.v4_enabled, admin_v6_state=self.v6_enabled)) try: drop_list = [SAI_IN_DROP_REASON_SIP_CLASS_E] dc_oid = self.createDebugCounter( SAI_DEBUG_COUNTER_TYPE_SWITCH_IN_DROP_REASONS, drop_list) self.assertTrue(dc_oid != 0, "Failed to Create DebugCounter") self.testSwitchDebugCounter( dc_oid, drop_list, [self.src_ip_class_e_pkt]) finally: if dc_oid != 0: sai_thrift_remove_debug_counter(self.client, dc_oid) @group("draft") class PortDropUCDIPMCDMACTest(BaseDebugCounterClass): ''' Port Debug Counter for UC DIP with MC DMAC drop reason test. ''' def runTest(self): dc_oid = 0 self.createRouter() self.createNeighbor() self.createRoute() try: drop_list = [SAI_IN_DROP_REASON_UC_DIP_MC_DMAC] dc_oid = self.createDebugCounter( SAI_DEBUG_COUNTER_TYPE_PORT_IN_DROP_REASONS, drop_list) self.assertTrue(dc_oid != 0, "Failed to Create DebugCounter") print("\nTest IPv4") self.testPortDebugCounter( self.port0, dc_oid, drop_list, [self.uc_dipv4_mc_dmac_pkt]) print("Test IPv6") self.testPortDebugCounter( self.port0, dc_oid, drop_list, [self.uc_dipv6_mc_dmac_pkt]) finally: if dc_oid != 0: sai_thrift_remove_debug_counter(self.client, dc_oid) @group("draft") class PortDropReasonIPHeaderErrorTest(BaseDebugCounterClass): ''' Port Debug Counter for IP header error drop reason test. ''' def runTest(self): dc_oid = 0 self.createRouter() try: drop_list = [SAI_IN_DROP_REASON_IP_HEADER_ERROR] dc_oid = self.createDebugCounter( SAI_DEBUG_COUNTER_TYPE_PORT_IN_DROP_REASONS, drop_list) self.assertTrue(dc_oid != 0, "Failed to Create DebugCounter") self.testPortDebugCounter( self.port0, dc_oid, drop_list, [self.ihl_pkt]) finally: if dc_oid != 0: sai_thrift_remove_debug_counter(self.client, dc_oid) @group("draft") class SwitchDropReasonIPHeaderErrorTest(BaseDebugCounterClass): ''' Switch Debug Counter for IP header error drop reason test. ''' def runTest(self): dc_oid = 0 self.createRouter() try: drop_list = [SAI_IN_DROP_REASON_IP_HEADER_ERROR] dc_oid = self.createDebugCounter( SAI_DEBUG_COUNTER_TYPE_SWITCH_IN_DROP_REASONS, drop_list) self.assertTrue(dc_oid != 0, "Failed to Create DebugCounter") self.testSwitchDebugCounter( dc_oid, drop_list, [self.ihl_pkt]) finally: if dc_oid != 0: sai_thrift_remove_debug_counter(self.client, dc_oid) @group("draft") class PortMultiDebugCounters(BaseDebugCounterClass): ''' Port multi Debug Counter test. ''' def runTest(self): dc_oid1 = 0 dc_oid2 = 0 dc_oid3 = 0 try: drop_list1 = [SAI_IN_DROP_REASON_IP_HEADER_ERROR, SAI_IN_DROP_REASON_L2_ANY] dc_oid1 = self.createDebugCounter( SAI_DEBUG_COUNTER_TYPE_PORT_IN_DROP_REASONS, drop_list1) self.assertTrue(dc_oid1 != 0, "Failed to Create DebugCounter") self.assertTrue(self.verifyDebugCounterDropList( dc_oid1, drop_list1)) drop_list2 = [SAI_IN_DROP_REASON_L2_ANY] dc_oid2 = self.createDebugCounter( SAI_DEBUG_COUNTER_TYPE_PORT_IN_DROP_REASONS, drop_list2) self.assertTrue(dc_oid2 != 0, "Failed to Create DebugCounter") self.assertTrue(self.verifyDebugCounterDropList( dc_oid2, drop_list2)) if (self.isInDropReasonSupported( [SAI_IN_DROP_REASON_SMAC_EQUALS_DMAC])): drop_list3 = [SAI_IN_DROP_REASON_SMAC_EQUALS_DMAC] dc_oid3 = self.createDebugCounter( SAI_DEBUG_COUNTER_TYPE_PORT_IN_DROP_REASONS, drop_list3) self.assertTrue(dc_oid3 != 0, "Failed to Create DebugCounter") self.assertTrue(self.verifyDebugCounterDropList( dc_oid3, drop_list3)) self.testPortDebugCounter( self.port0, dc_oid3, drop_list3, [self.smac_equals_dmac_pkt]) self.testPortDebugCounter( self.port0, dc_oid1, drop_list2, [self.zero_smac_pkt]) else: self.testPortDebugCounter( self.port0, dc_oid1, drop_list2, [self.vlan_discard_pkt, self.zero_smac_pkt]) finally: if dc_oid1 != 0: sai_thrift_remove_debug_counter(self.client, dc_oid1) if dc_oid2 != 0: sai_thrift_remove_debug_counter(self.client, dc_oid2) if dc_oid3 != 0: sai_thrift_remove_debug_counter(self.client, dc_oid3) @group("draft") class SwitchMultiDebugCounters(BaseDebugCounterClass): ''' Switch multi Debug Counter test. ''' def runTest(self): dc_oid1 = 0 dc_oid2 = 0 dc_oid3 = 0 try: drop_list1 = [SAI_IN_DROP_REASON_IP_HEADER_ERROR, SAI_IN_DROP_REASON_L2_ANY] dc_oid1 = self.createDebugCounter( SAI_DEBUG_COUNTER_TYPE_SWITCH_IN_DROP_REASONS, drop_list1) self.assertTrue(dc_oid1 != 0, "Failed to Create DebugCounter") self.assertTrue(self.verifyDebugCounterDropList( dc_oid1, drop_list1)) drop_list2 = [SAI_IN_DROP_REASON_L2_ANY] dc_oid2 = self.createDebugCounter( SAI_DEBUG_COUNTER_TYPE_SWITCH_IN_DROP_REASONS, drop_list2) self.assertTrue(dc_oid2 != 0, "Failed to Create DebugCounter") self.assertTrue(self.verifyDebugCounterDropList( dc_oid2, drop_list2)) if (self.isInDropReasonSupported( [SAI_IN_DROP_REASON_SMAC_EQUALS_DMAC])): drop_list3 = [SAI_IN_DROP_REASON_SMAC_EQUALS_DMAC] dc_oid3 = self.createDebugCounter( SAI_DEBUG_COUNTER_TYPE_SWITCH_IN_DROP_REASONS, drop_list3) self.assertTrue(dc_oid3 != 0, "Failed to Create DebugCounter") self.assertTrue(self.verifyDebugCounterDropList( dc_oid3, drop_list3)) self.testSwitchDebugCounter( dc_oid3, drop_list3, [self.smac_equals_dmac_pkt]) self.testSwitchDebugCounter( dc_oid1, drop_list2, [self.vlan_discard_pkt, self.zero_smac_pkt, self.ihl_pkt]) else: self.testSwitchDebugCounter( dc_oid1, drop_list2, [self.vlan_discard_pkt, self.zero_smac_pkt, self.ihl_pkt]) self.testSwitchDebugCounter( dc_oid1, drop_list2, [self.vlan_discard_pkt, self.zero_smac_pkt]) finally: if dc_oid1 != 0: sai_thrift_remove_debug_counter(self.client, dc_oid1) if dc_oid2 != 0: sai_thrift_remove_debug_counter(self.client, dc_oid2) if dc_oid3 != 0: sai_thrift_remove_debug_counter(self.client, dc_oid3) @group("draft") class PortDropDIPLinkLocalTest(BaseDebugCounterClass): ''' Port Debug Counter for DIP Link Local drop reason test. ''' def runTest(self): dc_oid = 0 self.createRouter() self.createNeighbor(nbr_ip=self.link_local_ip) self.createRoute(nbr_ip_pfx=self.link_local_ip_prefix) try: drop_list = [SAI_IN_DROP_REASON_DIP_LINK_LOCAL] dc_oid = self.createDebugCounter( SAI_DEBUG_COUNTER_TYPE_PORT_IN_DROP_REASONS, drop_list) self.assertTrue(dc_oid != 0, "Failed to Create DebugCounter") self.testPortDebugCounter( self.port0, dc_oid, drop_list, [self.dst_ip_link_local_pkt]) finally: if dc_oid != 0: sai_thrift_remove_debug_counter(self.client, dc_oid) @group("draft") class PortDropSIPLinkLocalTest(BaseDebugCounterClass): ''' Port Debug Counter for SIP Link Local drop reason test. ''' def runTest(self): dc_oid = 0 self.createRouter() self.createNeighbor() self.createRoute() try: drop_list = [SAI_IN_DROP_REASON_SIP_LINK_LOCAL] dc_oid = self.createDebugCounter( SAI_DEBUG_COUNTER_TYPE_PORT_IN_DROP_REASONS, drop_list) self.assertTrue(dc_oid != 0, "Failed to Create DebugCounter") self.testPortDebugCounter( self.port0, dc_oid, drop_list, [self.src_ip_link_local_pkt]) finally: if dc_oid != 0: sai_thrift_remove_debug_counter(self.client, dc_oid) @group("draft") class PortDropSIPUnspecifiedTest(BaseDebugCounterClass): ''' Port Debug Counter for SIP Unspecified drop reason test. ''' def runTest(self): dc_oid = 0 self.createRouter() self.createNeighbor() self.createRoute() try: drop_list = [SAI_IN_DROP_REASON_SIP_UNSPECIFIED] dc_oid = self.createDebugCounter( SAI_DEBUG_COUNTER_TYPE_PORT_IN_DROP_REASONS, drop_list) self.assertTrue(dc_oid != 0, "Failed to Create DebugCounter") self.testPortDebugCounter( self.port0, dc_oid, drop_list, [self.src_ipv4_unspec_pkt]) finally: if dc_oid != 0: sai_thrift_remove_debug_counter(self.client, dc_oid) @group("draft") class PortDropIPv4RIFDisabled(BaseDebugCounterClass): ''' Port Debug Counter for IPv4 disabled rif drop reason test. ''' def runTest(self): dc_oid = 0 try: self.createRouter() self.createNeighbor() self.createRoute() drop_list = [SAI_IN_DROP_REASON_IRIF_DISABLED] dc_type = SAI_DEBUG_COUNTER_TYPE_PORT_IN_DROP_REASONS dc_oid = self.createDebugCounter(dc_type, drop_list) self.assertTrue(dc_oid != 0, "Failed to Create DebugCounter") pkt = simple_tcp_packet( eth_dst=ROUTER_MAC, eth_src=self.default_mac_1, ip_dst=self.neighbor_ip) # Interface enabled sai_thrift_set_router_interface_attribute( self.client, self.rif_ids[0], admin_v4_state=True) self.testPortDebugCounter(self.port0, dc_oid, drop_list, [pkt], drop_expected=False) # Interface disabled sai_thrift_set_router_interface_attribute( self.client, self.rif_ids[0], admin_v4_state=False) self.testPortDebugCounter(self.port0, dc_oid, drop_list, [pkt]) finally: if dc_oid != 0: sai_thrift_remove_debug_counter(self.client, dc_oid) @group("draft") class PortDropIPv4L3AnyRIFDisabled(BaseDebugCounterClass): ''' Port Debug Counter for IPv4 disabled rif and L3 any drop reasons test. ''' def runTest(self): dc_oid = 0 try: self.createRouter() self.createNeighbor() self.createRoute() drop_list = [SAI_IN_DROP_REASON_L3_ANY] dc_type = SAI_DEBUG_COUNTER_TYPE_PORT_IN_DROP_REASONS dc_oid = self.createDebugCounter(dc_type, drop_list) self.assertTrue(dc_oid != 0, "Failed to Create DebugCounter") pkt = simple_tcp_packet( eth_dst=ROUTER_MAC, eth_src=self.default_mac_1, ip_dst=self.neighbor_ip) # Interface enabled sai_thrift_set_router_interface_attribute( self.client, self.rif_ids[0], admin_v4_state=True) self.testPortDebugCounter(self.port0, dc_oid, drop_list, [pkt], drop_expected=False) # Interface disabled sai_thrift_set_router_interface_attribute( self.client, self.rif_ids[0], admin_v4_state=False) self.testPortDebugCounter(self.port0, dc_oid, drop_list, [pkt]) finally: if dc_oid != 0: sai_thrift_remove_debug_counter(self.client, dc_oid) @group("draft") class PortDropIPv6RIFDisabled(BaseDebugCounterClass): ''' Port Debug Counter for IPv6 disabled RIF drop reason test. ''' def runTest(self): dc_oid = 0 try: drop_list = [SAI_IN_DROP_REASON_IRIF_DISABLED] dc_type = SAI_DEBUG_COUNTER_TYPE_PORT_IN_DROP_REASONS self.createRouter() self.createNeighborV6() self.createRouteV6() pkt = simple_tcpv6_packet(eth_dst=ROUTER_MAC, ipv6_dst=self.neighbor_ipv6) dc_oid = self.createDebugCounter(dc_type, drop_list) self.assertTrue(dc_oid != 0, "Failed to Create DebugCounter") # Verify traffic for rif interface enabled sai_thrift_set_router_interface_attribute( self.client, self.rif_ids[0], admin_v6_state=True) self.testPortDebugCounter(self.port0, dc_oid, drop_list, [pkt], drop_expected=False) # Verify traffic for rif interface disabled sai_thrift_set_router_interface_attribute( self.client, self.rif_ids[0], admin_v6_state=False) self.testPortDebugCounter(self.port0, dc_oid, drop_list, [pkt]) finally: if dc_oid != 0: sai_thrift_remove_debug_counter(self.client, dc_oid) @group("draft") class PortDropIPv6L3AnyRIFDisabled(BaseDebugCounterClass): ''' Port Debug Counter for IPv6 disabled rif and L3 any drop reasons test. ''' def runTest(self): dc_oid = 0 try: drop_list = [SAI_IN_DROP_REASON_L3_ANY] dc_type = SAI_DEBUG_COUNTER_TYPE_PORT_IN_DROP_REASONS self.createRouter() self.createNeighborV6() self.createRouteV6() pkt = simple_tcpv6_packet(eth_dst=ROUTER_MAC, ipv6_dst=self.neighbor_ipv6, ipv6_hlim=64) dc_oid = self.createDebugCounter(dc_type, drop_list) self.assertTrue(dc_oid != 0, "Failed to Create DebugCounter") # Verify traffic for rif interface enabled sai_thrift_set_router_interface_attribute( self.client, self.rif_ids[0], admin_v6_state=True) self.testPortDebugCounter(self.port0, dc_oid, drop_list, [pkt], drop_expected=False) # Verify traffic for rif interface disabled sai_thrift_set_router_interface_attribute( self.client, self.rif_ids[0], admin_v6_state=False) self.testPortDebugCounter(self.port0, dc_oid, drop_list, [pkt]) finally: if dc_oid != 0: sai_thrift_remove_debug_counter(self.client, dc_oid) @group("draft") class SwitchDropDIPLoopback(BaseDebugCounterClass): ''' Switch Debug Counter for DIP loopback drop reasons test. ''' def runTest(self): dc_oid = 0 for test_port in self.test_ports: self.rif_ids.append(sai_thrift_create_router_interface( self.client, virtual_router_id=self.default_vrf, type=SAI_ROUTER_INTERFACE_TYPE_PORT, port_id=test_port, admin_v4_state=self.v4_enabled, admin_v6_state=self.v6_enabled)) try: drop_list = [SAI_IN_DROP_REASON_DIP_LOOPBACK] dc_type = SAI_DEBUG_COUNTER_TYPE_SWITCH_IN_DROP_REASONS pkt = simple_tcp_packet(eth_dst=ROUTER_MAC, eth_src=self.default_mac_1, ip_dst=self.loopback_ip) dc_oid = self.createDebugCounter(dc_type, drop_list) self.assertTrue(dc_oid != 0, "Failed to Create DebugCounter") # Verify traffic for rif interface enabled self.testSwitchDebugCounter(dc_oid, drop_list, pkt) finally: if dc_oid != 0: sai_thrift_remove_debug_counter(self.client, dc_oid) @group("draft") class SwitchDropSIPLoopback(BaseDebugCounterClass): ''' Switch Debug Counter for SIP loopback drop reasons test. ''' def runTest(self): dc_oid = 0 self.createRouter() try: drop_list = [SAI_IN_DROP_REASON_SIP_LOOPBACK] dc_type = SAI_DEBUG_COUNTER_TYPE_SWITCH_IN_DROP_REASONS pkt = simple_tcp_packet(eth_dst=ROUTER_MAC, eth_src=self.default_mac_1, ip_src=self.loopback_ip) dc_oid = self.createDebugCounter(dc_type, drop_list) self.assertTrue(dc_oid != 0, "Failed to Create DebugCounter") # Verify traffic for rif interface enabled self.testSwitchDebugCounter(dc_oid, drop_list, pkt) finally: if dc_oid != 0: sai_thrift_remove_debug_counter(self.client, dc_oid) @group("draft") class PortDropIPv4Miss(BaseDebugCounterClass): ''' Port Debug Counter for IPv4 route miss drop reasons test. ''' def runTest(self): dc_oid = 0 self.createRouter() try: drop_list = [SAI_IN_DROP_REASON_LPM4_MISS] dc_type = SAI_DEBUG_COUNTER_TYPE_PORT_IN_DROP_REASONS dc_oid = self.createDebugCounter(dc_type, drop_list) self.assertTrue(dc_oid != 0, "Failed to Create DebugCounter") self.testPortDebugCounter( self.port0, dc_oid, drop_list, [self.lpm4_miss_pkt]) finally: if dc_oid != 0: sai_thrift_remove_debug_counter(self.client, dc_oid) @group("draft") class PortDropIPv6Miss(BaseDebugCounterClass): ''' Port Debug Counter for IPv6 route miss drop reasons test. ''' def runTest(self): dc_oid = 0 self.createRouter() try: drop_list = [SAI_IN_DROP_REASON_LPM6_MISS] dc_type = SAI_DEBUG_COUNTER_TYPE_PORT_IN_DROP_REASONS dc_oid = self.createDebugCounter(dc_type, drop_list) self.assertTrue(dc_oid != 0, "Failed to Create DebugCounter") self.testPortDebugCounter( self.port0, dc_oid, drop_list, [self.lpm6_miss_pkt]) finally: if dc_oid != 0: sai_thrift_remove_debug_counter(self.client, dc_oid) @group("draft") class PortDropBlackHoleRoute(BaseDebugCounterClass): ''' Port Debug Counter for blackhole route drop reasons test. ''' def runTest(self): dc_oid = 0 try: self.createRouter() self.createRoute( packet_action=SAI_PACKET_ACTION_DROP, nbr_ip_pfx=self.blackhole_ip) drop_list = [SAI_IN_DROP_REASON_BLACKHOLE_ROUTE] dc_type = SAI_DEBUG_COUNTER_TYPE_PORT_IN_DROP_REASONS dc_oid = self.createDebugCounter(dc_type, drop_list) self.assertTrue(dc_oid != 0, "Failed to Create DebugCounter") self.testPortDebugCounter( self.port0, dc_oid, drop_list, [self.blackhole_route_pkt]) finally: if dc_oid != 0: sai_thrift_remove_debug_counter(self.client, dc_oid) @group("draft") class PortDropL3AnyTest(BaseDebugCounterClass): ''' Port Debug Counter for L3 any drop reasons test. ''' def runTest(self): dc_oid = 0 try: self.createRouter() self.createNeighborV6() self.createRouteV6() self.createRoute( packet_action=SAI_PACKET_ACTION_DROP, nbr_ip_pfx=self.blackhole_ip) drop_list = [SAI_IN_DROP_REASON_L3_ANY] dc_type = SAI_DEBUG_COUNTER_TYPE_PORT_IN_DROP_REASONS dc_oid = self.createDebugCounter(dc_type, drop_list) self.assertTrue(dc_oid != 0, "Failed to Create DebugCounter") pkt_list = [self.ihl_pkt, self.src_ip_mc_pkt, self.ttl_zero_pkt, self.ip_dst_loopback_pkt, self.ip_src_loopback_pkt, self.src_ip_class_e_pkt, self.lpm4_miss_pkt, self.lpm6_miss_pkt, self.blackhole_route_pkt, self.dst_ipv4_unspec_pkt, self.dst_ipv6_unspec_pkt] self.testPortDebugCounter(self.port0, dc_oid, drop_list, pkt_list) finally: if dc_oid != 0: sai_thrift_remove_debug_counter(self.client, dc_oid) @group("draft") class PortDropAclAnyTest(BaseDebugCounterClass): ''' Port Debug Counter for ACL any drop reason test. ''' def runTest(self): dc_oid = 0 acl_list = [] try: self.createRouter() self.createNeighborV6() self.createRouteV6() drop_list = [SAI_IN_DROP_REASON_ACL_ANY] dc_type = SAI_DEBUG_COUNTER_TYPE_PORT_IN_DROP_REASONS dc_oid = self.createDebugCounter(dc_type, drop_list) self.assertTrue(dc_oid != 0, "Failed to Create DebugCounter") acl_list = self.setupPortIngresDropAcl(dip=self.neighbor_ip) ingress_acl = acl_list[0]['acl_table'] status = sai_thrift_set_port_attribute( self.client, self.port0, ingress_acl=ingress_acl) self.assertEqual(status, SAI_STATUS_SUCCESS) pkt = simple_tcp_packet( eth_dst=ROUTER_MAC, eth_src=self.default_mac_1, ip_dst=self.neighbor_ip) self.testPortDebugCounter(self.port0, dc_oid, drop_list, pkt) finally: if dc_oid != 0: sai_thrift_remove_debug_counter(self.client, dc_oid) sai_thrift_set_port_attribute( self.client, self.port0, ingress_acl=0) for acl in acl_list: sai_thrift_remove_acl_table_group_member( self.client, acl['acl_group_member']) sai_thrift_remove_acl_table_group(self.client, acl['acl_table_group']) sai_thrift_remove_acl_entry(self.client, acl['acl_entry']) sai_thrift_remove_acl_table(self.client, acl['acl_table']) def setupPortIngresDropAcl( self, dmac=None, dip=None, action=SAI_PACKET_ACTION_DROP, mac_mask='FF:FF:FF:FF:FF:FF'): ''' Function creates the port ingress ACL. Args: dmac (mac): acl configured dst mac address dip (ip): acl configured dst ip address action (action): acl configured action mac_mask (mac): acl configured dst mac mask Returns: list: list of acl values ''' acl = [] try: stage = SAI_ACL_STAGE_INGRESS bind_points = [SAI_ACL_BIND_POINT_TYPE_PORT] action_types = [SAI_ACL_ACTION_TYPE_PACKET_ACTION] dip_mask = '255.255.255.0' acl_bind_point_type_list = sai_thrift_s32_list_t( count=len(bind_points), int32list=bind_points) acl_action_type_list = sai_thrift_s32_list_t( count=len(action_types), int32list=action_types) if dip is not None: dip_ind = True dst_ip = sai_thrift_acl_field_data_t( data=sai_thrift_acl_field_data_data_t(ip4=dip), mask=sai_thrift_acl_field_data_mask_t(ip4=dip_mask)) else: dip_ind = None dst_ip = None if dmac is not None: dmac_ind = True dst_mac = sai_thrift_acl_field_data_t( data=sai_thrift_acl_field_data_data_t(mac=dmac), mask=sai_thrift_acl_field_data_mask_t(mac=mac_mask)) else: dmac_ind = None dst_mac = None acl_table_id = sai_thrift_create_acl_table( self.client, acl_stage=stage, acl_bind_point_type_list=acl_bind_point_type_list, acl_action_type_list=acl_action_type_list, field_dst_ip=dip_ind, field_dst_mac=dmac_ind) self.assertTrue(acl_table_id != 0, "Failed to Create ACL table") action_drop = action packet_action = sai_thrift_acl_action_data_t( parameter=sai_thrift_acl_action_parameter_t(s32=action_drop)) acl_entry = sai_thrift_create_acl_entry( self.client, table_id=acl_table_id, action_packet_action=packet_action, field_dst_ip=dst_ip, field_dst_mac=dst_mac) self.assertTrue(acl_entry != 0, "Failed to Create ACL entry") acl_table_group = sai_thrift_create_acl_table_group( self.client, acl_stage=stage, acl_bind_point_type_list=acl_bind_point_type_list, type=SAI_ACL_TABLE_GROUP_TYPE_PARALLEL) acl_group_member = sai_thrift_create_acl_table_group_member( self.client, acl_table_group_id=acl_table_group, acl_table_id=acl_table_id) finally: acl.append({ 'acl_table': acl_table_id, 'acl_entry': acl_entry, 'acl_table_group': acl_table_group, 'acl_group_member': acl_group_member}) return acl @group("draft") class GetDebugCounterEnumValuesCapabilities(BaseDebugCounterClass): ''' SAI query DebugCounter enum values capabilities. ''' def runTest(self): # Supported SAI IN drop reason list in_caps_list = [ SAI_IN_DROP_REASON_L2_ANY, SAI_IN_DROP_REASON_SMAC_MULTICAST, SAI_IN_DROP_REASON_SMAC_EQUALS_DMAC, SAI_IN_DROP_REASON_DMAC_RESERVED, SAI_IN_DROP_REASON_VLAN_TAG_NOT_ALLOWED, SAI_IN_DROP_REASON_INGRESS_VLAN_FILTER, SAI_IN_DROP_REASON_INGRESS_STP_FILTER, SAI_IN_DROP_REASON_FDB_UC_DISCARD, SAI_IN_DROP_REASON_FDB_MC_DISCARD, SAI_IN_DROP_REASON_L2_LOOPBACK_FILTER, SAI_IN_DROP_REASON_EXCEEDS_L2_MTU, SAI_IN_DROP_REASON_L3_ANY, SAI_IN_DROP_REASON_EXCEEDS_L3_MTU, SAI_IN_DROP_REASON_TTL, SAI_IN_DROP_REASON_L3_LOOPBACK_FILTER, SAI_IN_DROP_REASON_NON_ROUTABLE, SAI_IN_DROP_REASON_NO_L3_HEADER, SAI_IN_DROP_REASON_IP_HEADER_ERROR, SAI_IN_DROP_REASON_UC_DIP_MC_DMAC, SAI_IN_DROP_REASON_DIP_LOOPBACK, SAI_IN_DROP_REASON_SIP_LOOPBACK, SAI_IN_DROP_REASON_SIP_MC, SAI_IN_DROP_REASON_SIP_CLASS_E, SAI_IN_DROP_REASON_SIP_UNSPECIFIED, SAI_IN_DROP_REASON_MC_DMAC_MISMATCH, SAI_IN_DROP_REASON_SIP_EQUALS_DIP, SAI_IN_DROP_REASON_SIP_BC, SAI_IN_DROP_REASON_DIP_LOCAL, SAI_IN_DROP_REASON_DIP_LINK_LOCAL, SAI_IN_DROP_REASON_SIP_LINK_LOCAL, SAI_IN_DROP_REASON_IPV6_MC_SCOPE0, SAI_IN_DROP_REASON_IPV6_MC_SCOPE1, SAI_IN_DROP_REASON_IRIF_DISABLED, SAI_IN_DROP_REASON_ERIF_DISABLED, SAI_IN_DROP_REASON_LPM4_MISS, SAI_IN_DROP_REASON_LPM6_MISS, SAI_IN_DROP_REASON_BLACKHOLE_ROUTE, SAI_IN_DROP_REASON_BLACKHOLE_ARP, SAI_IN_DROP_REASON_UNRESOLVED_NEXT_HOP, SAI_IN_DROP_REASON_L3_EGRESS_LINK_DOWN, SAI_IN_DROP_REASON_DECAP_ERROR, SAI_IN_DROP_REASON_ACL_ANY, SAI_IN_DROP_REASON_ACL_INGRESS_PORT, SAI_IN_DROP_REASON_ACL_INGRESS_LAG, SAI_IN_DROP_REASON_ACL_INGRESS_VLAN, SAI_IN_DROP_REASON_ACL_INGRESS_RIF, SAI_IN_DROP_REASON_ACL_INGRESS_SWITCH, SAI_IN_DROP_REASON_ACL_EGRESS_PORT, SAI_IN_DROP_REASON_ACL_EGRESS_LAG, SAI_IN_DROP_REASON_ACL_EGRESS_VLAN, SAI_IN_DROP_REASON_ACL_EGRESS_RIF, SAI_IN_DROP_REASON_ACL_EGRESS_SWITCH, SAI_IN_DROP_REASON_FDB_AND_BLACKHOLE_DISCARDS, SAI_IN_DROP_REASON_NON_ROUTABLE, SAI_IN_DROP_REASON_MPLS_MISS, SAI_IN_DROP_REASON_SRV6_LOCAL_SID_DROP] supp = self.client.sai_thrift_query_attribute_enum_values_capability( SAI_OBJECT_TYPE_DEBUG_COUNTER, SAI_DEBUG_COUNTER_ATTR_IN_DROP_REASON_LIST, len(in_caps_list)) print("\nSupported IN drop reasons") for drop_reason in in_caps_list: for in_drop_reason in supp: if drop_reason == in_drop_reason: print(map_drop_reason_to_string([drop_reason])) print("\tok") # supported SAI OUT drop reason list out_caps_list = [ SAI_OUT_DROP_REASON_L2_ANY, SAI_OUT_DROP_REASON_EGRESS_VLAN_FILTER, SAI_OUT_DROP_REASON_L3_ANY, SAI_OUT_DROP_REASON_L3_EGRESS_LINK_DOWN, SAI_OUT_DROP_REASON_TUNNEL_LOOPBACK_PACKET_DROP] supp = self.client.sai_thrift_query_attribute_enum_values_capability( SAI_OBJECT_TYPE_DEBUG_COUNTER, SAI_DEBUG_COUNTER_ATTR_OUT_DROP_REASON_LIST, len(out_caps_list)) print("\nSupported OUT drop reasons") for drop_reason in out_caps_list: for out_drop_reason in supp: if drop_reason == out_drop_reason: print(map_drop_reason_to_string([drop_reason])) print("\tok") @group("draft") class GetDebugCounterAvailability(BaseDebugCounterClass): ''' SAI query DebugCounter availability. ''' def runTest(self): # Supported debug counter types dc_types = [SAI_DEBUG_COUNTER_TYPE_PORT_IN_DROP_REASONS, SAI_DEBUG_COUNTER_TYPE_SWITCH_IN_DROP_REASONS, SAI_DEBUG_COUNTER_TYPE_PORT_OUT_DROP_REASONS, SAI_DEBUG_COUNTER_TYPE_SWITCH_OUT_DROP_REASONS] # Verify supported debug counter types for dc_type in dc_types: dc_avail = sai_thrift_object_type_get_availability( self.client, SAI_OBJECT_TYPE_DEBUG_COUNTER, SAI_DEBUG_COUNTER_ATTR_TYPE, dc_type) print("verify DC Type = %s, dc_availability=%d" % (dc_type, dc_avail)) self.assertEqual(dc_avail, 0x400) print("\tok") # Verify unknown(random) debug counter types, should return value of 0 for dc_type in [123, 777, 0x400]: dc_avail = sai_thrift_object_type_get_availability( self.client, SAI_OBJECT_TYPE_DEBUG_COUNTER, SAI_DEBUG_COUNTER_ATTR_TYPE, dc_type) print("verify unsupported DC Type = %s, dc_availability=%d" % (dc_type, dc_avail)) self.assertEqual(dc_avail, 0) print("\tok")