ptf/saihostif.py (3,414 lines of code) (raw):

# Copyright 2021-present Intel Corporation. # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. """ Thrift SAI interface host interface tests """ from sai_thrift.sai_headers import * from sai_base_test import * @group("draft") class hostIfGenlTest(PlatformSaiHelper): ''' This verifies the correctness of host interface creation of type generic netlink with object type Port. The test verifies the confguration. ''' def setUp(self): super(hostIfGenlTest, self).setUp() def runTest(self): print("\nhostIfGenlTest()") hostif1_port = self.port24 try: hostif1 = sai_thrift_create_hostif(self.client, obj_id=hostif1_port, name="genl_packet", genetlink_mcgrp_name="packets", type=SAI_HOSTIF_TYPE_GENETLINK) self.assertTrue(hostif1 != 0) print("\tVerification complete") finally: pass def tearDown(self): super(hostIfGenlTest, self).tearDown() @group("draft") class HostifCreationTestHelper(PlatformSaiHelper): ''' Verify host interface creation and packet RX for hostif type = netdev and different host interface object types ''' def setUp(self): super(HostifCreationTestHelper, self).setUp() def tearDown(self): sai_thrift_flush_fdb_entries( self.client, entry_type=SAI_FDB_FLUSH_ENTRY_TYPE_ALL) super(HostifCreationTestHelper, self).tearDown() @group("draft") class lagNetdevHostifCreationTest(HostifCreationTestHelper): ''' Verify correctness of host interface creation of type netdev with object type LAG. Verify also if management packets are forwarded to LAG host interface after hostif table wildcard entry creation. ''' def setUp(self): super(lagNetdevHostifCreationTest, self).setUp() def runTest(self): print("\nlagNetdevHostifCreationTest()") lag_ports = [self.port24, self.port25] lag_dev_ports = [self.dev_port24, self.dev_port25] lag_hostif_name = "lag_hostif" lacp_mac = "01:80:c2:00:00:02" lacp_pkt = simple_eth_packet(eth_dst=lacp_mac, pktlen=100, eth_type=0x8809)/(chr(0x01) + (chr(0x01))) try: lag10 = sai_thrift_create_lag(self.client) lag_hostif = sai_thrift_create_hostif(self.client, name=lag_hostif_name, obj_id=lag10, type=SAI_HOSTIF_TYPE_NETDEV) self.assertNotEqual(lag_hostif, 0) lag_hif_socket = open_packet_socket(lag_hostif_name) lacp_trap = sai_thrift_create_hostif_trap( self.client, packet_action=SAI_PACKET_ACTION_TRAP, trap_type=SAI_HOSTIF_TRAP_TYPE_LACP) self.assertNotEqual(lacp_trap, 0) channel = SAI_HOSTIF_TABLE_ENTRY_CHANNEL_TYPE_NETDEV_LOGICAL_PORT hif_tbl_entry = sai_thrift_create_hostif_table_entry( self.client, channel_type=channel, type=SAI_HOSTIF_TABLE_ENTRY_TYPE_WILDCARD) self.assertNotEqual(hif_tbl_entry, 0) # create LAG members lag10_member1 = sai_thrift_create_lag_member( self.client, lag_id=lag10, port_id=lag_ports[0]) lag10_member2 = sai_thrift_create_lag_member( self.client, lag_id=lag10, port_id=lag_ports[1]) pre_stats = query_counter( self, sai_thrift_get_queue_stats, self.cpu_queue0) for dev_port in lag_dev_ports: print("Sending LACP packet on LAG port %d" % dev_port) send_packet(self, dev_port, lacp_pkt) print("Verifying LACP packet on LAG host interface") self.assertTrue(socket_verify_packet(lacp_pkt, lag_hif_socket)) print("\tOK") print("Verifying CPU port queue stats") time.sleep(4) post_stats = query_counter( self, sai_thrift_get_queue_stats, self.cpu_queue0) self.assertEqual( post_stats["SAI_QUEUE_STAT_PACKETS"], pre_stats["SAI_QUEUE_STAT_PACKETS"] + len(lag_dev_ports)) print("Removing LAG member and verifying LACP packet " "is no longer received on LAG host interface") lag10_member1 = sai_thrift_remove_lag_member(self.client, lag10_member1) self.assertEqual(lag10_member1, 0) print("Sending LACP packet on port %d (removed from LAG)" % lag_dev_ports[0]) send_packet(self, lag_dev_ports[0], lacp_pkt) print("Verifying CPU port queue stats") pre_stats = post_stats time.sleep(4) post_stats = query_counter( self, sai_thrift_get_queue_stats, self.cpu_queue0) self.assertEqual( post_stats["SAI_QUEUE_STAT_PACKETS"], pre_stats["SAI_QUEUE_STAT_PACKETS"] + 1) print("Verifying no LACP packet on LAG host interface") self.assertFalse(socket_verify_packet(lacp_pkt, lag_hif_socket)) print("\tOK") print("\tVerification complete") finally: self.dataplane.flush() if lag10_member1: sai_thrift_remove_lag_member(self.client, lag10_member1) sai_thrift_remove_lag_member(self.client, lag10_member2) sai_thrift_remove_hostif_table_entry(self.client, hif_tbl_entry) sai_thrift_remove_hostif_trap(self.client, lacp_trap) sai_thrift_remove_hostif(self.client, lag_hostif) sai_thrift_remove_lag(self.client, lag10) def tearDown(self): super(lagNetdevHostifCreationTest, self).tearDown() @group("draft") class vlanSviNetdevHostifCreationTest(HostifCreationTestHelper): ''' Verify correctness of host interfaces creation of type netdev with object type Port and LAG for VLAN members. Verify also if management packets are forwarded to correct host interfaces after hostif table wildcard entry creation. ''' def setUp(self): super(vlanSviNetdevHostifCreationTest, self).setUp() def runTest(self): print("\nvlanSviNetdevHostifCreationTest()") vid = 100 vlan_ports = [self.port24, self.port25] vlan_dev_ports = [self.dev_port24, self.dev_port25] vlan_hostif_name = "vlan_hostif" hostif1_name = "hostif1" hostif2_name = "hostif2" lag_ports = [self.port26, self.port27] lag_dev_ports = [self.dev_port26, self.dev_port27] lag_hostif_name = "lag_hostif" arp_pkt = simple_arp_packet(arp_op=1, pktlen=100) tag_arp_pkt = simple_arp_packet(arp_op=1, vlan_vid=vid, pktlen=104) try: lag10 = sai_thrift_create_lag(self.client) vlan100 = sai_thrift_create_vlan(self.client, vlan_id=vid) vlan100_rif = sai_thrift_create_router_interface( self.client, type=SAI_ROUTER_INTERFACE_TYPE_VLAN, virtual_router_id=self.default_vrf, vlan_id=vlan100) self.assertTrue(vlan100_rif != 0) vlan_hostif = sai_thrift_create_hostif(self.client, name=vlan_hostif_name, obj_id=vlan100, type=SAI_HOSTIF_TYPE_NETDEV) self.assertNotEqual(vlan_hostif, 0) arp_trap = sai_thrift_create_hostif_trap( self.client, packet_action=SAI_PACKET_ACTION_TRAP, trap_type=SAI_HOSTIF_TRAP_TYPE_ARP_REQUEST) self.assertNotEqual(arp_trap, 0) # create LAG members lag10_member1 = sai_thrift_create_lag_member( self.client, lag_id=lag10, port_id=lag_ports[0]) lag10_member2 = sai_thrift_create_lag_member( self.client, lag_id=lag10, port_id=lag_ports[1]) lag_hostif = sai_thrift_create_hostif(self.client, name=lag_hostif_name, obj_id=lag10, type=SAI_HOSTIF_TYPE_NETDEV) self.assertNotEqual(lag_hostif, 0) lag_hif_socket = open_packet_socket(lag_hostif_name) # create VLAN members iport_bp = sai_thrift_create_bridge_port( self.client, bridge_id=self.default_1q_bridge, port_id=vlan_ports[0], type=SAI_BRIDGE_PORT_TYPE_PORT, admin_state=True) self.assertNotEqual(iport_bp, 0) eport_bp = sai_thrift_create_bridge_port( self.client, bridge_id=self.default_1q_bridge, port_id=vlan_ports[1], type=SAI_BRIDGE_PORT_TYPE_PORT, admin_state=True) self.assertNotEqual(eport_bp, 0) lag10_bp = sai_thrift_create_bridge_port( self.client, bridge_id=self.default_1q_bridge, port_id=lag10, type=SAI_BRIDGE_PORT_TYPE_PORT, admin_state=True) self.assertNotEqual(lag10_bp, 0) vlan100_member0 = sai_thrift_create_vlan_member( self.client, vlan_id=vlan100, bridge_port_id=iport_bp, vlan_tagging_mode=SAI_VLAN_TAGGING_MODE_UNTAGGED) vlan100_member1 = sai_thrift_create_vlan_member( self.client, vlan_id=vlan100, bridge_port_id=eport_bp, vlan_tagging_mode=SAI_VLAN_TAGGING_MODE_TAGGED) vlan100_member2 = sai_thrift_create_vlan_member( self.client, vlan_id=vlan100, bridge_port_id=lag10_bp, vlan_tagging_mode=SAI_VLAN_TAGGING_MODE_UNTAGGED) sai_thrift_set_port_attribute( self.client, vlan_ports[0], port_vlan_id=vid) sai_thrift_set_lag_attribute(self.client, lag10, port_vlan_id=vid) hostif1 = sai_thrift_create_hostif(self.client, name=hostif1_name, obj_id=vlan_ports[0], type=SAI_HOSTIF_TYPE_NETDEV) self.assertNotEqual(hostif1, 0) hif1_socket = open_packet_socket(hostif1_name) hostif2 = sai_thrift_create_hostif(self.client, name=hostif2_name, obj_id=vlan_ports[1], type=SAI_HOSTIF_TYPE_NETDEV) self.assertNotEqual(hostif2, 0) hif2_socket = open_packet_socket(hostif2_name) pre_stats = query_counter( self, sai_thrift_get_queue_stats, self.cpu_queue0) print("Sending ARP packet on port %d (untagged VLAN member)" % vlan_dev_ports[0]) send_packet(self, vlan_dev_ports[0], arp_pkt) print("Verifying ARP packet on VLAN untagged port host interface") self.assertTrue(socket_verify_packet(arp_pkt, hif1_socket)) print("\tOK") print("Sending ARP packet on port %d (tagged VLAN member)" % vlan_dev_ports[1]) send_packet(self, vlan_dev_ports[1], tag_arp_pkt) print("Verifying ARP packet on VLAN tagged port host interface") # VLAN tag should be removed on port host interface self.assertTrue(socket_verify_packet(arp_pkt, hif2_socket)) print("\tOK") for dev_port in lag_dev_ports: print("Sending ARP packet on port %d (in LAG)" % dev_port) send_packet(self, dev_port, arp_pkt) print("Verifying ARP packet on LAG host interface") self.assertTrue(socket_verify_packet(arp_pkt, lag_hif_socket)) print("\tOK") print("Verifying CPU port queue stats") time.sleep(4) post_stats = query_counter( self, sai_thrift_get_queue_stats, self.cpu_queue0) self.assertEqual( post_stats["SAI_QUEUE_STAT_PACKETS"], pre_stats["SAI_QUEUE_STAT_PACKETS"] + 4) print("\tVerification complete") finally: self.dataplane.flush() sai_thrift_flush_fdb_entries( self.client, entry_type=SAI_FDB_FLUSH_ENTRY_TYPE_ALL) sai_thrift_remove_hostif(self.client, hostif1) sai_thrift_remove_hostif(self.client, hostif2) sai_thrift_remove_vlan_member(self.client, vlan100_member0) sai_thrift_remove_vlan_member(self.client, vlan100_member1) sai_thrift_remove_vlan_member(self.client, vlan100_member2) sai_thrift_set_port_attribute( self.client, vlan_ports[0], port_vlan_id=0) sai_thrift_set_lag_attribute( self.client, lag10, port_vlan_id=0) sai_thrift_remove_bridge_port(self.client, iport_bp) sai_thrift_remove_bridge_port(self.client, eport_bp) sai_thrift_remove_bridge_port(self.client, lag10_bp) sai_thrift_remove_hostif(self.client, lag_hostif) sai_thrift_remove_lag_member(self.client, lag10_member1) sai_thrift_remove_lag_member(self.client, lag10_member2) sai_thrift_remove_hostif_trap(self.client, arp_trap) sai_thrift_remove_hostif(self.client, vlan_hostif) sai_thrift_remove_router_interface(self.client, vlan100_rif) sai_thrift_remove_vlan(self.client, vlan100) sai_thrift_remove_lag(self.client, lag10) def tearDown(self): super(vlanSviNetdevHostifCreationTest, self).tearDown() @group("draft") class portNetdevHostifCreationTest(PlatformSaiHelper): """ Verify correctness of host interface creation of type netdev with object type Port. Verify also if management packets are forwarded to ports host interfaces after hostif table wildcard entry creation. """ def setUp(self): super(portNetdevHostifCreationTest, self).setUp() def runTest(self): print("\nportNetdevHostifCreationTest()") hostif1_port = self.port0 hostif1_dev_port = self.dev_port0 hostif1_name = "hostif1" hostif2_port = self.port1 hostif2_dev_port = self.dev_port1 hostif2_name = "hostif2" lldp_mac = "01:80:c2:00:00:0e" lldp_pkt = simple_eth_packet(eth_dst=lldp_mac, pktlen=100, eth_type=0x88cc) try: hostif1 = sai_thrift_create_hostif(self.client, name=hostif1_name, obj_id=hostif1_port, type=SAI_HOSTIF_TYPE_NETDEV) self.assertNotEqual(hostif1, 0) hostif2 = sai_thrift_create_hostif(self.client, name=hostif2_name, obj_id=hostif2_port, type=SAI_HOSTIF_TYPE_NETDEV) self.assertNotEqual(hostif2, 0) hif1_socket = open_packet_socket(hostif1_name) hif2_socket = open_packet_socket(hostif2_name) lldp_trap = sai_thrift_create_hostif_trap( self.client, packet_action=SAI_PACKET_ACTION_TRAP, trap_type=SAI_HOSTIF_TRAP_TYPE_LLDP) self.assertNotEqual(lldp_trap, 0) channel = SAI_HOSTIF_TABLE_ENTRY_CHANNEL_TYPE_NETDEV_PHYSICAL_PORT hif_tbl_entry = sai_thrift_create_hostif_table_entry( self.client, channel_type=channel, type=SAI_HOSTIF_TABLE_ENTRY_TYPE_WILDCARD) self.assertNotEqual(hif_tbl_entry, 0) pre_stats = query_counter( self, sai_thrift_get_queue_stats, self.cpu_queue0) print("Sending LLDP packet on port %d" % hostif1_dev_port) send_packet(self, hostif1_dev_port, lldp_pkt) print("Verifying LLDP packet on port host interface") self.assertTrue(socket_verify_packet(lldp_pkt, hif1_socket)) print("\tOK") print("Sending LLDP packet on port %d" % hostif2_dev_port) send_packet(self, hostif2_dev_port, lldp_pkt) print("Verifying LLDP packet on port host interface") self.assertTrue(socket_verify_packet(lldp_pkt, hif2_socket)) print("\tOK") print("Verifying CPU port queue stats") time.sleep(4) post_stats = query_counter( self, sai_thrift_get_queue_stats, self.cpu_queue0) self.assertEqual( post_stats["SAI_QUEUE_STAT_PACKETS"], pre_stats["SAI_QUEUE_STAT_PACKETS"] + 2) print("\tOK") print("\tVerification complete") finally: self.dataplane.flush() sai_thrift_remove_hostif_table_entry(self.client, hif_tbl_entry) sai_thrift_remove_hostif_trap(self.client, lldp_trap) sai_thrift_remove_hostif(self.client, hostif1) sai_thrift_remove_hostif(self.client, hostif2) def tearDown(self): super(portNetdevHostifCreationTest, self).tearDown() @group("draft") class HostifTaggingTestHelper(PlatformSaiHelper): ''' Verify VLAN tags handling ''' def setUp(self): super(HostifTaggingTestHelper, self).setUp() def tearDown(self): sai_thrift_flush_fdb_entries( self.client, entry_type=SAI_FDB_FLUSH_ENTRY_TYPE_ALL) super(HostifTaggingTestHelper, self).tearDown() @group("draft") class hostifStripTagTest(HostifTaggingTestHelper): ''' Verify VLAN tag stripping on host interface ''' def setUp(self): super(hostifStripTagTest, self).setUp() def runTest(self): print("\nhostifStripTagTest()") vid = 10 hostif_port = self.port1 hostif_dev_port = self.dev_port1 hostif_name = "hostif" lldp_mac = "01:80:c2:00:00:0e" tag_pkt = simple_eth_dot1q_packet(eth_dst=lldp_mac, dl_vlan_enable=True, vlan_vid=vid, pktlen=104, eth_type=0x88cc) exp_pkt = simple_eth_dot1q_packet(eth_dst=lldp_mac, dl_vlan_enable=False, pktlen=100, eth_type=0x88cc) try: hostif = sai_thrift_create_hostif( self.client, name=hostif_name, obj_id=hostif_port, type=SAI_HOSTIF_TYPE_NETDEV, vlan_tag=SAI_HOSTIF_VLAN_TAG_STRIP) self.assertNotEqual(hostif, 0) hif_attr = sai_thrift_get_hostif_attribute(self.client, hostif, vlan_tag=True) self.assertEqual(hif_attr['vlan_tag'], SAI_HOSTIF_VLAN_TAG_STRIP) hif_socket = open_packet_socket(hostif_name) lldp_trap = sai_thrift_create_hostif_trap( self.client, packet_action=SAI_PACKET_ACTION_TRAP, trap_type=SAI_HOSTIF_TRAP_TYPE_LLDP) self.assertNotEqual(lldp_trap, 0) print("Sending packet on port %d" % hostif_dev_port) send_packet(self, hostif_dev_port, tag_pkt) print("Verifying if packet on port host interface is untagged") self.assertTrue(socket_verify_packet(exp_pkt, hif_socket)) print("\tOK") finally: sai_thrift_remove_hostif_trap(self.client, self.lldp_trap) sai_thrift_remove_hostif(self.client, hostif) def tearDown(self): super(hostifStripTagTest, self).tearDown() @group("draft") class hostifKeepTagTest(HostifTaggingTestHelper): ''' Verify VLAN tag keeping on host interface For tagged packets mode tag should be left unchanged, for untagged it should be added. ''' def setUp(self): super(hostifKeepTagTest, self).setUp() def runTest(self): print("\nhostifKeepTagTest()") vid = 10 hostif_port = self.port1 hostif_dev_port = self.dev_port1 hostif_name = "hostif" lldp_mac = "01:80:c2:00:00:0e" tag_pkt = simple_eth_dot1q_packet(eth_dst=lldp_mac, dl_vlan_enable=True, vlan_vid=vid, pktlen=104, eth_type=0x88cc) pkt = simple_eth_dot1q_packet(eth_dst=lldp_mac, dl_vlan_enable=False, pktlen=100, eth_type=0x88cc) exp_pkt = tag_pkt try: hostif = sai_thrift_create_hostif( self.client, name=hostif_name, obj_id=hostif_port, type=SAI_HOSTIF_TYPE_NETDEV, vlan_tag=SAI_HOSTIF_VLAN_TAG_KEEP) self.assertNotEqual(hostif, 0) hif_attr = sai_thrift_get_hostif_attribute(self.client, hostif, vlan_tag=True) self.assertEqual(hif_attr['vlan_tag'], SAI_HOSTIF_VLAN_TAG_KEEP) hif_socket = open_packet_socket(hostif_name) lldp_trap = sai_thrift_create_hostif_trap( self.client, packet_action=SAI_PACKET_ACTION_TRAP, trap_type=SAI_HOSTIF_TRAP_TYPE_LLDP) self.assertNotEqual(lldp_trap, 0) print("Sending tagged packet on port %d" % hostif_dev_port) send_packet(self, hostif_dev_port, tag_pkt) print("Verifying if packet on port host interface is tagged") self.assertTrue(socket_verify_packet(exp_pkt, hif_socket)) print("\tOK") print("Sending untagged packet on port %d" % hostif_dev_port) send_packet(self, hostif_dev_port, pkt) print("Verifying if packet on port host interface is tagged") self.assertTrue(socket_verify_packet(exp_pkt, hif_socket)) print("\tOK") finally: sai_thrift_remove_hostif(self.client, hostif) sai_thrift_remove_hostif_trap(self.client, self.lldp_trap) def tearDown(self): super(hostifKeepTagTest, self).tearDown() @group("draft") class hostifOriginalTagTest(HostifTaggingTestHelper): ''' Verify keeping original VLAN tagging on host interface ''' def setUp(self): super(hostifOriginalTagTest, self).setUp() def runTest(self): print("\nhostifOriginalTagTest()") vid = 10 hostif_port = self.port1 hostif_dev_port = self.dev_port1 hostif_name = "hostif" lldp_mac = "01:80:c2:00:00:0e" tag_pkt = simple_eth_dot1q_packet(eth_dst=lldp_mac, dl_vlan_enable=True, vlan_vid=vid, pktlen=104, eth_type=0x88cc) pkt = simple_eth_dot1q_packet(eth_dst=lldp_mac, dl_vlan_enable=False, pktlen=100, eth_type=0x88cc) exp_pkt1 = tag_pkt exp_pkt2 = pkt try: hostif = sai_thrift_create_hostif( self.client, name=hostif_name, obj_id=hostif_port, type=SAI_HOSTIF_TYPE_NETDEV, vlan_tag=SAI_HOSTIF_VLAN_TAG_ORIGINAL) self.assertNotEqual(hostif, 0) hif_attr = sai_thrift_get_hostif_attribute( self.client, hostif, vlan_tag=True) self.assertEqual(hif_attr['vlan_tag'], SAI_HOSTIF_VLAN_TAG_ORIGINAL) hif_socket = open_packet_socket(hostif_name) lldp_trap = sai_thrift_create_hostif_trap( self.client, packet_action=SAI_PACKET_ACTION_TRAP, trap_type=SAI_HOSTIF_TRAP_TYPE_LLDP) self.assertNotEqual(lldp_trap, 0) print("Sending tagged packet on port %d" % hostif_dev_port) send_packet(self, hostif_dev_port, tag_pkt) print("Verifying if packet on port host interface is tagged") self.assertTrue(socket_verify_packet(exp_pkt1, hif_socket)) print("\tOK") print("Sending untagged packet on port %d" % hostif_dev_port) send_packet(self, hostif_dev_port, pkt) print("Verifying if packet on port host interface is tagged") self.assertTrue(socket_verify_packet(exp_pkt2, hif_socket)) print("\tOK") finally: sai_thrift_remove_hostif(self.client, hostif) sai_thrift_remove_hostif_trap(self.client, self.lldp_trap) def tearDown(self): super(hostifOriginalTagTest, self).tearDown() @group("draft") class HostifTrapActionTestHelper(PlatformSaiHelper): ''' Verify packets handling for different kinds of trap actions ''' def setUp(self): super(HostifTrapActionTestHelper, self).setUp() vid = 100 self.iport = self.port24 self.iport_dev = self.dev_port24 self.eport = self.port25 self.eport_dev = self.dev_port25 # create VLAN with 2 members self.vlan100 = sai_thrift_create_vlan(self.client, vlan_id=vid) self.vlan100_rif = sai_thrift_create_router_interface( self.client, type=SAI_ROUTER_INTERFACE_TYPE_VLAN, virtual_router_id=self.default_vrf, vlan_id=self.vlan100) self.iport_bp = sai_thrift_create_bridge_port( self.client, bridge_id=self.default_1q_bridge, port_id=self.iport, type=SAI_BRIDGE_PORT_TYPE_PORT, admin_state=True) self.eport_bp = sai_thrift_create_bridge_port( self.client, bridge_id=self.default_1q_bridge, port_id=self.eport, type=SAI_BRIDGE_PORT_TYPE_PORT, admin_state=True) self.vlan100_member0 = sai_thrift_create_vlan_member( self.client, vlan_id=self.vlan100, bridge_port_id=self.iport_bp, vlan_tagging_mode=SAI_VLAN_TAGGING_MODE_UNTAGGED) self.vlan100_member1 = sai_thrift_create_vlan_member( self.client, vlan_id=self.vlan100, bridge_port_id=self.eport_bp, vlan_tagging_mode=SAI_VLAN_TAGGING_MODE_UNTAGGED) sai_thrift_set_port_attribute( self.client, self.iport, port_vlan_id=vid) sai_thrift_set_port_attribute( self.client, self.eport, port_vlan_id=vid) lldp_mac = "01:80:c2:00:00:0e" self.lldp_pkt = simple_eth_packet(eth_dst=lldp_mac, pktlen=100, eth_type=0x88cc) self.arp_pkt = simple_arp_packet(arp_op=1, pktlen=100) self.arp_tagged_pkt = simple_arp_packet(arp_op=1, vlan_vid=vid, pktlen=104) self.cpu_queue_state = query_counter( self, sai_thrift_get_queue_stats, self.cpu_queue0)["SAI_QUEUE_STAT_PACKETS"] def tearDown(self): sai_thrift_flush_fdb_entries( self.client, entry_type=SAI_FDB_FLUSH_ENTRY_TYPE_ALL) sai_thrift_remove_vlan_member(self.client, self.vlan100_member0) sai_thrift_remove_vlan_member(self.client, self.vlan100_member1) sai_thrift_set_port_attribute(self.client, self.iport, port_vlan_id=0) sai_thrift_set_port_attribute(self.client, self.eport, port_vlan_id=0) sai_thrift_remove_bridge_port(self.client, self.iport_bp) sai_thrift_remove_bridge_port(self.client, self.eport_bp) sai_thrift_remove_router_interface(self.client, self.vlan100_rif) sai_thrift_remove_vlan(self.client, self.vlan100) super(HostifTrapActionTestHelper, self).tearDown() @group("draft") class dropTrapActionTest(HostifTrapActionTestHelper): ''' Verify drop trap action ''' def setUp(self): super(dropTrapActionTest, self).setUp() def runTest(self): print("\ndropTrapActionTest()") try: drop_trap = sai_thrift_create_hostif_trap( self.client, packet_action=SAI_PACKET_ACTION_DROP, trap_type=SAI_HOSTIF_TRAP_TYPE_LLDP) self.assertNotEqual(drop_trap, 0) print("Sending LLDP packet on port %d" % self.iport_dev) send_packet(self, self.iport_dev, self.lldp_pkt) print("Verifying LLDP packet is dropped") verify_no_other_packets(self) print("\tOK") finally: sai_thrift_remove_hostif_trap(self.client, drop_trap) def tearDown(self): super(dropTrapActionTest, self).tearDown() @group("draft") class forwardTrapActionTest(HostifTrapActionTestHelper): ''' Verify forward trap action ''' def setUp(self): super(forwardTrapActionTest, self).setUp() def runTest(self): print("\nforwardTrapActionTest()") try: forward_trap = sai_thrift_create_hostif_trap( self.client, packet_action=SAI_PACKET_ACTION_FORWARD, trap_type=SAI_HOSTIF_TRAP_TYPE_LLDP) self.assertNotEqual(forward_trap, 0) print("Sending LLDP packet on port %d" % self.iport_dev) send_packet(self, self.iport_dev, self.lldp_pkt) print("Verifying LLDP packet is forwarded to port %d" % self.eport_dev) verify_packet(self, self.lldp_pkt, self.eport_dev) verify_no_other_packets(self) print("\tOK") finally: sai_thrift_remove_hostif_trap(self.client, forward_trap) def tearDown(self): super(forwardTrapActionTest, self).tearDown() @group("draft") class copyTrapActionTest(HostifTrapActionTestHelper): ''' Verify copy trap action [Copy Packet to CPU without interfering the original packet action in the pipeline] ''' def setUp(self): super(copyTrapActionTest, self).setUp() def runTest(self): print("\ncopyTrapActionTest()") try: copy_trap = sai_thrift_create_hostif_trap( self.client, packet_action=SAI_PACKET_ACTION_COPY, trap_type=SAI_HOSTIF_TRAP_TYPE_LLDP) self.assertNotEqual(copy_trap, 0) print("Sending LLDP packet on port %d" % self.iport_dev) send_packet(self, self.iport_dev, self.lldp_pkt) print("Verifying LLDP packet is forwarded to port %d and " "copied to CPU" % self.eport_dev) verify_packet(self, self.lldp_pkt, self.eport_dev) print("Verifying CPU port queue stats") time.sleep(4) post_stats = query_counter( self, sai_thrift_get_queue_stats, self.cpu_queue0) self.assertEqual( post_stats["SAI_QUEUE_STAT_PACKETS"], self.cpu_queue_state + 1) print("\tOK") self.cpu_queue_state = post_stats["SAI_QUEUE_STAT_PACKETS"] finally: sai_thrift_remove_hostif_trap(self.client, copy_trap) def tearDown(self): super(copyTrapActionTest, self).tearDown() @group("draft") class copyTrapActionTaggedTest(HostifTrapActionTestHelper): ''' This verifies copy trap action using tagged ARP packet [Copy Packet to CPU without interfering the original packet action in the pipeline] ''' def setUp(self): super(copyTrapActionTaggedTest, self).setUp() def runTest(self): print("\ncopyTrapActionTaggedTest()") try: copy_trap = sai_thrift_create_hostif_trap( self.client, packet_action=SAI_PACKET_ACTION_COPY, trap_type=SAI_HOSTIF_TRAP_TYPE_ARP_REQUEST) self.assertTrue(copy_trap != 0) pre_stats = query_counter( self, sai_thrift_get_queue_stats, self.cpu_queue0) print("Sending ARP packet on port %d" % self.iport_dev) send_packet(self, self.iport_dev, self.arp_pkt) print("Verifying ARP packet is forwarded to port %d and " "copied to CPU" % self.eport_dev) verify_packet(self, self.arp_pkt, self.eport_dev) print("Verifying CPU port queue stats") time.sleep(4) post_stats = query_counter( self, sai_thrift_get_queue_stats, self.cpu_queue0) self.assertEqual( post_stats["SAI_QUEUE_STAT_PACKETS"], self.cpu_queue_state + 1) self.cpu_queue_state = post_stats["SAI_QUEUE_STAT_PACKETS"] arp_len = int(post_stats["SAI_QUEUE_STAT_BYTES"] - pre_stats["SAI_QUEUE_STAT_BYTES"]) pre_stats = post_stats print("Sending tagged ARP packet on port %d" % self.iport_dev) send_packet(self, self.iport_dev, self.arp_tagged_pkt) print("Verifying tagged ARP packet is forwarded to port %d and " "copied to CPU" % self.eport_dev) verify_packet(self, self.arp_pkt, self.eport_dev) print("Verifying CPU port queue stats") time.sleep(4) post_stats = query_counter( self, sai_thrift_get_queue_stats, self.cpu_queue0) self.assertEqual( post_stats["SAI_QUEUE_STAT_PACKETS"], self.cpu_queue_state + 1) arp_tagged_len = int(post_stats["SAI_QUEUE_STAT_BYTES"] - pre_stats["SAI_QUEUE_STAT_BYTES"]) self.assertEqual( arp_tagged_len, arp_len + 4) print("\tOK") self.cpu_queue_state = post_stats["SAI_QUEUE_STAT_PACKETS"] finally: sai_thrift_remove_hostif_trap(self.client, copy_trap) def tearDown(self): super(copyTrapActionTaggedTest, self).tearDown() @group("draft") class trapTrapActionTest(HostifTrapActionTestHelper): ''' Verify trap trap action [This is a combination of SAI packet action COPY and DROP: A copy of the original packet is sent to CPU port, the original packet is forcefully dropped from the pipeline] ''' def setUp(self): super(trapTrapActionTest, self).setUp() def runTest(self): print("\ntrapTrapActionTest()") try: trap_trap = sai_thrift_create_hostif_trap( self.client, packet_action=SAI_PACKET_ACTION_TRAP, trap_type=SAI_HOSTIF_TRAP_TYPE_LLDP) self.assertNotEqual(trap_trap, 0) print("Sending LLDP packet on port %d" % self.iport_dev) send_packet(self, self.iport_dev, self.lldp_pkt) print("Verifying LLDP packet is trapped by " "checkig CPU port queue stats") time.sleep(4) post_stats = query_counter( self, sai_thrift_get_queue_stats, self.cpu_queue0) self.assertEqual( post_stats["SAI_QUEUE_STAT_PACKETS"], self.cpu_queue_state + 1) print("\tOK") self.cpu_queue_state = post_stats["SAI_QUEUE_STAT_PACKETS"] finally: sai_thrift_remove_hostif_trap(self.client, trap_trap) def tearDown(self): super(trapTrapActionTest, self).tearDown() @group("draft") class logTrapActionTest(HostifTrapActionTestHelper): ''' Verify log trap action. [A copy of the original packet is sent to CPU port, the original packet, if it was to be dropped in the original pipeline, change the pipeline action to forward (cancel drop)] ''' def setUp(self): super(logTrapActionTest, self).setUp() def runTest(self): print("\nlogTrapActionTest()") try: log_trap = sai_thrift_create_hostif_trap( self.client, packet_action=SAI_PACKET_ACTION_LOG, trap_type=SAI_HOSTIF_TRAP_TYPE_LLDP) self.assertNotEqual(log_trap, 0) print("Sending LLDP packet on port %d" % self.iport_dev) send_packet(self, self.iport_dev, self.lldp_pkt) print("Verifying LLDP packet is forwarded to port %d and " "copied to CPU" % self.eport_dev) verify_packet(self, self.lldp_pkt, self.eport_dev) print("Verifying CPU port queue stats") time.sleep(4) post_stats = query_counter( self, sai_thrift_get_queue_stats, self.cpu_queue0) self.assertEqual( post_stats["SAI_QUEUE_STAT_PACKETS"], self.cpu_queue_state + 1) print("\tOK") self.cpu_queue_state = post_stats["SAI_QUEUE_STAT_PACKETS"] finally: sai_thrift_remove_hostif_trap(self.client, log_trap) def tearDown(self): super(logTrapActionTest, self).tearDown() @group("draft") class denyTrapActionTest(HostifTrapActionTestHelper): ''' Verify deny trap action [This is a combination of SAI packet action COPY_CANCEL and DROP] ''' def setUp(self): super(denyTrapActionTest, self).setUp() def runTest(self): print("\ndenyTrapActionTest()") try: deny_trap = sai_thrift_create_hostif_trap( self.client, packet_action=SAI_PACKET_ACTION_DENY, trap_type=SAI_HOSTIF_TRAP_TYPE_LLDP) self.assertNotEqual(deny_trap, 0) print("Sending LLDP packet on port %d" % self.iport_dev) send_packet(self, self.iport_dev, self.lldp_pkt) print("Verifying LLDP packet is denied") verify_no_other_packets(self) print("\tOK") finally: sai_thrift_remove_hostif_trap(self.client, deny_trap) def tearDown(self): super(denyTrapActionTest, self).tearDown() @group("draft") class transitTrapActionTest(HostifTrapActionTestHelper): ''' Verify transit trap action [This is a combination of SAI packet action COPY_CANCEL and FORWARD] ''' def setUp(self): super(transitTrapActionTest, self).setUp() def runTest(self): print("\ntransitTrapActionTest()") try: transit_trap = sai_thrift_create_hostif_trap( self.client, packet_action=SAI_PACKET_ACTION_FORWARD, trap_type=SAI_HOSTIF_TRAP_TYPE_LLDP) self.assertNotEqual(transit_trap, 0) print("Sending LLDP packet on port %d" % self.iport_dev) send_packet(self, self.iport_dev, self.lldp_pkt) print("Verifying LLDP packet is forwarded to port %d" % self.eport_dev) verify_packet(self, self.lldp_pkt, self.eport_dev) verify_no_other_packets(self) print("\tOK") finally: sai_thrift_remove_hostif_trap(self.client, transit_trap) def tearDown(self): super(transitTrapActionTest, self).tearDown() @group("draft") class hostifPriorityTest(HostifTrapActionTestHelper): ''' Verify if packets are handled using hostif trap with higher prioriy set ''' def setUp(self): super(hostifPriorityTest, self).setUp() def runTest(self): print("\nhostifPriorityTest()") try: print("Create trap with drop action") drop_trap = sai_thrift_create_hostif_trap( self.client, packet_action=SAI_PACKET_ACTION_DROP, trap_type=SAI_HOSTIF_TRAP_TYPE_LLDP, trap_priority=1) self.assertNotEqual(drop_trap, 0) print("Create higher priority trap with forward action") forward_trap = sai_thrift_create_hostif_trap( self.client, packet_action=SAI_PACKET_ACTION_FORWARD, trap_type=SAI_HOSTIF_TRAP_TYPE_LLDP, trap_priority=5) self.assertNotEqual(forward_trap, 0) self.dataplane.flush() print("Sending LLDP packet on port %d" % self.iport_dev) send_packet(self, self.iport_dev, self.lldp_pkt) print("Verifying LLDP packet is forwarded to port %d" % self.eport_dev) verify_packet(self, self.lldp_pkt, self.eport_dev) verify_no_other_packets(self) print("\tOK") print("Changing drop trap priority") sai_thrift_set_hostif_trap_attribute( self.client, drop_trap, trap_priority=10) print("Sending LLDP packet on port %d" % self.iport_dev) send_packet(self, self.iport_dev, self.lldp_pkt) print("Verifying LLDP packet is dropped") verify_no_other_packets(self) print("\tOK") finally: sai_thrift_remove_hostif_trap(self.client, forward_trap) sai_thrift_remove_hostif_trap(self.client, drop_trap) def tearDown(self): super(hostifPriorityTest, self).tearDown() @group("draft") class lagHostifTxTest(PlatformSaiHelper): ''' Verify LAG host interface tx ''' def setUp(self): super(lagHostifTxTest, self).setUp() def runTest(self): print("\nlagHostifTxTest()") lag_ports = [self.port24, self.port25] lag_dev_ports = [self.dev_port24, self.dev_port25] lag_hostif_name = "lag_hostif" test_ip = "10.10.10.1" pkt = simple_udp_packet(ip_dst=test_ip) try: lag10 = sai_thrift_create_lag(self.client) lag10_member1 = sai_thrift_create_lag_member( self.client, lag_id=lag10, port_id=lag_ports[0]) lag10_member2 = sai_thrift_create_lag_member( self.client, lag_id=lag10, port_id=lag_ports[1]) lag_hostif = sai_thrift_create_hostif(self.client, name=lag_hostif_name, obj_id=lag10, type=SAI_HOSTIF_TYPE_NETDEV) self.assertNotEqual(lag_hostif, 0) lag_hif_socket = open_packet_socket(lag_hostif_name) print("Sending packet via LAG hostif") lag_hif_socket.send(bytes(pkt)) print("\tOK") print("Verifying packet on LAG port") verify_packet_any_port(self, pkt, lag_dev_ports) print("\tOK") print("Removing one lag member") lag10_member1 = sai_thrift_remove_lag_member( self.client, lag10_member1) print("Sending packet via LAG hostif") lag_hif_socket.send(bytes(pkt)) print("\tOK") print("Verifying packet on port") verify_packet(self, pkt, lag_dev_ports[1]) print("\tOK") print("Removing last lag member") lag10_member2 = sai_thrift_remove_lag_member( self.client, lag10_member2) print("Sending packet via LAG hostif") lag_hif_socket.send(bytes(pkt)) print("\tOK") print("Verifying no packets on LAG ports") verify_no_packet_any(self, pkt, lag_dev_ports) print("\tOK") finally: sai_thrift_remove_hostif(self.client, lag_hostif) if lag10_member1: sai_thrift_remove_lag_member(self.client, lag10_member1) if lag10_member2: sai_thrift_remove_lag_member(self.client, lag10_member2) sai_thrift_remove_lag(self.client, lag10) def tearDown(self): super(lagHostifTxTest, self).tearDown() @group("draft") class arpRxTxTest(PlatformSaiHelper): """ Verify host interface rx/tx path with ARP packet """ def setUp(self): super(arpRxTxTest, self).setUp() def runTest(self): print("\narpRxTxTest()") test_port = self.port0 test_dev_port = self.dev_port0 hostif_name = "rif_hostif" rif_mac = "00:11:22:33:44:55" src_mac = "00:06:07:08:09:0a" rif_ip = "10.10.0.10" src_ip = "10.10.0.1" arp_req_pkt = simple_arp_packet(arp_op=1, pktlen=100, eth_src=src_mac, hw_snd=src_mac, ip_snd=src_ip, ip_tgt=rif_ip) arp_resp_pkt = simple_arp_packet(arp_op=2, pktlen=42, eth_src=rif_mac, eth_dst=src_mac, hw_snd=rif_mac, hw_tgt=src_mac, ip_snd=rif_ip, ip_tgt=src_ip) try: rif = sai_thrift_create_router_interface( self.client, type=SAI_ROUTER_INTERFACE_TYPE_PORT, virtual_router_id=self.default_vrf, port_id=test_port, src_mac_address=rif_mac) self.assertNotEqual(rif, 0) arp_trap = sai_thrift_create_hostif_trap( self.client, packet_action=SAI_PACKET_ACTION_TRAP, trap_type=SAI_HOSTIF_TRAP_TYPE_ARP_REQUEST) self.assertNotEqual(arp_trap, 0) hostif = sai_thrift_create_hostif(self.client, name=hostif_name, obj_id=rif, type=SAI_HOSTIF_TYPE_NETDEV) self.assertNotEqual(hostif, 0) hif_socket = open_packet_socket(hostif_name) # set host interface IP address os.system("ifconfig %s %s/24" % (hostif_name, rif_ip)) os.system("ifconfig %s hw ether %s" % (hostif_name, rif_mac)) os.system("sudo ifconfig %s up" % hostif_name) hostif_ip = os.popen("ifconfig %s | grep 'inet addr' | " "cut -d: -f2 | awk '{print $1}'" % hostif_name).read() self.assertEqual(hostif_ip.rstrip(), rif_ip) pre_stats = query_counter( self, sai_thrift_get_queue_stats, self.cpu_queue0) print("Sending ARP request on port %d" % test_dev_port) send_packet(self, test_dev_port, arp_req_pkt) print("Verifying CPU port queue stats") time.sleep(4) post_stats = query_counter( self, sai_thrift_get_queue_stats, self.cpu_queue0) self.assertEqual( post_stats["SAI_QUEUE_STAT_PACKETS"], pre_stats["SAI_QUEUE_STAT_PACKETS"] + 1) self.assertTrue(socket_verify_packet(arp_req_pkt, hif_socket)) print("\tOK") print("Verifying ARP response") verify_packet(self, arp_resp_pkt, test_dev_port) print("\tOK") finally: self.dataplane.flush() sai_thrift_remove_hostif(self.client, hostif) sai_thrift_remove_hostif_trap(self.client, arp_trap) sai_thrift_remove_router_interface(self.client, rif) def tearDown(self): super(arpRxTxTest, self).tearDown() @group("draft") class portHostifTxTest(PlatformSaiHelper): """ Verify port host interface tx """ def setUp(self): super(portHostifTxTest, self).setUp() def runTest(self): print("\nportHostifTxTest()") test_port = self.port0 test_dev_port = self.dev_port0 hostif_name = "hostif" test_ip = "10.10.10.1" pkt = simple_udp_packet(ip_dst=test_ip) try: hostif = sai_thrift_create_hostif(self.client, name=hostif_name, obj_id=test_port, type=SAI_HOSTIF_TYPE_NETDEV) self.assertNotEqual(hostif, 0) hif_socket = open_packet_socket(hostif_name) print("Sending packet via port hostif") hif_socket.send(bytes(pkt)) print("\tOK") print("Verifying packet on port") verify_packet(self, pkt, test_dev_port) print("\tOK") finally: sai_thrift_remove_hostif(self.client, hostif) def tearDown(self): super(portHostifTxTest, self).tearDown() @group("draft") class HostifTableMatchTestHelper(PlatformSaiHelper): ''' Verify hostif interface table match for entry type Wildcard and channel type = CB ''' def setUp(self): super(HostifTableMatchTestHelper, self).setUp() self.vid = 100 self.iport = self.port24 self.iport_dev = self.dev_port24 self.eport = self.port25 self.eport_dev = self.dev_port25 # create VLAN with 2 members self.vlan100 = sai_thrift_create_vlan(self.client, vlan_id=self.vid) self.iport_bp = sai_thrift_create_bridge_port( self.client, bridge_id=self.default_1q_bridge, port_id=self.iport, type=SAI_BRIDGE_PORT_TYPE_PORT, admin_state=True) self.eport_bp = sai_thrift_create_bridge_port( self.client, bridge_id=self.default_1q_bridge, port_id=self.eport, type=SAI_BRIDGE_PORT_TYPE_PORT, admin_state=True) self.vlan100_member0 = sai_thrift_create_vlan_member( self.client, vlan_id=self.vlan100, bridge_port_id=self.iport_bp, vlan_tagging_mode=SAI_VLAN_TAGGING_MODE_UNTAGGED) self.vlan100_member1 = sai_thrift_create_vlan_member( self.client, vlan_id=self.vlan100, bridge_port_id=self.eport_bp, vlan_tagging_mode=SAI_VLAN_TAGGING_MODE_UNTAGGED) sai_thrift_set_port_attribute( self.client, self.iport, port_vlan_id=self.vid) sai_thrift_set_port_attribute( self.client, self.eport, port_vlan_id=self.vid) self.cpu_queue_state = query_counter( self, sai_thrift_get_queue_stats, self.cpu_queue0)["SAI_QUEUE_STAT_PACKETS"] def tearDown(self): sai_thrift_flush_fdb_entries( self.client, entry_type=SAI_FDB_FLUSH_ENTRY_TYPE_ALL) sai_thrift_remove_vlan_member(self.client, self.vlan100_member0) sai_thrift_remove_vlan_member(self.client, self.vlan100_member1) sai_thrift_set_port_attribute(self.client, self.iport, port_vlan_id=0) sai_thrift_set_port_attribute(self.client, self.eport, port_vlan_id=0) sai_thrift_remove_bridge_port(self.client, self.iport_bp) sai_thrift_remove_bridge_port(self.client, self.eport_bp) sai_thrift_remove_vlan(self.client, self.vlan100) super(HostifTableMatchTestHelper, self).tearDown() @group("draft") class wildcardEntryCbChannelLldp(HostifTableMatchTestHelper): ''' Verify hostif interface table match for entry type Wildcard and channel type = CB with LLDP packet ''' def setUp(self): super(wildcardEntryCbChannelLldp, self).setUp() def runTest(self): print("\nwildcardEntryCbChannelLldp()") hostif_port = self.iport hostif_dev_port = self.iport_dev hostif_name = "hostif" lldp_mac = "01:80:c2:00:00:0e" lldp_pkt = simple_eth_packet(eth_dst=lldp_mac, pktlen=100, eth_type=0x88cc) try: hostif = sai_thrift_create_hostif(self.client, name=hostif_name, obj_id=hostif_port, type=SAI_HOSTIF_TYPE_NETDEV) self.assertNotEqual(hostif, 0) hif_socket = open_packet_socket(hostif_name) lldp_trap = sai_thrift_create_hostif_trap( self.client, packet_action=SAI_PACKET_ACTION_COPY, trap_type=SAI_HOSTIF_TRAP_TYPE_LLDP) self.assertNotEqual(lldp_trap, 0) hif_tbl_entry = sai_thrift_create_hostif_table_entry( self.client, channel_type=SAI_HOSTIF_TABLE_ENTRY_CHANNEL_TYPE_CB, type=SAI_HOSTIF_TABLE_ENTRY_TYPE_WILDCARD) self.assertNotEqual(hif_tbl_entry, 0) print("Sending LLDP packet on port %d" % hostif_dev_port) send_packet(self, hostif_dev_port, lldp_pkt) print("Verifying LLDP packet on VLAN port") verify_packet(self, lldp_pkt, self.eport_dev) print("Verifying CPU port queue stats") time.sleep(4) post_stats = query_counter( self, sai_thrift_get_queue_stats, self.cpu_queue0) self.assertEqual( post_stats["SAI_QUEUE_STAT_PACKETS"], self.cpu_queue_state + 1) print("\tOK") self.cpu_queue_state = post_stats["SAI_QUEUE_STAT_PACKETS"] print("Verifying LLDP packet on port host interface") self.assertTrue(socket_verify_packet(lldp_pkt, hif_socket)) print("\tOK") finally: sai_thrift_remove_hostif_table_entry(self.client, hif_tbl_entry) sai_thrift_remove_hostif_trap(self.client, lldp_trap) sai_thrift_remove_hostif(self.client, hostif) def tearDown(self): super(wildcardEntryCbChannelLldp, self).tearDown() @group("draft") class wildcardEntryCbChannelLacp(HostifTableMatchTestHelper): ''' Verify hostif interface table match for entry type Wildcard and channel type = CB with LACP packet ''' def setUp(self): super(wildcardEntryCbChannelLacp, self).setUp() def runTest(self): print("\nwildcardEntryCbChannelLacp()") lag_ports = [self.iport, self.eport] lag_dev_ports = [self.iport_dev, self.eport_dev] lag_hostif_name = "lag_hostif" lacp_mac = "01:80:c2:00:00:02" lacp_pkt = simple_eth_packet(eth_dst=lacp_mac, pktlen=100, eth_type=0x8809)/(chr(0x01) + (chr(0x01))) try: lag10 = sai_thrift_create_lag(self.client) lag_hostif = sai_thrift_create_hostif(self.client, name=lag_hostif_name, obj_id=lag10, type=SAI_HOSTIF_TYPE_NETDEV) self.assertNotEqual(lag_hostif, 0) lag_hif_socket = open_packet_socket(lag_hostif_name) lacp_trap = sai_thrift_create_hostif_trap( self.client, packet_action=SAI_PACKET_ACTION_COPY, trap_type=SAI_HOSTIF_TRAP_TYPE_LACP) self.assertNotEqual(lacp_trap, 0) hif_tbl_entry = sai_thrift_create_hostif_table_entry( self.client, channel_type=SAI_HOSTIF_TABLE_ENTRY_CHANNEL_TYPE_CB, type=SAI_HOSTIF_TABLE_ENTRY_TYPE_WILDCARD) self.assertNotEqual(hif_tbl_entry, 0) # create LAG members lag10_member1 = sai_thrift_create_lag_member( self.client, lag_id=lag10, port_id=lag_ports[0]) lag10_member2 = sai_thrift_create_lag_member( self.client, lag_id=lag10, port_id=lag_ports[1]) for dev_port in lag_dev_ports: print("Sending LACP packet on port %d (in LAG)" % dev_port) send_packet(self, dev_port, lacp_pkt) print("Verifying LACP packet on LAG host interface") self.assertTrue(socket_verify_packet(lacp_pkt, lag_hif_socket)) print("\tOK") print("Verifying CPU port queue stats") time.sleep(4) post_stats = query_counter( self, sai_thrift_get_queue_stats, self.cpu_queue0) self.assertEqual( post_stats["SAI_QUEUE_STAT_PACKETS"], self.cpu_queue_state + len(lag_dev_ports)) print("\tOK") self.cpu_queue_state = post_stats["SAI_QUEUE_STAT_PACKETS"] finally: sai_thrift_remove_lag_member(self.client, lag10_member1) sai_thrift_remove_lag_member(self.client, lag10_member2) sai_thrift_remove_hostif_table_entry(self.client, hif_tbl_entry) sai_thrift_remove_hostif_trap(self.client, lacp_trap) sai_thrift_remove_hostif(self.client, lag_hostif) sai_thrift_remove_lag(self.client, lag10) def tearDown(self): super(wildcardEntryCbChannelLacp, self).tearDown() @group("draft") class wildcardEntryCbChannelStp(HostifTableMatchTestHelper): ''' Verify hostif interface table match for entry type Wildcard and channel type = CB with STP packet ''' def setUp(self): super(wildcardEntryCbChannelStp, self).setUp() def runTest(self): print("\nwildcardEntryCbChannelStp()") vlan_ports = [self.iport, self.eport] vlan_dev_ports = [self.iport_dev, self.eport_dev] vlan_hostif_name = "vlan_hostif" stp_mac = "01:80:C2:00:00:00" stp_pkt = simple_eth_raw_packet_with_taglist( pktlen=100, eth_dst=stp_mac) stp_pkt[Ether].type = 0x88cc tag_stp_pkt = simple_eth_raw_packet_with_taglist( pktlen=104, eth_dst=stp_mac, dl_taglist_enable=True, dl_vlanid_list=[self.vid]) tag_stp_pkt[Dot1Q].type = 0x88cc try: vlan_hostif = sai_thrift_create_hostif(self.client, name=vlan_hostif_name, obj_id=self.vlan100, type=SAI_HOSTIF_TYPE_NETDEV) self.assertNotEqual(vlan_hostif, 0) vlan_hif_socket = open_packet_socket(vlan_hostif_name) stp_trap = sai_thrift_create_hostif_trap( self.client, packet_action=SAI_PACKET_ACTION_COPY, trap_type=SAI_HOSTIF_TRAP_TYPE_STP) self.assertNotEqual(stp_trap, 0) hif_tbl_entry = sai_thrift_create_hostif_table_entry( self.client, channel_type=SAI_HOSTIF_TABLE_ENTRY_CHANNEL_TYPE_CB, type=SAI_HOSTIF_TABLE_ENTRY_TYPE_WILDCARD) self.assertNotEqual(hif_tbl_entry, 0) # change tagging mode for one of test ports sai_thrift_remove_vlan_member(self.client, self.vlan100_member1) sai_thrift_set_port_attribute( self.client, vlan_ports[1], port_vlan_id=0) self.vlan100_member1 = sai_thrift_create_vlan_member( self.client, vlan_id=self.vlan100, bridge_port_id=self.eport_bp, vlan_tagging_mode=SAI_VLAN_TAGGING_MODE_TAGGED) sai_thrift_set_port_attribute( self.client, vlan_ports[1], port_vlan_id=self.vid) print("Sending STP packet on port %d (untagged VLAN member)" % vlan_dev_ports[0]) send_packet(self, vlan_dev_ports[0], stp_pkt) print("Verifying STP packet on tagged VLAN member") verify_packet(self, tag_stp_pkt, vlan_dev_ports[1]) print("\tOK") print("Verifying STP packet on VLAN host interface") self.assertTrue(socket_verify_packet(stp_pkt, vlan_hif_socket)) print("\tOK") print("Sending STP packet on port %d (tagged VLAN member)" % vlan_dev_ports[1]) send_packet(self, vlan_dev_ports[1], tag_stp_pkt) print("Verifying STP packet on untagged VLAN member") verify_packet(self, stp_pkt, vlan_dev_ports[0]) print("\tOK") print("Verifying STP packet on VLAN host interface") self.assertTrue(socket_verify_packet(stp_pkt, vlan_hif_socket)) print("\tOK") print("Verifying CPU port queue stats") time.sleep(4) post_stats = query_counter( self, sai_thrift_get_queue_stats, self.cpu_queue0) self.assertEqual( post_stats["SAI_QUEUE_STAT_PACKETS"], self.cpu_queue_state + 2) print("\tOK") finally: # revert tagging mode for changed test port sai_thrift_remove_vlan_member(self.client, self.vlan100_member1) self.vlan100_member1 = sai_thrift_create_vlan_member( self.client, vlan_id=self.vlan100, bridge_port_id=self.eport_bp, vlan_tagging_mode=SAI_VLAN_TAGGING_MODE_UNTAGGED) sai_thrift_set_port_attribute( self.client, vlan_ports[1], port_vlan_id=self.vid) sai_thrift_remove_hostif_table_entry(self.client, hif_tbl_entry) sai_thrift_remove_hostif_trap(self.client, stp_trap) sai_thrift_remove_hostif(self.client, vlan_hostif) def tearDown(self): super(wildcardEntryCbChannelStp, self).tearDown() @group("draft") class HostifTrapTypesTestHelper(PlatformSaiHelper): ''' Verify creation of different types of hostif traps If action is TRAP - only iport is in use, if action is COPY packet is send on iport and should be forwarded to eport ''' def setUp(self): super(HostifTrapTypesTestHelper, self).setUp() vid = 100 self.iport = self.port24 self.iport_dev = self.dev_port24 hostif_name = "hostif" self.hostif = sai_thrift_create_hostif(self.client, name=hostif_name, obj_id=self.port24, type=SAI_HOSTIF_TYPE_NETDEV) self.assertTrue(self.hostif != 0) self.hif_socket = open_packet_socket(hostif_name) hostif_name2 = "hostif2" self.hostif2 = sai_thrift_create_hostif(self.client, name=hostif_name2, obj_id=self.port10, type=SAI_HOSTIF_TYPE_NETDEV) self.assertTrue(self.hostif2 != 0) self.hif_socket2 = open_packet_socket(hostif_name2) # create VLAN with 1 member self.vlan100 = sai_thrift_create_vlan(self.client, vlan_id=vid) self.vlan100_rif = sai_thrift_create_router_interface( self.client, type=SAI_ROUTER_INTERFACE_TYPE_VLAN, virtual_router_id=self.default_vrf, vlan_id=self.vlan100) self.assertTrue(self.vlan100_rif != 0) self.port24_bp = sai_thrift_create_bridge_port( self.client, bridge_id=self.default_1q_bridge, port_id=self.port24, type=SAI_BRIDGE_PORT_TYPE_PORT, admin_state=True) self.vlan100_member0 = sai_thrift_create_vlan_member( self.client, vlan_id=self.vlan100, bridge_port_id=self.port24_bp, vlan_tagging_mode=SAI_VLAN_TAGGING_MODE_UNTAGGED) sai_thrift_set_port_attribute(self.client, self.port24, port_vlan_id=vid) self.iport_ip = "10.10.10.1" self.iport_ipv6 = "2001:0db8::1:1" self.iport_mac = "00:11:11:11:11:11" self.eport_ip = "20.20.20.1" self.eport_ipv6 = "2001:0db8::2:2" self.eport_mac = "00:22:22:22:22:22" self.my_ip = "30.30.30.1" self.my_ipv6 = "2001:0db8::3:3" self.iport_nexthop = sai_thrift_create_next_hop( self.client, ip=sai_ipaddress(self.iport_ip), router_interface_id=self.port10_rif, type=SAI_NEXT_HOP_TYPE_IP) self.iport_neighbor_entry = sai_thrift_neighbor_entry_t( rif_id=self.port10_rif, ip_address=sai_ipaddress(self.iport_ip)) self.iport_neighbor = sai_thrift_create_neighbor_entry( self.client, self.iport_neighbor_entry, dst_mac_address=self.iport_mac) self.iport_route = sai_thrift_route_entry_t( vr_id=self.default_vrf, destination=sai_ipprefix(self.iport_ip + '/32')) sai_thrift_create_route_entry(self.client, self.iport_route, next_hop_id=self.iport_nexthop) self.iport_v6_route = sai_thrift_route_entry_t( vr_id=self.default_vrf, destination=sai_ipprefix(self.iport_ipv6 + '/128')) sai_thrift_create_route_entry(self.client, self.iport_v6_route, next_hop_id=self.iport_nexthop) self.eport_nexthop = sai_thrift_create_next_hop( self.client, ip=sai_ipaddress(self.eport_ip), router_interface_id=self.port11_rif, type=SAI_NEXT_HOP_TYPE_IP) self.eport_neighbor_entry = sai_thrift_neighbor_entry_t( rif_id=self.port11_rif, ip_address=sai_ipaddress(self.eport_ip)) self.eport_neighbor = sai_thrift_create_neighbor_entry( self.client, self.eport_neighbor_entry, dst_mac_address=self.eport_mac) self.eport_route = sai_thrift_route_entry_t( vr_id=self.default_vrf, destination=sai_ipprefix(self.eport_ip + '/32')) sai_thrift_create_route_entry(self.client, self.eport_route, next_hop_id=self.eport_nexthop) self.eport_v6_route = sai_thrift_route_entry_t( vr_id=self.default_vrf, destination=sai_ipprefix(self.eport_ipv6 + '/128')) sai_thrift_create_route_entry(self.client, self.eport_v6_route, next_hop_id=self.eport_nexthop) # Ip2me route sw_attr = sai_thrift_get_switch_attribute(self.client, cpu_port=True) cpu_port = sw_attr['cpu_port'] self.ip2me_route = sai_thrift_route_entry_t( vr_id=self.default_vrf, destination=sai_ipprefix(self.my_ip + '/32')) sai_thrift_create_route_entry(self.client, self.ip2me_route, next_hop_id=cpu_port) self.ip2me_v6_route = sai_thrift_route_entry_t( vr_id=self.default_vrf, destination=sai_ipprefix(self.my_ipv6 + '/128')) sai_thrift_create_route_entry(self.client, self.ip2me_v6_route, next_hop_id=cpu_port) self.cpu_queue_state = query_counter( self, sai_thrift_get_queue_stats, self.cpu_queue0)["SAI_QUEUE_STAT_PACKETS"] def tearDown(self): sai_thrift_flush_fdb_entries( self.client, entry_type=SAI_FDB_FLUSH_ENTRY_TYPE_ALL) sai_thrift_remove_route_entry(self.client, self.ip2me_route) sai_thrift_remove_route_entry(self.client, self.ip2me_v6_route) sai_thrift_remove_route_entry(self.client, self.eport_route) sai_thrift_remove_route_entry(self.client, self.eport_v6_route) sai_thrift_remove_next_hop(self.client, self.eport_nexthop) sai_thrift_remove_neighbor_entry( self.client, self.eport_neighbor_entry) sai_thrift_remove_route_entry(self.client, self.iport_route) sai_thrift_remove_route_entry(self.client, self.iport_v6_route) sai_thrift_remove_next_hop(self.client, self.iport_nexthop) sai_thrift_remove_neighbor_entry(self.client, self.iport_neighbor_entry) sai_thrift_remove_vlan_member(self.client, self.vlan100_member0) sai_thrift_set_port_attribute(self.client, self.port24, port_vlan_id=0) sai_thrift_remove_bridge_port(self.client, self.port24_bp) sai_thrift_remove_router_interface(self.client, self.vlan100_rif) sai_thrift_remove_vlan(self.client, self.vlan100) sai_thrift_remove_hostif(self.client, self.hostif) sai_thrift_remove_hostif(self.client, self.hostif2) super(HostifTrapTypesTestHelper, self).tearDown() @group("draft") class arpTrapTest(HostifTrapTypesTestHelper): ''' Verify trap of type ARP ''' def setUp(self): super(arpTrapTest, self).setUp() def runTest(self): print("\narpTrapTest()") try: arp_req_trap = sai_thrift_create_hostif_trap( self.client, packet_action=SAI_PACKET_ACTION_TRAP, trap_type=SAI_HOSTIF_TRAP_TYPE_ARP_REQUEST) self.assertNotEqual(arp_req_trap, 0) arp_resp_trap = sai_thrift_create_hostif_trap( self.client, packet_action=SAI_PACKET_ACTION_TRAP, trap_type=SAI_HOSTIF_TRAP_TYPE_ARP_RESPONSE) self.assertNotEqual(arp_resp_trap, 0) # Broadcast ARP Request arp_pkt = simple_arp_packet(arp_op=1, pktlen=100) print("Sending broadcast ARP request") send_packet(self, self.dev_port24, arp_pkt) # CPU queue++ self.assertTrue(socket_verify_packet(arp_pkt, self.hif_socket)) print("\tOK") self.dataplane.flush() # Unicast ARP Request arp_pkt = simple_arp_packet(arp_op=1, eth_dst=ROUTER_MAC, pktlen=100) print('Sending unicast ARP request to router MAC') send_packet(self, self.dev_port24, arp_pkt) # CPU queue++ self.assertTrue(socket_verify_packet(arp_pkt, self.hif_socket)) print("\tOK") # Unicast ARP Request to other MAC address arp_pkt = simple_arp_packet(arp_op=1, eth_dst="00:AA:BB:CC:DD:EE", pktlen=100) print('Sending unicast ARP request to other MAC') send_packet(self, self.dev_port24, arp_pkt) verify_no_other_packets(self) self.assertFalse(socket_verify_packet(arp_pkt, self.hif_socket)) print("\tOK") # Broadcast ARP Response arp_pkt = simple_arp_packet(arp_op=2, pktlen=100) print("Sending broadcast ARP response") send_packet(self, self.dev_port24, arp_pkt) # CPU queue++ self.assertTrue(socket_verify_packet(arp_pkt, self.hif_socket)) print("\tOK") # Unicast ARP Response arp_pkt = simple_arp_packet(arp_op=2, eth_dst=ROUTER_MAC, pktlen=100) print('Sending unicast ARP response to router MAC') send_packet(self, self.dev_port24, arp_pkt) # CPU queue++ self.assertTrue(socket_verify_packet(arp_pkt, self.hif_socket)) print("\tOK") # Unicast ARP Response to other MAC address arp_pkt = simple_arp_packet(arp_op=2, eth_dst="00:AA:BB:CC:DD:EE", pktlen=100) print('Sending unicast ARP response to other MAC') send_packet(self, self.dev_port24, arp_pkt) verify_no_other_packets(self) self.assertFalse(socket_verify_packet(arp_pkt, self.hif_socket)) print("\tOK") print("Verifying CPU port queue stats") time.sleep(4) post_stats = query_counter( self, sai_thrift_get_queue_stats, self.cpu_queue0) self.assertEqual( post_stats["SAI_QUEUE_STAT_PACKETS"], self.cpu_queue_state + 4) print("OK") self.cpu_queue_state = post_stats["SAI_QUEUE_STAT_PACKETS"] finally: sai_thrift_remove_hostif_trap(self.client, arp_req_trap) sai_thrift_remove_hostif_trap(self.client, arp_resp_trap) def tearDown(self): super(arpTrapTest, self).tearDown() @group("draft") class ttlErrorTrapTest(HostifTrapTypesTestHelper): ''' Verify trap of type TTL ''' def setUp(self): super(ttlErrorTrapTest, self).setUp() def runTest(self): print("\nttlErrorTrapTest()") try: ttl_trap = sai_thrift_create_hostif_trap( self.client, packet_action=SAI_PACKET_ACTION_DROP, trap_type=SAI_HOSTIF_TRAP_TYPE_TTL_ERROR) self.assertNotEqual(ttl_trap, 0) pkt = simple_udp_packet(eth_dst=ROUTER_MAC, ip_dst=self.eport_ip, ip_ttl=64) exp_pkt = simple_udp_packet(eth_src=ROUTER_MAC, eth_dst=self.eport_mac, ip_dst=self.eport_ip, ip_ttl=63) print("Sending packet with TTL 64 - routed") send_packet(self, self.dev_port10, pkt) verify_packets(self, exp_pkt, [self.dev_port11]) print("\tOK") pkt = simple_udp_packet(eth_dst=ROUTER_MAC, ip_dst=self.eport_ip, ip_ttl=0) print("Sending packet with TTL 0 - dropped") send_packet(self, self.dev_port10, pkt) verify_no_other_packets(self) print("\tOK") pkt = simple_udp_packet(eth_dst=ROUTER_MAC, ip_dst=self.eport_ip, ip_ttl=1) print("Sending packet with TTL 1 - routed and dropped") send_packet(self, self.dev_port10, pkt) verify_no_other_packets(self) print("\tOK") finally: sai_thrift_remove_hostif_trap(self.client, ttl_trap) def tearDown(self): super(ttlErrorTrapTest, self).tearDown() @group("draft") class bgpTrapTest(HostifTrapTypesTestHelper): ''' Verify trap of type BGP ''' def setUp(self): super(bgpTrapTest, self).setUp() def runTest(self): print("\nbgpTrapTest()") bgp_port = 179 try: bgp_trap = sai_thrift_create_hostif_trap( self.client, packet_action=SAI_PACKET_ACTION_TRAP, trap_type=SAI_HOSTIF_TRAP_TYPE_BGP) self.assertNotEqual(bgp_trap, 0) bgpv6_trap = sai_thrift_create_hostif_trap( self.client, packet_action=SAI_PACKET_ACTION_TRAP, trap_type=SAI_HOSTIF_TRAP_TYPE_BGPV6) self.assertNotEqual(bgpv6_trap, 0) # BGP v4 - dst BGP port bgp_pkt = simple_tcp_packet(eth_dst=ROUTER_MAC, ip_dst=self.my_ip, tcp_dport=bgp_port) print("Sending BGP v4 packet - dst TCP port %d" % bgp_port) send_packet(self, self.dev_port10, bgp_pkt) # CPU queue++ self.assertTrue(socket_verify_packet(bgp_pkt, self.hif_socket2)) print("\tOK") # BGP v6 - dst BGP port bgpv6_pkt = simple_tcpv6_packet(eth_dst=ROUTER_MAC, ipv6_dst=self.my_ipv6, tcp_dport=bgp_port) print("Sending BGP v6 packet - dst TCP port %d" % bgp_port) send_packet(self, self.dev_port10, bgpv6_pkt) # CPU queue++ self.assertTrue(socket_verify_packet(bgpv6_pkt, self.hif_socket2)) print("\tOK") # BGP v4 - src BGP port bgp_pkt = simple_tcp_packet(eth_dst=ROUTER_MAC, ip_dst=self.my_ip, tcp_sport=bgp_port) print("Sending BGP v4 packet - src TCP port %d" % bgp_port) send_packet(self, self.dev_port10, bgp_pkt) # CPU queue++ self.assertTrue(socket_verify_packet(bgp_pkt, self.hif_socket2)) print("\tOK") # BGP v6 - dst BGP port bgpv6_pkt = simple_tcpv6_packet(eth_dst=ROUTER_MAC, ipv6_dst=self.my_ipv6, tcp_sport=bgp_port) print("Sending BGP v6 packet - dst TCP port %d" % bgp_port) send_packet(self, self.dev_port10, bgpv6_pkt) # CPU queue++ self.assertTrue(socket_verify_packet(bgpv6_pkt, self.hif_socket2)) print("\tOK") print("Verifying CPU port queue stats") time.sleep(4) post_stats = query_counter( self, sai_thrift_get_queue_stats, self.cpu_queue0) self.assertEqual( post_stats["SAI_QUEUE_STAT_PACKETS"], self.cpu_queue_state + 4) print("\tOK") self.cpu_queue_state = post_stats["SAI_QUEUE_STAT_PACKETS"] finally: sai_thrift_remove_hostif_trap(self.client, bgp_trap) sai_thrift_remove_hostif_trap(self.client, bgpv6_trap) def tearDown(self): super(bgpTrapTest, self).tearDown() @group("draft") class dhcpTrapTest(HostifTrapTypesTestHelper): ''' Verify trap of type DHCP ''' def setUp(self): super(dhcpTrapTest, self).setUp() def runTest(self): print("\ndhcpTrapTest()") dhcp_sport = 67 dhcp_cport = 68 bcast_ip = "255.255.255.255" bcast_mac = "ff:ff:ff:ff:ff:ff" try: dhcp_trap = sai_thrift_create_hostif_trap( self.client, packet_action=SAI_PACKET_ACTION_TRAP, trap_type=SAI_HOSTIF_TRAP_TYPE_DHCP) self.assertNotEqual(dhcp_trap, 0) # DHCP request dhcp_pkt = simple_udp_packet(eth_dst=bcast_mac, ip_dst=bcast_ip, udp_sport=dhcp_sport, udp_dport=dhcp_cport) print("Sending DHCP request packet") send_packet(self, self.dev_port10, dhcp_pkt) # CPU queue++ self.assertTrue(socket_verify_packet(dhcp_pkt, self.hif_socket2)) print("\tOK") # DHCP ack dhcp_pkt = simple_udp_packet(eth_dst=ROUTER_MAC, ip_dst=self.my_ip, udp_sport=dhcp_cport, udp_dport=dhcp_sport) print("Sending DHCP ack packet") send_packet(self, self.dev_port10, dhcp_pkt) # CPU queue++ self.assertTrue(socket_verify_packet(dhcp_pkt, self.hif_socket2)) print("\tOK") print("Verifying CPU port queue stats") time.sleep(4) post_stats = query_counter( self, sai_thrift_get_queue_stats, self.cpu_queue0) self.assertEqual( post_stats["SAI_QUEUE_STAT_PACKETS"], self.cpu_queue_state + 2) print("\tOK") self.cpu_queue_state = post_stats["SAI_QUEUE_STAT_PACKETS"] finally: sai_thrift_remove_hostif_trap(self.client, dhcp_trap) def tearDown(self): super(dhcpTrapTest, self).tearDown() @group("draft") class ip2meTrapTest(HostifTrapTypesTestHelper): ''' Verify trap of type IP2ME ''' def setUp(self): super(ip2meTrapTest, self).setUp() def runTest(self): print("\nip2meTrapTest()") try: myip_trap = sai_thrift_create_hostif_trap( self.client, packet_action=SAI_PACKET_ACTION_TRAP, trap_type=SAI_HOSTIF_TRAP_TYPE_IP2ME) self.assertNotEqual(myip_trap, 0) pkt = simple_udp_packet(eth_dst=ROUTER_MAC, ip_dst=self.my_ip) print("Sending IP2ME packet") send_packet(self, self.dev_port24, pkt) self.assertTrue(socket_verify_packet(pkt, self.hif_socket)) send_packet(self, self.dev_port10, pkt) self.assertTrue(socket_verify_packet(pkt, self.hif_socket2)) print("\tOK") print("Verifying CPU port queue stats") time.sleep(4) post_stats = query_counter( self, sai_thrift_get_queue_stats, self.cpu_queue0) self.assertEqual( post_stats["SAI_QUEUE_STAT_PACKETS"], self.cpu_queue_state + 2) print("\tOK") self.cpu_queue_state = post_stats["SAI_QUEUE_STAT_PACKETS"] finally: sai_thrift_remove_hostif_trap(self.client, myip_trap) def tearDown(self): super(ip2meTrapTest, self).tearDown() @group("draft") class lacpTrapTest(HostifTrapTypesTestHelper): ''' Verify trap of type LACP ''' def setUp(self): super(lacpTrapTest, self).setUp() def runTest(self): print("\nlacpTrapTest()") lacp_mac = "01:80:C2:00:00:02" try: lacp_trap = sai_thrift_create_hostif_trap( self.client, packet_action=SAI_PACKET_ACTION_TRAP, trap_type=SAI_HOSTIF_TRAP_TYPE_LACP) self.assertNotEqual(lacp_trap, 0) lacp_pkt = simple_eth_packet(eth_dst=lacp_mac, pktlen=100) print("Sending LACP packet on SVI") send_packet(self, self.dev_port24, lacp_pkt) self.assertTrue(socket_verify_packet(lacp_pkt, self.hif_socket)) print("Sending LACP packet on RIF") send_packet(self, self.dev_port10, lacp_pkt) self.assertTrue(socket_verify_packet(lacp_pkt, self.hif_socket2)) print("\tOK") print("Verifying CPU port queue stats") time.sleep(4) post_stats = query_counter( self, sai_thrift_get_queue_stats, self.cpu_queue0) self.assertEqual( post_stats["SAI_QUEUE_STAT_PACKETS"], self.cpu_queue_state + 2) print("\tOK") self.cpu_queue_state = post_stats["SAI_QUEUE_STAT_PACKETS"] finally: sai_thrift_remove_hostif_trap(self.client, lacp_trap) def tearDown(self): super(lacpTrapTest, self).tearDown() @group("draft") class lldpTrapTest(HostifTrapTypesTestHelper): ''' Verify trap of type LLDP ''' def setUp(self): super(lldpTrapTest, self).setUp() def runTest(self): print("\nlldpTrapTest()") lldp_mac1 = "01:80:C2:00:00:0e" lldp_mac2 = "01:80:C2:00:00:03" try: lldp_trap = sai_thrift_create_hostif_trap( self.client, packet_action=SAI_PACKET_ACTION_TRAP, trap_type=SAI_HOSTIF_TRAP_TYPE_LLDP) self.assertNotEqual(lldp_trap, 0) # LLDP DMAC=01:80:C2:00:00:0e lldp_pkt = simple_eth_packet(eth_dst=lldp_mac1, pktlen=100, eth_type=0x88cc) print("Sending LLDP packet with DMAC %s" % lldp_mac1) send_packet(self, self.dev_port24, lldp_pkt) # CPU queue++ self.assertTrue(socket_verify_packet(lldp_pkt, self.hif_socket)) send_packet(self, self.dev_port10, lldp_pkt) # CPU queue++ self.assertTrue(socket_verify_packet(lldp_pkt, self.hif_socket2)) print("\tOK") # LLDP DMAC=01:80:C2:00:00:03 lldp_pkt = simple_eth_packet(eth_dst=lldp_mac2, pktlen=100, eth_type=0x88cc) print("Sending LLDP packet with DMAC %s" % lldp_mac2) send_packet(self, self.dev_port24, lldp_pkt) # CPU queue++ self.assertTrue(socket_verify_packet(lldp_pkt, self.hif_socket)) send_packet(self, self.dev_port10, lldp_pkt) # CPU queue++ self.assertTrue(socket_verify_packet(lldp_pkt, self.hif_socket2)) print("\tOK") print("Verifying CPU port queue stats") time.sleep(4) post_stats = query_counter( self, sai_thrift_get_queue_stats, self.cpu_queue0) self.assertEqual( post_stats["SAI_QUEUE_STAT_PACKETS"], self.cpu_queue_state + 4) print("\tOK") self.cpu_queue_state = post_stats["SAI_QUEUE_STAT_PACKETS"] finally: sai_thrift_remove_hostif_trap(self.client, lldp_trap) def tearDown(self): super(lldpTrapTest, self).tearDown() @group("draft") class ospfTrapTest(HostifTrapTypesTestHelper): ''' Verify trap of type OSPF ''' def setUp(self): super(ospfTrapTest, self).setUp() def runTest(self): print("\nospfTrapTest()") ospf_hello_ip = "224.0.0.5" ospf_dr_ip = "224.0.0.6" proto = 89 try: ospf_trap = sai_thrift_create_hostif_trap( self.client, packet_action=SAI_PACKET_ACTION_COPY, trap_type=SAI_HOSTIF_TRAP_TYPE_OSPF) self.assertNotEqual(ospf_trap, 0) # OSPF Hello ospf_pkt = simple_ip_packet(ip_dst=ospf_hello_ip, ip_proto=proto) print("Sending OSPF packet destined to %s" % ospf_hello_ip) send_packet(self, self.dev_port24, ospf_pkt) # CPU queue++ self.assertTrue(socket_verify_packet(ospf_pkt, self.hif_socket)) send_packet(self, self.dev_port10, ospf_pkt) # CPU queue++ self.assertTrue(socket_verify_packet(ospf_pkt, self.hif_socket2)) print("\tOK") # OSPF Designated Routers ospf_pkt = simple_ip_packet(ip_dst=ospf_dr_ip, ip_proto=proto) print("Sending OSPF packet destined to %s" % ospf_dr_ip) send_packet(self, self.dev_port24, ospf_pkt) # CPU queue++ self.assertTrue(socket_verify_packet(ospf_pkt, self.hif_socket)) send_packet(self, self.dev_port10, ospf_pkt) # CPU queue++ self.assertTrue(socket_verify_packet(ospf_pkt, self.hif_socket2)) print("\tOK") print("Verifying CPU port queue stats") time.sleep(4) post_stats = query_counter( self, sai_thrift_get_queue_stats, self.cpu_queue0) self.assertEqual( post_stats["SAI_QUEUE_STAT_PACKETS"], self.cpu_queue_state + 4) print("\tOK") self.cpu_queue_state = post_stats["SAI_QUEUE_STAT_PACKETS"] finally: sai_thrift_remove_hostif_trap(self.client, ospf_trap) def tearDown(self): super(ospfTrapTest, self).tearDown() @group("draft") class igmpTrapTest(HostifTrapTypesTestHelper): ''' Verify trap of different IGMP types ''' def setUp(self): super(igmpTrapTest, self).setUp() def runTest(self): print("\nigmTrapTest()") trap_info_list = [ [SAI_HOSTIF_TRAP_TYPE_IGMP_TYPE_QUERY, 0x11, "Query"], [SAI_HOSTIF_TRAP_TYPE_IGMP_TYPE_LEAVE, 0x17, "Leave"], [SAI_HOSTIF_TRAP_TYPE_IGMP_TYPE_V1_REPORT, 0x12, "V1 Report"], [SAI_HOSTIF_TRAP_TYPE_IGMP_TYPE_V2_REPORT, 0x16, "V2 Report"], [SAI_HOSTIF_TRAP_TYPE_IGMP_TYPE_V3_REPORT, 0x22, "V3 Report"]] try: # enable VLAN100 igmp snooping status = sai_thrift_set_vlan_attribute( self.client, self.vlan100, custom_igmp_snooping_enable=True) self.assertEqual(status, SAI_STATUS_SUCCESS) for trap_info in trap_info_list: try: igmp_trap = sai_thrift_create_hostif_trap( self.client, packet_action=SAI_PACKET_ACTION_TRAP, trap_type=trap_info[0]) self.assertNotEqual(igmp_trap, 0) igmp_pkt = simple_igmp_packet(igmp_type=trap_info[1]) print("Sending IGMP {} packet".format(trap_info[2])) send_packet(self, self.dev_port24, igmp_pkt) self.assertTrue(socket_verify_packet(igmp_pkt, self.hif_socket)) print("\tOK") finally: sai_thrift_remove_hostif_trap(self.client, igmp_trap) print("Verifying CPU port queue stats") time.sleep(4) post_stats = query_counter( self, sai_thrift_get_queue_stats, self.cpu_queue0) self.assertEqual( post_stats["SAI_QUEUE_STAT_PACKETS"], self.cpu_queue_state + len(trap_info_list)) print("\tOK") self.cpu_queue_state = post_stats["SAI_QUEUE_STAT_PACKETS"] finally: sai_thrift_set_vlan_attribute( self.client, self.vlan100, custom_igmp_snooping_enable=False) def tearDown(self): super(igmpTrapTest, self).tearDown() @group("draft") class ipv6MldTrapTest(HostifTrapTypesTestHelper): ''' This verifies trap of type IPV6 MLD ''' def setUp(self): super(ipv6MldTrapTest, self).setUp() def runTest(self): print("\nIpv6MldTrapTest()") try: # enable VLAN100 igmp snooping sai_thrift_set_vlan_attribute( self.client, self.vlan100, custom_igmp_snooping_enable=True) ipv6_mld_trap_v1_v2 = sai_thrift_create_hostif_trap( self.client, packet_action=SAI_PACKET_ACTION_TRAP, trap_type=SAI_HOSTIF_TRAP_TYPE_IPV6_MLD_V1_V2) self.assertTrue(ipv6_mld_trap_v1_v2 != 0) ipv6_mld_trap_v1_report = sai_thrift_create_hostif_trap( self.client, packet_action=SAI_PACKET_ACTION_TRAP, trap_type=SAI_HOSTIF_TRAP_TYPE_IPV6_MLD_V1_REPORT) self.assertTrue(ipv6_mld_trap_v1_report != 0) ipv6_mld_trap_v1_done = sai_thrift_create_hostif_trap( self.client, packet_action=SAI_PACKET_ACTION_TRAP, trap_type=SAI_HOSTIF_TRAP_TYPE_IPV6_MLD_V1_DONE) self.assertTrue(ipv6_mld_trap_v1_done != 0) ipv6_mld_trap_v2_report = sai_thrift_create_hostif_trap( self.client, packet_action=SAI_PACKET_ACTION_TRAP, trap_type=SAI_HOSTIF_TRAP_TYPE_MLD_V2_REPORT) self.assertTrue(ipv6_mld_trap_v2_report != 0) # V1 V2 Multicast listner Query mld_type = 130 ipv6_mld_v1v2_pkt = simple_icmpv6_packet(icmp_type=mld_type) print("Sending IPv6 MLD V1 V2 Query packet") send_packet(self, self.dev_port24, ipv6_mld_v1v2_pkt) self.assertTrue(socket_verify_packet(ipv6_mld_v1v2_pkt, self.hif_socket)) print("\tOK") # V1 Multicast Listener Report mld_type = 131 ipv6_mld_v1_report_pkt = simple_icmpv6_packet(icmp_type=mld_type) print("Sending IPv6 MLD V1 Report packet") send_packet(self, self.dev_port24, ipv6_mld_v1_report_pkt) self.assertTrue(socket_verify_packet(ipv6_mld_v1_report_pkt, self.hif_socket)) print("\tOK") # V1 Multicast Listener Done mld_type = 132 ipv6_mld_v1_done_pkt = simple_icmpv6_packet(icmp_type=mld_type) print("Sending IPv6 MLD V1 Done packet") send_packet(self, self.dev_port24, ipv6_mld_v1_done_pkt) self.assertTrue(socket_verify_packet(ipv6_mld_v1_done_pkt, self.hif_socket)) print("\tOK") # V2 Multicast Listener Report mld_type = 143 mld_v2_report_pkt = simple_icmpv6_packet(icmp_type=mld_type) print("Sending IPv6 MLD V2 Report packet") send_packet(self, self.dev_port24, mld_v2_report_pkt) self.assertTrue(socket_verify_packet(mld_v2_report_pkt, self.hif_socket)) print("\tOK") print("Verifying CPU port queue stats") time.sleep(4) post_stats = query_counter( self, sai_thrift_get_queue_stats, self.cpu_queue0) self.assertEqual( post_stats["SAI_QUEUE_STAT_PACKETS"], self.cpu_queue_state + 4) print("\tOK") self.cpu_queue_state = post_stats["SAI_QUEUE_STAT_PACKETS"] finally: sai_thrift_set_vlan_attribute( self.client, self.vlan100, custom_igmp_snooping_enable=False) sai_thrift_remove_hostif_trap(self.client, ipv6_mld_trap_v1_v2) sai_thrift_remove_hostif_trap(self.client, ipv6_mld_trap_v1_report) sai_thrift_remove_hostif_trap(self.client, ipv6_mld_trap_v1_done) sai_thrift_remove_hostif_trap(self.client, ipv6_mld_trap_v2_report) def tearDown(self): super(ipv6MldTrapTest, self).tearDown() @group("draft") class stpTrapTest(HostifTrapTypesTestHelper): ''' Verify trap of type STP ''' def setUp(self): super(stpTrapTest, self).setUp() def runTest(self): print("\nstpTrapTest()") stp_mac = "01:80:C2:00:00:00" try: stp_trap = sai_thrift_create_hostif_trap( self.client, packet_action=SAI_PACKET_ACTION_TRAP, trap_type=SAI_HOSTIF_TRAP_TYPE_STP) self.assertNotEqual(stp_trap, 0) stp_pkt = simple_eth_packet(eth_dst=stp_mac, pktlen=100) print("Sending STP packet") send_packet(self, self.dev_port24, stp_pkt) self.assertTrue(socket_verify_packet(stp_pkt, self.hif_socket)) print("\tOK") print("Verifying CPU port queue stats") time.sleep(4) post_stats = query_counter( self, sai_thrift_get_queue_stats, self.cpu_queue0) self.assertEqual( post_stats["SAI_QUEUE_STAT_PACKETS"], self.cpu_queue_state + 1) print("\tOK") self.cpu_queue_state = post_stats["SAI_QUEUE_STAT_PACKETS"] finally: sai_thrift_remove_hostif_trap(self.client, stp_trap) def tearDown(self): super(stpTrapTest, self).tearDown() @group("draft") class pvrstTrapTest(HostifTrapTypesTestHelper): ''' This verifies trap of type PVRST ''' def setUp(self): super(pvrstTrapTest, self).setUp() def runTest(self): print("\npvrstTrapTest()") pvrst_mac = "01:00:0C:CC:CC:CD" try: pvrst_trap = sai_thrift_create_hostif_trap( self.client, packet_action=SAI_PACKET_ACTION_TRAP, trap_type=SAI_HOSTIF_TRAP_TYPE_PVRST) self.assertTrue(pvrst_trap != 0) pvrst_pkt = simple_eth_packet(eth_dst=pvrst_mac, pktlen=100) print("Sending PVRST packet") send_packet(self, self.dev_port24, pvrst_pkt) self.assertTrue(socket_verify_packet(pvrst_pkt, self.hif_socket)) print("\tOK") print("Verifying CPU port queue stats") time.sleep(4) post_stats = query_counter( self, sai_thrift_get_queue_stats, self.cpu_queue0) self.assertEqual( post_stats["SAI_QUEUE_STAT_PACKETS"], self.cpu_queue_state + 1) print("\tOK") self.cpu_queue_state = post_stats["SAI_QUEUE_STAT_PACKETS"] finally: sai_thrift_remove_hostif_trap(self.client, pvrst_trap) def tearDown(self): super(pvrstTrapTest, self).tearDown() @group("draft") class pimTrapTest(HostifTrapTypesTestHelper): ''' Verify trap of type PIM ''' def setUp(self): super(pimTrapTest, self).setUp() def runTest(self): print("\npimTrapTest()") proto = 103 try: pim_trap = sai_thrift_create_hostif_trap( self.client, packet_action=SAI_PACKET_ACTION_TRAP, trap_type=SAI_HOSTIF_TRAP_TYPE_PIM) self.assertNotEqual(pim_trap, 0) pim_pkt = simple_ip_packet(ip_proto=proto) print("Sending PIM packet") send_packet(self, self.dev_port24, pim_pkt) self.assertTrue(socket_verify_packet(pim_pkt, self.hif_socket)) print("\tOK") print("Verifying CPU port queue stats") time.sleep(4) post_stats = query_counter( self, sai_thrift_get_queue_stats, self.cpu_queue0) self.assertEqual( post_stats["SAI_QUEUE_STAT_PACKETS"], self.cpu_queue_state + 1) print("\tOK") self.cpu_queue_state = post_stats["SAI_QUEUE_STAT_PACKETS"] finally: sai_thrift_remove_hostif_trap(self.client, pim_trap) def tearDown(self): super(pimTrapTest, self).tearDown() @group("draft") class udldTrapTest(HostifTrapTypesTestHelper): ''' Verify trap of type UDLD ''' def setUp(self): super(udldTrapTest, self).setUp() def runTest(self): print("\nudldTrapTest()") udld_mac = "01:00:0C:CC:CC:CC" try: udld_trap = sai_thrift_create_hostif_trap( self.client, packet_action=SAI_PACKET_ACTION_TRAP, trap_type=SAI_HOSTIF_TRAP_TYPE_UDLD) self.assertNotEqual(udld_trap, 0) udld_pkt = simple_eth_packet(eth_dst=udld_mac, pktlen=100) print("Sending UDLD packet") send_packet(self, self.dev_port24, udld_pkt) self.assertTrue(socket_verify_packet(udld_pkt, self.hif_socket)) print("\tOK") print("Verifying CPU port queue stats") time.sleep(4) post_stats = query_counter( self, sai_thrift_get_queue_stats, self.cpu_queue0) self.assertEqual( post_stats["SAI_QUEUE_STAT_PACKETS"], self.cpu_queue_state + 1) print("\tOK") self.cpu_queue_state = post_stats["SAI_QUEUE_STAT_PACKETS"] finally: sai_thrift_remove_hostif_trap(self.client, udld_trap) def tearDown(self): super(udldTrapTest, self).tearDown() @group("draft") class ipv6NDTrapTest(HostifTrapTypesTestHelper): ''' Verify trap of type ICMP neighbor discovery ''' def setUp(self): super(ipv6NDTrapTest, self).setUp() def runTest(self): print("\nipv6NDTrapTest()") try: src_ipv6 = "2001:0db8::1111" dst_ipv6 = "2001:0db8::2222" icmpv6_trap = sai_thrift_create_hostif_trap( self.client, packet_action=SAI_PACKET_ACTION_TRAP, trap_type=SAI_HOSTIF_TRAP_TYPE_IPV6_NEIGHBOR_DISCOVERY) self.assertNotEqual(icmpv6_trap, 0) # IPv6 Router Solicitation all_rt_ipv6 = "ff02::2" rs_type = 133 icmpv6_pkt = simple_icmpv6_packet(ipv6_dst=all_rt_ipv6, ipv6_src=src_ipv6, icmp_type=rs_type) print("Sending IPv6 Router Solicitation packet") send_packet(self, self.dev_port24, icmpv6_pkt) # CPU queue++ self.assertTrue(socket_verify_packet(icmpv6_pkt, self.hif_socket)) send_packet(self, self.dev_port10, icmpv6_pkt) # CPU queue++ self.assertTrue(socket_verify_packet(icmpv6_pkt, self.hif_socket2)) print("\tOK") # IPv6 Router Advertisement all_nodes_ipv6 = "ff02::1" ra_type = 134 icmpv6_pkt = simple_icmpv6_packet(ipv6_dst=all_nodes_ipv6, ipv6_src=src_ipv6, icmp_type=ra_type) print("Sending IPv6 Router Advertisement packet") send_packet(self, self.dev_port24, icmpv6_pkt) # CPU queue++ self.assertTrue(socket_verify_packet(icmpv6_pkt, self.hif_socket)) send_packet(self, self.dev_port10, icmpv6_pkt) # CPU queue++ self.assertTrue(socket_verify_packet(icmpv6_pkt, self.hif_socket2)) print("\tOK") # IPv6 Neighbor Solicitation ns_ipv6 = "ff02::1:ff11:777F" ns_type = 135 icmpv6_pkt = simple_icmpv6_packet(ipv6_dst=ns_ipv6, ipv6_src=src_ipv6, icmp_type=ns_type) print("Sending IPv6 Neighbor Solicitation packet") send_packet(self, self.dev_port24, icmpv6_pkt) # CPU queue++ self.assertTrue(socket_verify_packet(icmpv6_pkt, self.hif_socket)) send_packet(self, self.dev_port10, icmpv6_pkt) # CPU queue++ self.assertTrue(socket_verify_packet(icmpv6_pkt, self.hif_socket2)) print("\tOK") # IPv6 Neighbor Advertisement na_type = 136 icmpv6_pkt = simple_icmpv6_packet(ipv6_dst=dst_ipv6, ipv6_src=src_ipv6, icmp_type=na_type) print("Sending IPv6 Neighbor Advertisement packet") send_packet(self, self.dev_port24, icmpv6_pkt) # CPU queue++ self.assertTrue(socket_verify_packet(icmpv6_pkt, self.hif_socket)) send_packet(self, self.dev_port10, icmpv6_pkt) # CPU queue++ self.assertTrue(socket_verify_packet(icmpv6_pkt, self.hif_socket2)) print("\tOK") # IPv6 Redirect Message red_type = 137 icmpv6_pkt = simple_icmpv6_packet(ipv6_dst=dst_ipv6, ipv6_src=src_ipv6, icmp_type=red_type) print("Sending IPv6 Redirect Message packet") send_packet(self, self.dev_port24, icmpv6_pkt) # CPU queue++ self.assertTrue(socket_verify_packet(icmpv6_pkt, self.hif_socket)) send_packet(self, self.dev_port10, icmpv6_pkt) # CPU queue++ self.assertTrue(socket_verify_packet(icmpv6_pkt, self.hif_socket2)) print("\tOK") print("Verifying CPU port queue stats") time.sleep(4) post_stats = query_counter( self, sai_thrift_get_queue_stats, self.cpu_queue0) self.assertEqual( post_stats["SAI_QUEUE_STAT_PACKETS"], self.cpu_queue_state + 10) print("\tOK") self.cpu_queue_state = post_stats["SAI_QUEUE_STAT_PACKETS"] finally: sai_thrift_remove_hostif_trap(self.client, icmpv6_trap) def tearDown(self): super(ipv6NDTrapTest, self).tearDown() @group("draft") class bfdRxTrapTest(HostifTrapTypesTestHelper): ''' Verify trap of type BFD RX ''' def setUp(self): super(bfdRxTrapTest, self).setUp() def runTest(self): print("\nbfdRxTrapTest()") bfd_port = 3784 try: bfd_trap = sai_thrift_create_hostif_trap( self.client, packet_action=SAI_PACKET_ACTION_TRAP, trap_type=SAI_HOSTIF_TRAP_TYPE_BFD) self.assertNotEqual(bfd_trap, 0) bfd_pkt = simple_udp_packet(eth_dst=ROUTER_MAC, ip_dst=self.my_ip, udp_dport=bfd_port) print("Sending BFDv4 packet") send_packet(self, self.dev_port24, bfd_pkt) self.assertTrue(socket_verify_packet(bfd_pkt, self.hif_socket)) print("\tOK") print("Verifying CPU port queue stats") time.sleep(4) post_stats = query_counter( self, sai_thrift_get_queue_stats, self.cpu_queue0) self.assertEqual( post_stats["SAI_QUEUE_STAT_PACKETS"], self.cpu_queue_state + 1) print("\tOK") self.cpu_queue_state = post_stats["SAI_QUEUE_STAT_PACKETS"] finally: sai_thrift_remove_hostif_trap(self.client, bfd_trap) def tearDown(self): super(bfdRxTrapTest, self).tearDown() @group("draft") class dhcpv6TrapTest(HostifTrapTypesTestHelper): ''' Verify trap of type DHCPv6 ''' def setUp(self): super(dhcpv6TrapTest, self).setUp() def runTest(self): print("\ndhcpv6TrapTest()") dhcpv6_client_port = 546 dhcpv6_srv_port = 547 dhcpv6_pkt_params_list = [ ["33:33:00:01:00:02", "FF02::1:2", dhcpv6_client_port, dhcpv6_srv_port, "all servers and relay agents"], ["33:33:00:01:00:05", "FF05::1:3", dhcpv6_client_port, dhcpv6_srv_port, "all servers"], [ROUTER_MAC, self.my_ipv6, dhcpv6_client_port, dhcpv6_srv_port, "ip-to-me server"], [ROUTER_MAC, self.my_ipv6, dhcpv6_srv_port, dhcpv6_client_port, "ip-to-me client"]] try: dhcpv6_trap = sai_thrift_create_hostif_trap( self.client, packet_action=SAI_PACKET_ACTION_TRAP, trap_type=SAI_HOSTIF_TRAP_TYPE_DHCPV6) self.assertNotEqual(dhcpv6_trap, 0) for dhcpv6_pkt_params in dhcpv6_pkt_params_list: dhcpv6_pkt = simple_udpv6_packet( eth_dst=dhcpv6_pkt_params[0], ipv6_dst=dhcpv6_pkt_params[1], udp_sport=dhcpv6_pkt_params[2], udp_dport=dhcpv6_pkt_params[3]) print("Sending DHCPv6 \'{}\' packet". format(dhcpv6_pkt_params[4])) send_packet(self, self.dev_port24, dhcpv6_pkt) self.assertTrue(socket_verify_packet(dhcpv6_pkt, self.hif_socket)) print("\tOK") print("Verifying CPU port queue stats") time.sleep(4) post_stats = query_counter( self, sai_thrift_get_queue_stats, self.cpu_queue0) self.assertEqual( post_stats["SAI_QUEUE_STAT_PACKETS"], self.cpu_queue_state + len(dhcpv6_pkt_params_list)) print("\tOK") self.cpu_queue_state = post_stats["SAI_QUEUE_STAT_PACKETS"] finally: sai_thrift_remove_hostif_trap(self.client, dhcpv6_trap) def tearDown(self): super(dhcpv6TrapTest, self).tearDown() @group("draft") class ptpTrapTest(HostifTrapTypesTestHelper): ''' Verify trap of type PTP ''' def setUp(self): super(ptpTrapTest, self).setUp() def runTest(self): print("\nptpTrapTest()") ptp_port1 = 319 ptp_port2 = 320 try: ptp_trap = sai_thrift_create_hostif_trap( self.client, packet_action=SAI_PACKET_ACTION_TRAP, trap_type=SAI_HOSTIF_TRAP_TYPE_PTP) self.assertNotEqual(ptp_trap, 0) ptp_pkt = simple_udp_packet(eth_dst=ROUTER_MAC, ip_dst=self.iport_ip, udp_dport=ptp_port1) print("Sending PTP packet to UDP port %d" % ptp_port1) send_packet(self, self.dev_port24, ptp_pkt) # CPU queue++ self.assertTrue(socket_verify_packet(ptp_pkt, self.hif_socket)) print("\tOK") ptp_pkt = simple_udp_packet(eth_dst=ROUTER_MAC, ip_dst=self.iport_ip, udp_dport=ptp_port2) print("Sending PTP packet to UDP port %d" % ptp_port2) send_packet(self, self.dev_port24, ptp_pkt) # CPU queue++ self.assertTrue(socket_verify_packet(ptp_pkt, self.hif_socket)) print("\tOK") print("Verifying CPU port queue stats") time.sleep(4) post_stats = query_counter( self, sai_thrift_get_queue_stats, self.cpu_queue0) self.assertEqual( post_stats["SAI_QUEUE_STAT_PACKETS"], self.cpu_queue_state + 2) print("\tOK") self.cpu_queue_state = post_stats["SAI_QUEUE_STAT_PACKETS"] finally: sai_thrift_remove_hostif_trap(self.client, ptp_trap) def tearDown(self): super(ptpTrapTest, self).tearDown() @group("draft") class isisTrapTest(HostifTrapTypesTestHelper): ''' This verifies trap of type ISIS ''' def setUp(self): super(isisTrapTest, self).setUp() def runTest(self): print("\nisisTrapTest()") try: isis_trap = sai_thrift_create_hostif_trap( self.client, packet_action=SAI_PACKET_ACTION_TRAP, trap_type=SAI_HOSTIF_TRAP_TYPE_ISIS) self.assertTrue(isis_trap != 0) isis_mac = ["01:80:c2:00:00:14", "01:80:c2:00:00:15", "09:00:2b:00:00:05"] for mac in isis_mac: isis_pkt = simple_eth_packet(eth_dst=mac, pktlen=100) print("Sending ISIS packet - %s" % mac) send_packet(self, self.dev_port24, isis_pkt) # CPU queue++ self.assertTrue(socket_verify_packet(isis_pkt, self.hif_socket)) send_packet(self, self.dev_port10, isis_pkt) # CPU queue++ self.assertTrue(socket_verify_packet(isis_pkt, self.hif_socket2)) print("\tOK") print("Verifying CPU port queue stats") time.sleep(4) post_stats = query_counter( self, sai_thrift_get_queue_stats, self.cpu_queue0) self.assertEqual( post_stats["SAI_QUEUE_STAT_PACKETS"], self.cpu_queue_state + 6) print("\tOK") self.cpu_queue_state = post_stats["SAI_QUEUE_STAT_PACKETS"] finally: sai_thrift_remove_hostif_trap(self.client, isis_trap) def tearDown(self): super(isisTrapTest, self).tearDown() @group("draft") class bfdv6RxTrapTest(HostifTrapTypesTestHelper): ''' This verifies trap of type BFDV6 RX ''' def setUp(self): super(bfdv6RxTrapTest, self).setUp() def runTest(self): print("\nbfdv6RxTrapTest()") bfd_port = 3784 try: bfdv6_trap = sai_thrift_create_hostif_trap( self.client, packet_action=SAI_PACKET_ACTION_TRAP, trap_type=SAI_HOSTIF_TRAP_TYPE_BFDV6) self.assertTrue(bfdv6_trap != 0) bfdv6_pkt = simple_udpv6_packet(eth_dst=ROUTER_MAC, ipv6_dst=self.my_ipv6, udp_dport=bfd_port) print("Sending BFDv6 packet") send_packet(self, self.dev_port24, bfdv6_pkt) self.assertTrue(socket_verify_packet(bfdv6_pkt, self.hif_socket)) print("\tOK") print("Verifying CPU port queue stats") time.sleep(4) post_stats = query_counter( self, sai_thrift_get_queue_stats, self.cpu_queue0) self.assertEqual( post_stats["SAI_QUEUE_STAT_PACKETS"], self.cpu_queue_state + 1) print("\tOK") self.cpu_queue_state = post_stats["SAI_QUEUE_STAT_PACKETS"] finally: sai_thrift_remove_hostif_trap(self.client, bfdv6_trap) def tearDown(self): super(bfdv6RxTrapTest, self).tearDown() @group("draft") class dhcpL2TrapTest(HostifTrapTypesTestHelper): ''' This verifies trap of type DHCP L2 ''' def setUp(self): super(dhcpL2TrapTest, self).setUp() def runTest(self): print("\ndhcpL2TrapTest()") dhcp_sport = 67 dhcp_cport = 68 bcast_ip = "255.255.255.255" bcast_mac = "ff:ff:ff:ff:ff:ff" try: dhcp_trap = sai_thrift_create_hostif_trap( self.client, packet_action=SAI_PACKET_ACTION_TRAP, trap_type=SAI_HOSTIF_TRAP_TYPE_DHCP_L2) self.assertTrue(dhcp_trap != 0) # DHCP request dhcp_pkt = simple_udp_packet(eth_dst=bcast_mac, ip_dst=bcast_ip, udp_sport=dhcp_sport, udp_dport=dhcp_cport) print("Sending DHCP request packet") send_packet(self, self.dev_port24, dhcp_pkt) # CPU queue++ self.assertTrue(socket_verify_packet(dhcp_pkt, self.hif_socket)) print("\tOK") # DHCP ack dhcp_pkt = simple_udp_packet(eth_dst=ROUTER_MAC, ip_dst=self.my_ip, udp_sport=dhcp_cport, udp_dport=dhcp_sport) print("Sending DHCP ack packet") send_packet(self, self.dev_port24, dhcp_pkt) # CPU queue++ self.assertTrue(socket_verify_packet(dhcp_pkt, self.hif_socket)) print("\tOK") print("Verifying CPU port queue stats") time.sleep(4) post_stats = query_counter( self, sai_thrift_get_queue_stats, self.cpu_queue0) self.assertEqual( post_stats["SAI_QUEUE_STAT_PACKETS"], self.cpu_queue_state + 2) print("\tOK") self.cpu_queue_state = post_stats["SAI_QUEUE_STAT_PACKETS"] finally: sai_thrift_remove_hostif_trap(self.client, dhcp_trap) def tearDown(self): super(dhcpL2TrapTest, self).tearDown() @group("draft") class dhcpv6L2TrapTest(HostifTrapTypesTestHelper): ''' This verifies trap of type DHCPv6 L2 ''' def setUp(self): super(dhcpv6L2TrapTest, self).setUp() def runTest(self): print("\ndhcpv6L2TrapTest()") dhcpv6_client_port = 546 dhcpv6_srv_port = 547 dhcpv6_pkt_params_list = [ ["33:33:00:01:00:02", "FF02::1:2", dhcpv6_client_port, dhcpv6_srv_port, "all servers and relay agents"], ["33:33:00:01:00:05", "FF05::1:3", dhcpv6_client_port, dhcpv6_srv_port, "all servers"], [ROUTER_MAC, self.my_ipv6, dhcpv6_client_port, dhcpv6_srv_port, "ip-to-me server"], [ROUTER_MAC, self.my_ipv6, dhcpv6_srv_port, dhcpv6_client_port, "ip-to-me client"] ] try: dhcpv6_trap = sai_thrift_create_hostif_trap( self.client, packet_action=SAI_PACKET_ACTION_TRAP, trap_type=SAI_HOSTIF_TRAP_TYPE_DHCPV6_L2) self.assertTrue(dhcpv6_trap != 0) for dhcpv6_pkt_params in dhcpv6_pkt_params_list: dhcpv6_pkt = simple_udpv6_packet( eth_dst=dhcpv6_pkt_params[0], ipv6_dst=dhcpv6_pkt_params[1], udp_sport=dhcpv6_pkt_params[2], udp_dport=dhcpv6_pkt_params[3]) print("Sending DHCPv6 \'{}\' packet". format(dhcpv6_pkt_params[4])) send_packet(self, self.dev_port24, dhcpv6_pkt) self.assertTrue(socket_verify_packet(dhcpv6_pkt, self.hif_socket)) print("\tOK") print("Verifying CPU port queue stats") time.sleep(4) post_stats = query_counter( self, sai_thrift_get_queue_stats, self.cpu_queue0) self.assertEqual( post_stats["SAI_QUEUE_STAT_PACKETS"], self.cpu_queue_state + len(dhcpv6_pkt_params_list)) print("\tOK") self.cpu_queue_state = post_stats["SAI_QUEUE_STAT_PACKETS"] finally: sai_thrift_remove_hostif_trap(self.client, dhcpv6_trap) def tearDown(self): super(dhcpv6L2TrapTest, self).tearDown() @group("draft") class vrrpTrapTest(HostifTrapTypesTestHelper): ''' This verifies trap of type VRRP ''' def setUp(self): super(vrrpTrapTest, self).setUp() def runTest(self): print("\nvrrpTrapTest()") vrrp_dst_ip = "224.0.0.18" proto = 112 try: vrrp_trap = sai_thrift_create_hostif_trap( self.client, packet_action=SAI_PACKET_ACTION_COPY, trap_type=SAI_HOSTIF_TRAP_TYPE_VRRP) self.assertTrue(vrrp_trap != 0) # VRRP pkt vrrp_pkt = simple_ip_packet(ip_dst=vrrp_dst_ip, ip_proto=proto) print("Sending VRRP packet destined to %s" % vrrp_dst_ip) send_packet(self, self.dev_port24, vrrp_pkt) # CPU queue++ self.assertTrue(socket_verify_packet(vrrp_pkt, self.hif_socket)) print("\tOK") print("Verifying CPU port queue stats") time.sleep(4) post_stats = query_counter( self, sai_thrift_get_queue_stats, self.cpu_queue0) self.assertEqual( post_stats["SAI_QUEUE_STAT_PACKETS"], self.cpu_queue_state + 1) print("\tOK") self.cpu_queue_state = post_stats["SAI_QUEUE_STAT_PACKETS"] finally: sai_thrift_remove_hostif_trap(self.client, vrrp_trap) def tearDown(self): super(vrrpTrapTest, self).tearDown() @group("draft") class vrrpv6TrapTest(HostifTrapTypesTestHelper): ''' This verifies trap of type VRRPV6 ''' def setUp(self): super(vrrpv6TrapTest, self).setUp() def runTest(self): print("\nvrrpv6TrapTest()") vrrpv6_dst_ip = "FF02::12" proto = 112 try: vrrpv6_trap = sai_thrift_create_hostif_trap( self.client, packet_action=SAI_PACKET_ACTION_COPY, trap_type=SAI_HOSTIF_TRAP_TYPE_VRRPV6) self.assertTrue(vrrpv6_trap != 0) # VRRPV6 pkt vrrpv6_pkt = simple_ipv6ip_packet(ipv6_dst=vrrpv6_dst_ip) vrrpv6_pkt["IPv6"].nh = proto print("Sending VRRPV6 packet destined to %s" % vrrpv6_dst_ip) send_packet(self, self.dev_port24, vrrpv6_pkt) # CPU queue++ self.assertTrue(socket_verify_packet(vrrpv6_pkt, self.hif_socket)) print("\tOK") print("Verifying CPU port queue stats") time.sleep(4) post_stats = query_counter( self, sai_thrift_get_queue_stats, self.cpu_queue0) self.assertEqual( post_stats["SAI_QUEUE_STAT_PACKETS"], self.cpu_queue_state + 1) print("\tOK") self.cpu_queue_state = post_stats["SAI_QUEUE_STAT_PACKETS"] finally: sai_thrift_remove_hostif_trap(self.client, vrrpv6_trap) def tearDown(self): super(vrrpv6TrapTest, self).tearDown() @group("draft") class eapolTrapTest(HostifTrapTypesTestHelper): ''' This verifies trap of type EAPOL ''' def setUp(self): super(eapolTrapTest, self).setUp() def runTest(self): print("\neapolTrapTest()") eapol_etype = 0x888e try: eapol_trap = sai_thrift_create_hostif_trap( self.client, packet_action=SAI_PACKET_ACTION_TRAP, trap_type=SAI_HOSTIF_TRAP_TYPE_EAPOL) self.assertTrue(eapol_trap != 0) eapol_pkt = simple_eth_packet(eth_type=eapol_etype, pktlen=100) print("Sending EAPOL packet") send_packet(self, self.dev_port24, eapol_pkt) self.assertTrue(socket_verify_packet(eapol_pkt, self.hif_socket)) print("\tOK") print("Verifying CPU port queue stats") time.sleep(4) post_stats = query_counter( self, sai_thrift_get_queue_stats, self.cpu_queue0) self.assertEqual( post_stats["SAI_QUEUE_STAT_PACKETS"], self.cpu_queue_state + 1) print("\tOK") self.cpu_queue_state = post_stats["SAI_QUEUE_STAT_PACKETS"] finally: sai_thrift_remove_hostif_trap(self.client, eapol_trap) def tearDown(self): super(eapolTrapTest, self).tearDown() @group("draft") class mplsRouterAlertTrapTest(HostifTrapTypesTestHelper): ''' Verify trap of type MPLS router alert ''' def setUp(self): super(mplsRouterAlertTrapTest, self).setUp() def runTest(self): print("\nmplsRouterAlertTrapTest()") mpls_tag = {'label': 1, 'ttl': 63, 'tc': 0, 's': 0} mpls_tag_2 = {'label': 5555, 'ttl': 63, 'tc': 0, 's': 1} try: mpls_trap = sai_thrift_create_hostif_trap( self.client, packet_action=SAI_PACKET_ACTION_TRAP, trap_type=SAI_HOSTIF_TRAP_TYPE_MPLS_ROUTER_ALERT_LABEL) self.assertNotEqual(mpls_trap, 0) inner_pkt = simple_udp_packet(eth_dst=ROUTER_MAC) mpls_pkt = simple_mpls_packet(eth_dst=ROUTER_MAC, mpls_tags=[mpls_tag, mpls_tag_2], inner_frame=inner_pkt[IP]) pre_stats = query_counter( self, sai_thrift_get_queue_stats, self.cpu_queue0) print("Tx MPLS packet with Router Alert Label 1 ") send_packet(self, self.iport_dev, mpls_pkt) self.assertTrue(socket_verify_packet(mpls_pkt, self.hif_socket)) print("\tOK") print("Verifying CPU port queue stats") time.sleep(4) post_stats = query_counter( self, sai_thrift_get_queue_stats, self.cpu_queue0) self.assertEqual( post_stats["SAI_QUEUE_STAT_PACKETS"], pre_stats["SAI_QUEUE_STAT_PACKETS"] + 1) finally: self.dataplane.flush() sai_thrift_remove_hostif_trap(self.client, mpls_trap) def tearDown(self): super(mplsRouterAlertTrapTest, self).tearDown() @group("draft") class mplsTtlErrorTrapTest(HostifTrapTypesTestHelper): ''' Verify trap of type MPLS TTL error ''' def setUp(self): super(mplsTtlErrorTrapTest, self).setUp() def runTest(self): print("\nmplsTtlErrorTrapTest()") mpls_tag = {'label': 1000, 'ttl': 1, 'tc': 0, 's': 0} mpls_tag_ttl_0 = {'label': 1000, 'ttl': 0, 'tc': 0, 's': 0} mpls_tag_ttl_2 = {'label': 1000, 'ttl': 2, 'tc': 0, 's': 0} try: mpls_trap = sai_thrift_create_hostif_trap( self.client, packet_action=SAI_PACKET_ACTION_TRAP, trap_type=SAI_HOSTIF_TRAP_TYPE_MPLS_TTL_ERROR) self.assertNotEqual(mpls_trap, 0) inner_pkt = simple_udp_packet(eth_dst=ROUTER_MAC) mpls_pkt = simple_mpls_packet(eth_dst=ROUTER_MAC, mpls_tags=[mpls_tag], inner_frame=inner_pkt[IP]) pre_stats = query_counter( self, sai_thrift_get_queue_stats, self.cpu_queue0) print("Tx MPLS packet with TTL=1 --> trap") send_packet(self, self.iport_dev, mpls_pkt) self.assertTrue(socket_verify_packet(mpls_pkt, self.hif_socket)) print("\tOK") print("Verifying CPU port queue stats") time.sleep(5) post_stats = query_counter( self, sai_thrift_get_queue_stats, self.cpu_queue0) self.assertEqual( post_stats["SAI_QUEUE_STAT_PACKETS"], pre_stats["SAI_QUEUE_STAT_PACKETS"] + 1) mpls_pkt = simple_mpls_packet(eth_dst=ROUTER_MAC, mpls_tags=[mpls_tag_ttl_0], inner_frame=inner_pkt[IP]) pre_stats = query_counter( self, sai_thrift_get_queue_stats, self.cpu_queue0) print("Tx MPLS packet with TTL=0 --> trap") send_packet(self, self.iport_dev, mpls_pkt) self.assertTrue(socket_verify_packet(mpls_pkt, self.hif_socket)) print("\tOK") print("Verifying CPU port queue stats") time.sleep(5) post_stats = query_counter( self, sai_thrift_get_queue_stats, self.cpu_queue0) self.assertEqual( post_stats["SAI_QUEUE_STAT_PACKETS"], pre_stats["SAI_QUEUE_STAT_PACKETS"] + 1) mpls_pkt = simple_mpls_packet(eth_dst=ROUTER_MAC, mpls_tags=[mpls_tag_ttl_2], inner_frame=inner_pkt[IP]) pre_stats = query_counter( self, sai_thrift_get_queue_stats, self.cpu_queue0) print("Tx MPLS packet with TTL=2 --> dp") send_packet(self, self.iport_dev, mpls_pkt) self.assertFalse(socket_verify_packet(mpls_pkt, self.hif_socket)) print("\tOK") finally: self.dataplane.flush() sai_thrift_remove_hostif_trap(self.client, mpls_trap) def tearDown(self): super(mplsTtlErrorTrapTest, self).tearDown() @group("draft") class HostifUserDefinedTrapACLTestHelper(PlatformSaiHelper): """ Verify hostif user defined traps """ def setUp(self): super(HostifUserDefinedTrapACLTestHelper, self).setUp() self.hostifs = [] self.hostif1_name = "hostif1" self.hostif2_name = "hostif2" self.udt_traps = sai_thrift_query_attribute_enum_values_capability( self.client, SAI_OBJECT_TYPE_HOSTIF_USER_DEFINED_TRAP, SAI_HOSTIF_USER_DEFINED_TRAP_ATTR_TYPE) self.neigh_actions = sai_thrift_query_attribute_enum_values_capability( self.client, SAI_OBJECT_TYPE_NEIGHBOR_ENTRY, SAI_NEIGHBOR_ENTRY_ATTR_PACKET_ACTION) port = self.port0 self.test_dev_port = self.dev_port0 # create hostifs # these hostif will be used for receiving trapped packets # hostif1 will receive packets from user trap 1 # hostif2 will receive packets from user trap 2 self.hostif1 = sai_thrift_create_hostif(self.client, name=self.hostif1_name, obj_id=port, type=SAI_HOSTIF_TYPE_NETDEV) self.assertNotEqual(self.hostif1, 0) self.hostifs.append(self.hostif1) self.hostif2 = sai_thrift_create_hostif(self.client, name=self.hostif2_name, obj_id=port, type=SAI_HOSTIF_TYPE_NETDEV) self.assertNotEqual(self.hostif2, 0) self.hostifs.append(self.hostif2) time.sleep(6) self.hostif1_socket = open_packet_socket(self.hostif1_name) self.hostif2_socket = open_packet_socket(self.hostif2_name) def tearDown(self): self.hostif1_socket.close() self.hostif2_socket.close() while self.hostifs: sai_thrift_remove_hostif(self.client, self.hostifs.pop()) super(HostifUserDefinedTrapACLTestHelper, self).tearDown() @group("draft") class aclIpSrcNetdevTrapTest(HostifUserDefinedTrapACLTestHelper): ''' Verify the traffic matching ACL rule associated with user defined trap is properly trapped and directed to the relevant host interface 1. create two host interfaces 2. create two user defined traps 3. create hostif table entries to route trap 1 packets to host interface 1 and trap 2 packets to host interface 2 4. create ACL table with ACL enties matching different traffic for each user defined trap 5. send packets matching the ACL entries and verify them arrived to expected host interfaces ''' def setUp(self): super(aclIpSrcNetdevTrapTest, self).setUp() def runTest(self): print("\naclIpSrcNetdevTrapTest()") # test will create ACL rules to match these addresses and # trap mathing packets src_ip1 = '10.0.0.1' src_ip1_mask = '255.255.255.255' src_ip2 = '10.0.0.2' src_ip2_mask = '255.255.255.255' # create packet to be trapped by user defined trap 1 pkt1 = simple_ip_packet(ip_src=src_ip1) # create packet to be trapped by user defined trap 2 pkt2 = simple_ip_packet(ip_src=src_ip2) table_stage = SAI_ACL_STAGE_INGRESS table_bind_points = [SAI_ACL_BIND_POINT_TYPE_SWITCH] table_bind_point_type_list = sai_thrift_s32_list_t( count=len(table_bind_points), int32list=table_bind_points) udts = [] hostif_table_entries = [] acl_tables = [] acl_entries = [] try: # create user defined traps udt1 = sai_thrift_create_hostif_user_defined_trap( self.client, type=SAI_HOSTIF_USER_DEFINED_TRAP_TYPE_ACL) self.assertTrue(udt1 != 0) udts.append(udt1) udt2 = sai_thrift_create_hostif_user_defined_trap( self.client, type=SAI_HOSTIF_USER_DEFINED_TRAP_TYPE_ACL) self.assertTrue(udt2 != 0) udts.append(udt2) # associate user defined trap 1 with hostif 1 channel = SAI_HOSTIF_TABLE_ENTRY_CHANNEL_TYPE_NETDEV_PHYSICAL_PORT hostif_table_entry_udt1_hif1 = \ sai_thrift_create_hostif_table_entry( self.client, channel_type=channel, host_if=self.hostif1, trap_id=udt1, type=SAI_HOSTIF_TABLE_ENTRY_TYPE_TRAP_ID) self.assertTrue(hostif_table_entry_udt1_hif1 != 0) hostif_table_entries.append(hostif_table_entry_udt1_hif1) # associate user defined trap 2 with hostif 2 channel = SAI_HOSTIF_TABLE_ENTRY_CHANNEL_TYPE_NETDEV_PHYSICAL_PORT hostif_table_entry_udt2_hif2 = \ sai_thrift_create_hostif_table_entry( self.client, channel_type=channel, host_if=self.hostif2, trap_id=udt2, type=SAI_HOSTIF_TABLE_ENTRY_TYPE_TRAP_ID) self.assertTrue(hostif_table_entry_udt2_hif2 != 0) hostif_table_entries.append(hostif_table_entry_udt2_hif2) # create ACL table acl_table = sai_thrift_create_acl_table( self.client, acl_stage=table_stage, acl_bind_point_type_list=table_bind_point_type_list, field_src_ip=True) self.assertTrue(acl_table != 0) acl_tables.append(acl_table) # create ACL table entry and associate it with user defined trap 1 src_ip_t = sai_thrift_acl_field_data_t( data=sai_thrift_acl_field_data_data_t(ip4=src_ip1), mask=sai_thrift_acl_field_data_mask_t(ip4=src_ip1_mask)) set_user_trap_id = sai_thrift_acl_action_data_t( parameter=sai_thrift_acl_action_parameter_t(oid=udt1)) packet_action = sai_thrift_acl_action_data_t( parameter=sai_thrift_acl_action_parameter_t( s32=SAI_PACKET_ACTION_TRAP)) acl_entry1 = sai_thrift_create_acl_entry( self.client, table_id=acl_table, priority=10, field_src_ip=src_ip_t, action_set_user_trap_id=set_user_trap_id, action_packet_action=packet_action) self.assertTrue(acl_entry1 != 0) acl_entries.append(acl_entry1) # create ACL table entry and associate it with user defined trap 2 src_ip_t = sai_thrift_acl_field_data_t( data=sai_thrift_acl_field_data_data_t(ip4=src_ip2), mask=sai_thrift_acl_field_data_mask_t(ip4=src_ip2_mask)) set_user_trap_id = sai_thrift_acl_action_data_t( parameter=sai_thrift_acl_action_parameter_t(oid=udt2)) packet_action = sai_thrift_acl_action_data_t( parameter=sai_thrift_acl_action_parameter_t( s32=SAI_PACKET_ACTION_TRAP)) acl_entry2 = sai_thrift_create_acl_entry( self.client, table_id=acl_table, priority=11, field_src_ip=src_ip_t, action_set_user_trap_id=set_user_trap_id, action_packet_action=packet_action) self.assertTrue(acl_entry2 != 0) acl_entries.append(acl_entry2) pre_stats = query_counter( self, sai_thrift_get_queue_stats, self.cpu_queue0) # Send packets and verify they are properly trapped print("Sending IP packet 1") send_packet(self, self.test_dev_port, pkt1) # CPU queue++ print("Verifying IP packet 1 on port host interface 1") self.assertTrue(socket_verify_packet(pkt1, self.hostif1_socket)) print("Verifying no IP packet 1 on port host interface 2") self.assertTrue( not socket_verify_packet( pkt1, self.hostif2_socket, timeout=2)) print("Sending IP packet 2") send_packet(self, self.test_dev_port, pkt2) # CPU queue++ print("Verifying no IP packet 2 on port host interface 1") self.assertTrue( not socket_verify_packet( pkt2, self.hostif1_socket, timeout=2)) print("Verifying IP packet 2 on port host interface 2") self.assertTrue(socket_verify_packet(pkt2, self.hostif2_socket)) print("Verifying CPU port queue stats") time.sleep(4) post_stats = query_counter( self, sai_thrift_get_queue_stats, self.cpu_queue0) self.assertEqual( post_stats["SAI_QUEUE_STAT_PACKETS"], pre_stats["SAI_QUEUE_STAT_PACKETS"] + 2) print("\tOK") finally: self.dataplane.flush() while acl_entries: sai_thrift_remove_acl_entry(self.client, acl_entries.pop()) while acl_tables: sai_thrift_remove_acl_table(self.client, acl_tables.pop()) while hostif_table_entries: sai_thrift_remove_hostif_table_entry( self.client, hostif_table_entries.pop()) while udts: sai_thrift_remove_hostif_user_defined_trap( self.client, udts.pop()) def tearDown(self): super(aclIpSrcNetdevTrapTest, self).tearDown() @group("draft") class trapPriorityTest(PlatformSaiHelper): """ Verify priority attribute getting """ def setUp(self): super(trapPriorityTest, self).setUp() def runTest(self): try: custom_priotiy_1 = 11 lldp_trap = sai_thrift_create_hostif_trap( self.client, packet_action=SAI_PACKET_ACTION_TRAP, trap_type=SAI_HOSTIF_TRAP_TYPE_LLDP, trap_priority=custom_priotiy_1) self.assertGreater(lldp_trap, 0) attrs = sai_thrift_get_hostif_trap_attribute( self.client, lldp_trap, trap_priority=True) self.assertEqual(attrs["trap_priority"], custom_priotiy_1) custom_priotiy_2 = 5 dhcp_trap = sai_thrift_create_hostif_trap( self.client, packet_action=SAI_PACKET_ACTION_TRAP, trap_type=SAI_HOSTIF_TRAP_TYPE_DHCP, trap_priority=custom_priotiy_2) self.assertGreater(dhcp_trap, 0) attrs = sai_thrift_get_hostif_trap_attribute( self.client, dhcp_trap, trap_priority=True) self.assertEqual(attrs["trap_priority"], custom_priotiy_2) finally: sai_thrift_remove_hostif_trap(self.client, dhcp_trap) sai_thrift_remove_hostif_trap(self.client, lldp_trap) def tearDown(self): super(trapPriorityTest, self).tearDown() @group("draft") class trapTypeTest(PlatformSaiHelper): """ Verify type attribute getting """ def setUp(self): super(trapTypeTest, self).setUp() def runTest(self): try: lldp_trap = sai_thrift_create_hostif_trap( self.client, packet_action=SAI_PACKET_ACTION_TRAP, trap_type=SAI_HOSTIF_TRAP_TYPE_LLDP) self.assertGreater(lldp_trap, 0) attrs = sai_thrift_get_hostif_trap_attribute( self.client, lldp_trap, trap_type=True) self.assertEqual(attrs["trap_type"], SAI_HOSTIF_TRAP_TYPE_LLDP) dhcp_trap = sai_thrift_create_hostif_trap( self.client, packet_action=SAI_PACKET_ACTION_TRAP, trap_type=SAI_HOSTIF_TRAP_TYPE_DHCP) self.assertGreater(dhcp_trap, 0) attrs = sai_thrift_get_hostif_trap_attribute( self.client, dhcp_trap, trap_type=True) self.assertEqual(attrs["trap_type"], SAI_HOSTIF_TRAP_TYPE_DHCP) finally: sai_thrift_remove_hostif_trap(self.client, lldp_trap) sai_thrift_remove_hostif_trap(self.client, dhcp_trap) def tearDown(self): super(trapTypeTest, self).tearDown() @group("draft") class trapGroupTest(PlatformSaiHelper): """ Verify group attribute getting """ def setUp(self): super(trapGroupTest, self).setUp() def runTest(self): try: custom_trap_group = sai_thrift_create_hostif_trap_group( self.client, admin_state=True) self.assertGreater(custom_trap_group, 0) ip2me_trap = sai_thrift_create_hostif_trap( self.client, packet_action=SAI_PACKET_ACTION_TRAP, trap_type=SAI_HOSTIF_TRAP_TYPE_IP2ME) self.assertGreater(ip2me_trap, 0) attrs = sai_thrift_get_hostif_trap_attribute( self.client, ip2me_trap, trap_group=True) self.assertNotEqual(attrs["trap_group"], custom_trap_group) dhcp_trap = sai_thrift_create_hostif_trap( self.client, packet_action=SAI_PACKET_ACTION_TRAP, trap_type=SAI_HOSTIF_TRAP_TYPE_DHCP, trap_group=custom_trap_group) attrs = sai_thrift_get_hostif_trap_attribute( self.client, dhcp_trap, trap_group=True) self.assertEqual(attrs["trap_group"], custom_trap_group) finally: sai_thrift_remove_hostif_trap(self.client, ip2me_trap) sai_thrift_remove_hostif_trap(self.client, dhcp_trap) sai_thrift_remove_hostif_trap_group(self.client, custom_trap_group) def tearDown(self): super(trapGroupTest, self).tearDown() @group("draft") class trapActionTest(PlatformSaiHelper): """ Verify action attribute getting """ def setUp(self): super(trapActionTest, self).setUp() def runTest(self): try: ip2me_trap = sai_thrift_create_hostif_trap( self.client, packet_action=SAI_PACKET_ACTION_TRAP, trap_type=SAI_HOSTIF_TRAP_TYPE_IP2ME) self.assertGreater(ip2me_trap, 0) attrs = sai_thrift_get_hostif_trap_attribute( self.client, ip2me_trap, packet_action=True) self.assertEqual(attrs["packet_action"], SAI_PACKET_ACTION_TRAP) dhcp_trap = sai_thrift_create_hostif_trap( self.client, packet_action=SAI_PACKET_ACTION_DROP, trap_type=SAI_HOSTIF_TRAP_TYPE_DHCP) attrs = sai_thrift_get_hostif_trap_attribute( self.client, dhcp_trap, packet_action=True) self.assertEqual(attrs["packet_action"], SAI_PACKET_ACTION_DROP) finally: sai_thrift_remove_hostif_trap(self.client, ip2me_trap) sai_thrift_remove_hostif_trap(self.client, dhcp_trap) def tearDown(self): super(trapActionTest, self).tearDown() @group("draft") class hostIfOperStatusTest(PlatformSaiHelper): ''' - This verifies the correctness of host interface creation with object type Port and setting the oper status. - Updating the oper status - The test verifies also if management packets are passed to ports host interfaces after hostif table wildcard entry creation. ''' def setUp(self): super(hostIfOperStatusTest, self).setUp() def runTest(self): print("\nhosIfOperStatusTest()") hostif1_port = self.port24 hostif1_dev_port = self.dev_port24 hostif1_name = "hostif1" hostif2_port = self.port25 hostif2_dev_port = self.dev_port25 hostif2_name = "hostif2" lldp_mac = "01:80:c2:00:00:0e" lldp_pkt = simple_eth_packet(eth_dst=lldp_mac, pktlen=100, eth_type=0x88cc) try: print("creating host interface with oper_status as False") hostif1 = sai_thrift_create_hostif(self.client, name=hostif1_name, obj_id=hostif1_port, oper_status=False, type=SAI_HOSTIF_TYPE_NETDEV) self.assertTrue(hostif1 != 0) hostif2 = sai_thrift_create_hostif(self.client, name=hostif2_name, obj_id=hostif2_port, oper_status=False, type=SAI_HOSTIF_TYPE_NETDEV) self.assertTrue(hostif2 != 0) hostif1_attr = sai_thrift_get_hostif_attribute(self.client, hostif1, oper_status=True) hostif2_attr = sai_thrift_get_hostif_attribute(self.client, hostif2, oper_status=True) print("Retrieving host interface staus:") print("hostif1_oper_status: ", hostif1_attr['SAI_HOSTIF_ATTR_OPER_STATUS']) self.assertFalse(hostif1_attr['SAI_HOSTIF_ATTR_OPER_STATUS']) print("hostif2_oper_status: ", hostif2_attr['SAI_HOSTIF_ATTR_OPER_STATUS']) self.assertFalse(hostif2_attr['SAI_HOSTIF_ATTR_OPER_STATUS']) print("Setting host interface with oper_status as True") sai_thrift_set_hostif_attribute(self.client, hostif1, oper_status=True) sai_thrift_set_hostif_attribute(self.client, hostif2, oper_status=True) hostif1_attr = sai_thrift_get_hostif_attribute(self.client, hostif1, oper_status=True) hostif2_attr = sai_thrift_get_hostif_attribute(self.client, hostif2, oper_status=True) print("Retrieving host interface staus:") print("hostif1_oper_status: ", hostif1_attr['SAI_HOSTIF_ATTR_OPER_STATUS']) self.assertTrue(hostif1_attr['SAI_HOSTIF_ATTR_OPER_STATUS']) print("hostif2_oper_status: ", hostif2_attr['SAI_HOSTIF_ATTR_OPER_STATUS']) self.assertTrue(hostif2_attr['SAI_HOSTIF_ATTR_OPER_STATUS']) hif1_socket = open_packet_socket(hostif1_name) hif2_socket = open_packet_socket(hostif2_name) lldp_trap = sai_thrift_create_hostif_trap( self.client, packet_action=SAI_PACKET_ACTION_TRAP, trap_type=SAI_HOSTIF_TRAP_TYPE_LLDP) self.assertTrue(lldp_trap != 0) channel = SAI_HOSTIF_TABLE_ENTRY_CHANNEL_TYPE_NETDEV_PHYSICAL_PORT hif_tbl_entry = sai_thrift_create_hostif_table_entry( self.client, channel_type=channel, type=SAI_HOSTIF_TABLE_ENTRY_TYPE_WILDCARD) self.assertTrue(hif_tbl_entry != 0) pre_stats = query_counter( self, sai_thrift_get_queue_stats, self.cpu_queue0) print("Sending LLDP packet on port %d" % hostif1_dev_port) send_packet(self, hostif1_dev_port, lldp_pkt) print("Verifying LLDP packet on port host interface") self.assertTrue(socket_verify_packet(lldp_pkt, hif1_socket)) print("\tOK") print("Sending LLDP packet on port %d" % hostif2_dev_port) send_packet(self, hostif2_dev_port, lldp_pkt) print("Verifying LLDP packet on port host interface") self.assertTrue(socket_verify_packet(lldp_pkt, hif2_socket)) print("\tOK") print("Verifying CPU port queue stats") time.sleep(4) post_stats = query_counter( self, sai_thrift_get_queue_stats, self.cpu_queue0) self.assertEqual( post_stats["SAI_QUEUE_STAT_PACKETS"], pre_stats["SAI_QUEUE_STAT_PACKETS"] + 2) print("\tOK") print("\tVerification complete") finally: sai_thrift_remove_hostif_table_entry(self.client, hif_tbl_entry) sai_thrift_remove_hostif_trap(self.client, lldp_trap) sai_thrift_remove_hostif(self.client, hostif1) sai_thrift_remove_hostif(self.client, hostif2) def tearDown(self): sai_thrift_flush_fdb_entries(self.client, entry_type=SAI_FDB_FLUSH_ENTRY_TYPE_ALL) super(hostIfOperStatusTest, self).tearDown() @group("draft") class HostifUserDefinedTrapNeighborTest(PlatformSaiHelper): ''' Test hostif user defined traps ''' def setUp(self): super(HostifUserDefinedTrapNeighborTest, self).setUp() self.hostifs = [] self.udt_traps = sai_thrift_query_attribute_enum_values_capability( self.client, SAI_OBJECT_TYPE_HOSTIF_USER_DEFINED_TRAP, SAI_HOSTIF_USER_DEFINED_TRAP_ATTR_TYPE) self.neigh_actions = sai_thrift_query_attribute_enum_values_capability( self.client, SAI_OBJECT_TYPE_NEIGHBOR_ENTRY, SAI_NEIGHBOR_ENTRY_ATTR_PACKET_ACTION) self.hostif1_name = "hostif1" self.hostif2_name = "hostif2" port = self.port24 self.test_dev_port = self.dev_port24 # create hostifs # these hostif will be used for receiving trapped packets # hostif1 will receive packets from user trap 1 # hostif2 will receive packets from user trap 2 self.hostif1 = sai_thrift_create_hostif(self.client, name=self.hostif1_name, obj_id=port, type=SAI_HOSTIF_TYPE_NETDEV) self.assertTrue(self.hostif1 != 0) self.hostifs.append(self.hostif1) self.hostif2 = sai_thrift_create_hostif(self.client, name=self.hostif2_name, obj_id=port, type=SAI_HOSTIF_TYPE_NETDEV) self.assertTrue(self.hostif2 != 0) self.hostifs.append(self.hostif2) time.sleep(6) self.hostif1_socket = open_packet_socket(self.hostif1_name) self.hostif2_socket = open_packet_socket(self.hostif2_name) def tearDown(self): self.hostif1_socket.close() self.hostif2_socket.close() while self.hostifs: sai_thrift_remove_hostif(self.client, self.hostifs.pop()) super(HostifUserDefinedTrapNeighborTest, self).tearDown() @group("draft") class neighborTrapTest(HostifUserDefinedTrapNeighborTest): ''' Verify the Neighbor entry with packet action trap and user defined trap works correctly and directed to the relevant host interface 1. create two host interfaces 2. create two user defined traps 3. create hostif table entries to route trap 1 packets to host interface 1 and trap 2 packets to host interface 2 4. Setup regular L3 forwarding with route, nexthop and neighbor to forward traffic to two rifs 5. Verify regular L3 forwarding works 6. Delete regular nexthop (packet action forward) entries and create Neighbor entries with packet action trap for each user defined trap 7. Resend L3 packets and verify that they are received on expected host interfaces ''' def setUp(self): super(neighborTrapTest, self).setUp() def runTest(self): print("\niPv4NeighborTrapTest()") udts = [] hostif_table_entries = [] neighs = [] nhops = [] routes = [] # Setup L3 forwarding to send packet within dest ip prefix # 172.16.1.0/24 to self.port10_rif neigh1_ip = '10.10.1.2' neigh1_mac = '00:00:00:00:01:02' route1_prefix = '172.16.1.0/24' test1_route = '172.16.1.1' nhop1 = sai_thrift_create_next_hop(self.client, ip=sai_ipaddress(neigh1_ip), router_interface_id=self.port10_rif, type=SAI_NEXT_HOP_TYPE_IP) nhops.append(nhop1) neigh1 = {"rif_id": self.port10_rif, "ip_address": sai_ipaddress(neigh1_ip)} neighbor1_entry = sai_thrift_neighbor_entry_t(**neigh1) sai_thrift_create_neighbor_entry(self.client, neighbor1_entry, dst_mac_address=neigh1_mac) neighs.append(neighbor1_entry) route1_entry = sai_thrift_route_entry_t( vr_id=self.default_vrf, destination=sai_ipprefix(route1_prefix)) sai_thrift_create_route_entry(self.client, route1_entry, next_hop_id=nhop1) routes.append(route1_entry) # Setup L3 forwarding to send packet within dest ip prefix # 172.16.1.0/24 to self.port11_rif neigh2_ip = '10.10.2.2' neigh2_mac = '00:00:00:00:02:02' route2_prefix = '172.16.2.0/24' test2_route = '172.16.2.1' nhop2 = sai_thrift_create_next_hop(self.client, ip=sai_ipaddress(neigh2_ip), router_interface_id=self.port11_rif, type=SAI_NEXT_HOP_TYPE_IP) nhops.append(nhop2) neigh2 = {"rif_id": self.port11_rif, "ip_address": sai_ipaddress(neigh2_ip)} neighbor2_entry = sai_thrift_neighbor_entry_t(**neigh2) sai_thrift_create_neighbor_entry(self.client, neighbor2_entry, dst_mac_address=neigh2_mac) neighs.append(neighbor2_entry) route2_entry = sai_thrift_route_entry_t( vr_id=self.default_vrf, destination=sai_ipprefix(route2_prefix)) sai_thrift_create_route_entry(self.client, route2_entry, next_hop_id=nhop2) routes.append(route2_entry) # create packet to be trapped by user defined trap 1 test_pkt1 = simple_ip_packet(eth_dst=ROUTER_MAC, ip_dst=test1_route, ip_ttl=64) exp_test_pkt1 = simple_ip_packet(eth_src=ROUTER_MAC, eth_dst=neigh1_mac, ip_dst=test1_route, ip_ttl=63) # create packet to be trapped by user defined trap 2 test_pkt2 = simple_ip_packet(eth_dst=ROUTER_MAC, ip_dst=test2_route, ip_ttl=64) exp_test_pkt2 = simple_ip_packet(eth_src=ROUTER_MAC, eth_dst=neigh2_mac, ip_dst=test2_route, ip_ttl=63) try: # Verify Regular L3 forwarding print("Sending IP packet from port {} to port {}".format( self.dev_port12, self.dev_port10)) send_packet(self, self.dev_port12, test_pkt1) verify_packet(self, exp_test_pkt1, self.dev_port10) print("Sending IP packet from port {} to port {}".format( self.dev_port12, self.dev_port11)) send_packet(self, self.dev_port12, test_pkt2) verify_packet(self, exp_test_pkt2, self.dev_port11) # create user defined traps udt1 = sai_thrift_create_hostif_user_defined_trap( self.client, type=SAI_HOSTIF_USER_DEFINED_TRAP_TYPE_NEIGHBOR) self.assertTrue(udt1 != 0) udts.append(udt1) udt2 = sai_thrift_create_hostif_user_defined_trap( self.client, type=SAI_HOSTIF_USER_DEFINED_TRAP_TYPE_NEIGHBOR) self.assertTrue(udt2 != 0) udts.append(udt2) # associate user defined trap 1 with hostif 1 channel = SAI_HOSTIF_TABLE_ENTRY_CHANNEL_TYPE_NETDEV_PHYSICAL_PORT hostif_table_entry_udt1_hif1 = \ sai_thrift_create_hostif_table_entry( self.client, channel_type=channel, host_if=self.hostif1, trap_id=udt1, type=SAI_HOSTIF_TABLE_ENTRY_TYPE_TRAP_ID) self.assertTrue(hostif_table_entry_udt1_hif1 != 0) hostif_table_entries.append(hostif_table_entry_udt1_hif1) # associate user defined trap 2 with hostif 2 channel = SAI_HOSTIF_TABLE_ENTRY_CHANNEL_TYPE_NETDEV_PHYSICAL_PORT hostif_table_entry_udt2_hif2 = \ sai_thrift_create_hostif_table_entry( self.client, channel_type=channel, host_if=self.hostif2, trap_id=udt2, type=SAI_HOSTIF_TABLE_ENTRY_TYPE_TRAP_ID) self.assertTrue(hostif_table_entry_udt2_hif2 != 0) hostif_table_entries.append(hostif_table_entry_udt2_hif2) # Remove Forward Neighbor entries while nhops: sai_thrift_remove_next_hop(self.client, nhops.pop()) while neighs: sai_thrift_remove_neighbor_entry(self.client, neighs.pop()) # Create Neighbor Trap Entries nhop1 = sai_thrift_create_next_hop( self.client, ip=sai_ipaddress(neigh1_ip), router_interface_id=self.port10_rif, type=SAI_NEXT_HOP_TYPE_IP) nhops.append(nhop1) sai_thrift_create_neighbor_entry( self.client, neighbor1_entry, packet_action=SAI_PACKET_ACTION_TRAP, user_trap_id=udt1) neighs.append(neighbor1_entry) nhop2 = sai_thrift_create_next_hop( self.client, ip=sai_ipaddress(neigh2_ip), router_interface_id=self.port11_rif, type=SAI_NEXT_HOP_TYPE_IP) nhops.append(nhop2) sai_thrift_create_neighbor_entry( self.client, neighbor2_entry, packet_action=SAI_PACKET_ACTION_TRAP, user_trap_id=udt2) neighs.append(neighbor2_entry) sai_thrift_set_route_entry_attribute(self.client, route1_entry, next_hop_id=nhop1) sai_thrift_set_route_entry_attribute(self.client, route2_entry, next_hop_id=nhop2) pre_stats = query_counter( self, sai_thrift_get_queue_stats, self.cpu_queue0) cpu_queue_pkt_count = 0 print("Sending IP packet from port {} to {}".format( self.dev_port12, self.hostif1_name)) send_packet(self, self.dev_port12, test_pkt1) self.assertTrue( socket_verify_packet( test_pkt1, self.hostif1_socket), "Expected packet not received on {}" .format( self.hostif1_name)) cpu_queue_pkt_count += 1 print("Sending IP packet from port {} to {}".format( self.dev_port12, self.hostif2_name)) send_packet(self, self.dev_port12, test_pkt2) self.assertTrue( socket_verify_packet( test_pkt2, self.hostif2_socket)) cpu_queue_pkt_count += 1 print("Verifying CPU port queue stats") time.sleep(4) post_stats = query_counter( self, sai_thrift_get_queue_stats, self.cpu_queue0) self.assertEqual( post_stats["SAI_QUEUE_STAT_PACKETS"], pre_stats["SAI_QUEUE_STAT_PACKETS"] + cpu_queue_pkt_count) print("\tOK") # Update Neighbor Trap entries with Null Object ID # SAI spec expects for neighbor packet action trap and trap id=0 # packets are not trapped to the CPU print("Updating Neighbor Entry Trap IDs to Null Object ID") sai_thrift_set_neighbor_entry_attribute(self.client, neighbor1_entry, user_trap_id=0) sai_thrift_set_neighbor_entry_attribute(self.client, neighbor2_entry, user_trap_id=0) pre_stats = post_stats print("Sending IP packet from port {}.Packet must be dropped" .format(self.dev_port11)) send_packet(self, self.dev_port12, test_pkt1) print("Sending IP packet from port {}.Packet must be dropped" .format(self.dev_port12)) send_packet(self, self.dev_port12, test_pkt2) verify_no_other_packets(self) print("Verifying CPU port queue stats") time.sleep(4) post_stats = query_counter( self, sai_thrift_get_queue_stats, self.cpu_queue0) self.assertEqual( post_stats["SAI_QUEUE_STAT_PACKETS"], pre_stats["SAI_QUEUE_STAT_PACKETS"]) finally: while routes: sai_thrift_remove_route_entry(self.client, routes.pop()) while nhops: sai_thrift_remove_next_hop(self.client, nhops.pop()) while neighs: sai_thrift_remove_neighbor_entry(self.client, neighs.pop()) while hostif_table_entries: sai_thrift_remove_hostif_table_entry( self.client, hostif_table_entries.pop()) while udts: sai_thrift_remove_hostif_user_defined_trap( self.client, udts.pop()) def tearDown(self): super(neighborTrapTest, self).tearDown()