def macLearnErrorTest()

in ptf/saifdb.py [0:0]


    def macLearnErrorTest(self):
        '''
        Verify that MAC addresses are not learned when different undesirable
        conditions occurred:
        1 - invalid VLAN tag
        2 - src_mac is a broadcast address (packet drop)
        3 - src_mac is a multicast address (packet drop)
        4 - src_mac previously added statically
        5 - removed VLAN member
        6 - removed LAG member

        Each case first sends the offending packet and then sends a probe
        packet whose forwarding shows that the offending source MAC was not
        learned on its ingress port.

        Cases 5 and 6 remove a VLAN member and a LAG member. The cleanup in
        the ``finally`` clause restores only what was actually removed, so
        a failure in an earlier case is reported as-is instead of being
        masked by an error raised during cleanup.
        '''
        print("\nmacLearnErrorTest()")

        access_port = self.dev_port0  # untagged
        ap_mac = self.macs[0]
        trunk_port = self.dev_port1  # tagged
        tp_mac = self.macs[1]

        flood_port_list = [[trunk_port], self.utg_lag_ports, self.tg_lag_ports]

        # Cleanup bookkeeping. These are updated only once the matching
        # removal has really been performed, so that "finally" never touches
        # names that are still unbound and never re-creates objects that
        # were not removed.
        vlan_member_removed = False
        lag_member_removed = False
        lag_port_index = None  # position of the removed port in the list

        try:
            # static entries for both test ports; entry1 is the subject of
            # Case 4
            fdb_entry1 = sai_thrift_fdb_entry_t(switch_id=self.switch_id,
                                                mac_address=ap_mac,
                                                bv_id=self.vlan10)
            status = sai_thrift_create_fdb_entry(
                self.client,
                fdb_entry1,
                type=SAI_FDB_ENTRY_TYPE_STATIC,
                bridge_port_id=self.port0_bp)
            self.assertEqual(status, SAI_STATUS_SUCCESS)

            fdb_entry2 = sai_thrift_fdb_entry_t(switch_id=self.switch_id,
                                                mac_address=tp_mac,
                                                bv_id=self.vlan10)
            status = sai_thrift_create_fdb_entry(
                self.client,
                fdb_entry2,
                type=SAI_FDB_ENTRY_TYPE_STATIC,
                bridge_port_id=self.port1_bp)
            self.assertEqual(status, SAI_STATUS_SUCCESS)

            print("Case 1 - invalid VLAN tag")
            inv_vlan_id = 100
            lrn_mac = self.macs[2]

            inv_vlan_tag_pkt = simple_udp_packet(eth_dst=ap_mac,
                                                 eth_src=lrn_mac,
                                                 dl_vlan_enable=True,
                                                 vlan_vid=inv_vlan_id,
                                                 pktlen=104)
            chck_inv_vlan_pkt = simple_udp_packet(eth_dst=lrn_mac,
                                                  eth_src=ap_mac,
                                                  pktlen=100)
            chck_inv_vlan_tag_pkt = simple_udp_packet(eth_dst=lrn_mac,
                                                      eth_src=ap_mac,
                                                      dl_vlan_enable=True,
                                                      vlan_vid=self.vlan_id,
                                                      pktlen=104)

            # one expected packet per entry of flood_port_list
            flood_pkt_list = [chck_inv_vlan_tag_pkt, chck_inv_vlan_pkt,
                              chck_inv_vlan_tag_pkt]

            print("Sending packet with invalid VLAN tag on port %d, %s -> %s" %
                  (trunk_port, lrn_mac, ap_mac))
            send_packet(self, trunk_port, inv_vlan_tag_pkt)
            verify_no_other_packets(self)
            print("\tPacket dropped")

            print("Checking if MAC %s was not learned on port %d" %
                  (lrn_mac, trunk_port))
            print("Sending packet on port %d, %s -> %s - will flood" %
                  (access_port, ap_mac, lrn_mac))
            send_packet(self, access_port, chck_inv_vlan_pkt)
            verify_each_packet_on_multiple_port_lists(self, flood_pkt_list,
                                                      flood_port_list)
            print("\tVerification complete\n")

            print("Case 2 - broadcast src_mac address")
            bcast_mac = "ff:ff:ff:ff:ff:ff"

            bcast_src_tag_pkt = simple_udp_packet(eth_dst=ap_mac,
                                                  eth_src=bcast_mac,
                                                  dl_vlan_enable=True,
                                                  vlan_vid=self.vlan_id,
                                                  pktlen=104)
            chck_bcast_src_pkt = simple_udp_packet(eth_dst=bcast_mac,
                                                   eth_src=ap_mac,
                                                   pktlen=100)
            chck_bcast_src_tag_pkt = simple_udp_packet(eth_dst=bcast_mac,
                                                       eth_src=ap_mac,
                                                       dl_vlan_enable=True,
                                                       vlan_vid=self.vlan_id,
                                                       pktlen=104)

            flood_pkt_list = [chck_bcast_src_tag_pkt, chck_bcast_src_pkt,
                              chck_bcast_src_tag_pkt]

            print("Sending packet with invalid src_mac on port %d, %s -> %s" %
                  (trunk_port, bcast_mac, ap_mac))
            send_packet(self, trunk_port, bcast_src_tag_pkt)
            verify_no_other_packets(self)
            print("\tPacket dropped")

            print("Checking if broadcast MAC %s was not learned on port %d" %
                  (bcast_mac, trunk_port))
            print("Sending packet on port %d, %s -> %s - will flood" %
                  (access_port, ap_mac, bcast_mac))
            send_packet(self, access_port, chck_bcast_src_pkt)
            verify_each_packet_on_multiple_port_lists(self, flood_pkt_list,
                                                      flood_port_list)
            print("\tVerification complete\n")

            print("Case 3 - multicast src_mac address")
            mcast_mac = "01:00:5e:11:22:33"

            mcast_src_tag_pkt = simple_udp_packet(eth_dst=ap_mac,
                                                  eth_src=mcast_mac,
                                                  dl_vlan_enable=True,
                                                  vlan_vid=self.vlan_id,
                                                  pktlen=104)
            chck_mcast_src_pkt = simple_udp_packet(eth_dst=mcast_mac,
                                                   eth_src=ap_mac,
                                                   pktlen=100)
            chck_mcast_src_tag_pkt = simple_udp_packet(eth_dst=mcast_mac,
                                                       eth_src=ap_mac,
                                                       dl_vlan_enable=True,
                                                       vlan_vid=self.vlan_id,
                                                       pktlen=104)

            flood_pkt_list = [chck_mcast_src_tag_pkt, chck_mcast_src_pkt,
                              chck_mcast_src_tag_pkt]

            print("Sending packet with invalid src_mac on port %d, %s -> %s" %
                  (trunk_port, mcast_mac, ap_mac))
            send_packet(self, trunk_port, mcast_src_tag_pkt)
            verify_no_other_packets(self)
            print("\tPacket dropped")

            print("Checking if multicast MAC %s was not learned on port %d" %
                  (mcast_mac, trunk_port))
            print("Sending packet on port %d, %s -> %s - will flood" %
                  (access_port, ap_mac, mcast_mac))
            send_packet(self, access_port, chck_mcast_src_pkt)
            verify_each_packet_on_multiple_port_lists(self, flood_pkt_list,
                                                      flood_port_list)
            print("\tVerification complete\n")

            print("Case 4 - src_mac statically added")
            lrn_mac = self.macs[2]
            bcast_mac = "ff:ff:ff:ff:ff:ff"
            static_src_tag_pkt = simple_udp_packet(eth_dst=bcast_mac,
                                                   eth_src=ap_mac,
                                                   dl_vlan_enable=True,
                                                   vlan_vid=self.vlan_id,
                                                   pktlen=104)
            static_src_pkt = simple_udp_packet(eth_dst=bcast_mac,
                                               eth_src=ap_mac,
                                               pktlen=100)
            chck_static_src_pkt = simple_udp_packet(eth_dst=ap_mac,
                                                    eth_src=lrn_mac,
                                                    pktlen=100)

            # the packet enters on the trunk port this time, so the flood
            # is expected on the access port instead
            flood_pkt_list = [
                static_src_pkt, static_src_pkt, static_src_tag_pkt]
            flood_port_list = [
                [access_port], self.utg_lag_ports, self.tg_lag_ports]

            print("Sending packet on port %d, %s -> %s" %
                  (trunk_port, ap_mac, bcast_mac))
            send_packet(self, trunk_port, static_src_tag_pkt)
            verify_each_packet_on_multiple_port_lists(self, flood_pkt_list,
                                                      flood_port_list)
            print("\tPacket flooded")

            print("Checking if MAC %s was not learned on port %d" %
                  (ap_mac, trunk_port))
            print("Sending packet on port %d, %s -> %s - forward to port %d" %
                  (self.utg_lag_ports[1], lrn_mac, ap_mac, access_port))
            send_packet(self, self.utg_lag_ports[1], chck_static_src_pkt)
            verify_packets(self, chck_static_src_pkt, [access_port])
            print("\tVerification complete\n")

            print("Case 5 - removed VLAN member")
            rm_vlan_member = self.vlan10_member1
            rm_vlan_member_dev = self.dev_port1
            rm_vlan_member_mac = "00:12:34:56:78:90"

            sai_thrift_remove_vlan_member(self.client, rm_vlan_member)
            # from here on the member has to be re-created during cleanup
            vlan_member_removed = True

            # check if packet is not flooded to the removed VLAN member port
            pkt = simple_udp_packet(eth_dst="ff:ff:ff:ff:ff:ff",
                                    eth_src=self.macs[0],
                                    pktlen=100)
            tag_pkt = simple_udp_packet(eth_dst="ff:ff:ff:ff:ff:ff",
                                        eth_src=self.macs[0],
                                        dl_vlan_enable=True,
                                        vlan_vid=self.vlan_id,
                                        pktlen=104)

            # NOTE: this list holds a reference to self.utg_lag_ports, so the
            # in-place removal done in Case 6 is visible through it as well
            flood_port_list = [self.utg_lag_ports, self.tg_lag_ports]
            flood_pkt_list = [pkt, tag_pkt]

            print("Sending packet on port %d, %s -> %s\nchecking if flood "
                  "doesn't reach port %d" % (self.dev_port0, self.macs[0],
                                             "ff:ff:ff:ff:ff:ff",
                                             rm_vlan_member_dev))
            send_packet(self, self.dev_port0, pkt)
            verify_each_packet_on_multiple_port_lists(self, flood_pkt_list,
                                                      flood_port_list)
            print("\tOK")

            # check if MAC is not learned on removed VLAN member
            tag_pkt = simple_udp_packet(eth_dst="ff:ff:ff:ff:ff:ff",
                                        eth_src=rm_vlan_member_mac,
                                        dl_vlan_enable=True,
                                        vlan_vid=self.vlan_id,
                                        pktlen=104)

            print("Sending packet on removed VLAN port (%d), %s -> %s - "
                  "will be dropped" % (rm_vlan_member_dev, rm_vlan_member_mac,
                                       "ff:ff:ff:ff:ff:ff"))
            send_packet(self, rm_vlan_member_dev, tag_pkt)
            verify_no_other_packets(self)
            print("\tOK")

            print("Checking if MAC was not learned")
            pkt = simple_udp_packet(eth_dst=rm_vlan_member_mac,
                                    eth_src=self.macs[0],
                                    pktlen=100)
            tag_pkt = simple_udp_packet(eth_dst=rm_vlan_member_mac,
                                        eth_src=self.macs[0],
                                        dl_vlan_enable=True,
                                        vlan_vid=self.vlan_id,
                                        pktlen=104)

            flood_pkt_list = [pkt, tag_pkt]

            print("Sending packet on port %d, %s -> %s - will flood within "
                  "VLAN 10" % (self.dev_port0, self.macs[0],
                               rm_vlan_member_mac))
            send_packet(self, self.dev_port0, pkt)
            verify_each_packet_on_multiple_port_lists(self, flood_pkt_list,
                                                      flood_port_list)

            print("Case 6 - removed LAG member")
            rm_lag_member = self.lag1_member5
            rm_lag_member_dev = self.dev_port5
            rm_lag_member_mac = "00:09:87:65:43:21"

            sai_thrift_remove_lag_member(self.client, rm_lag_member)
            # from here on the member has to be re-created during cleanup
            lag_member_removed = True
            # remember where the port was so cleanup can put it back in the
            # same position; the list is modified in place on purpose
            lag_port_index = self.utg_lag_ports.index(rm_lag_member_dev)
            self.utg_lag_ports.remove(rm_lag_member_dev)

            pkt = simple_udp_packet(eth_dst="ff:ff:ff:ff:ff:ff",
                                    eth_src=rm_lag_member_mac,
                                    pktlen=100)

            print("Sending packet on removed LAG member (port %d), %s -> %s - "
                  "will be dropped" % (rm_lag_member_dev, rm_lag_member_mac,
                                       "ff:ff:ff:ff:ff:ff"))
            send_packet(self, rm_lag_member_dev, pkt)
            verify_no_other_packets(self)
            print("\tOK")

            print("Checking if MAC was not learned")
            pkt = simple_udp_packet(eth_dst=rm_lag_member_mac,
                                    eth_src=self.macs[0],
                                    pktlen=100)
            tag_pkt = simple_udp_packet(eth_dst=rm_lag_member_mac,
                                        eth_src=self.macs[0],
                                        dl_vlan_enable=True,
                                        vlan_vid=self.vlan_id,
                                        pktlen=104)

            flood_pkt_list = [pkt, tag_pkt]

            print("Sending packet on port %d, %s -> %s - will flood within "
                  "VLAN 10" % (self.dev_port0, self.macs[0],
                               rm_lag_member_mac))
            send_packet(self, self.dev_port0, pkt)
            verify_each_packet_on_multiple_port_lists(self, flood_pkt_list,
                                                      flood_port_list)
            print("\tVerification complete\n")

        finally:
            sai_thrift_flush_fdb_entries(
                self.client, entry_type=SAI_FDB_FLUSH_ENTRY_TYPE_ALL)

            # restore removed LAG member (only if Case 6 really removed it)
            if lag_member_removed:
                self.lag1_member5 = sai_thrift_create_lag_member(
                    self.client, lag_id=self.lag1, port_id=self.port5)
            # lag_port_index is set only after rm_lag_member_dev was bound
            # and found in the list, so both names are valid here
            if lag_port_index is not None:
                self.utg_lag_ports.insert(lag_port_index, rm_lag_member_dev)

            # restore removed vlan member (only if Case 5 really removed it)
            if vlan_member_removed:
                self.vlan10_member1 = sai_thrift_create_vlan_member(
                    self.client, vlan_id=self.vlan10,
                    bridge_port_id=self.port1_bp,
                    vlan_tagging_mode=SAI_VLAN_TAGGING_MODE_TAGGED)