def l2QosMapSamePcpToTcManyIngressPortsTest()

in ptf/saiqosmap.py [0:0]


    def l2QosMapSamePcpToTcManyIngressPortsTest(self):
        ''' Test verifies the following:
            1. L2 traffic with the same PCP -> TC mapping applied to many
               ingress ports
            The verification method:
                - setting the DOT1P to TC mapping (identity, 0..7)
                - setting the TC to QUEUE mapping (identity, 0..7)
                - sending a tagged packet with every PCP value from each
                  port of dev_test_port_list and checking that it is
                  flooded to the remaining ports
                - verification of the queue stats
            The port attributes are reset and the created QoS maps are
            removed in the cleanup, whether or not the test passed.
        '''
        print("l2QosMapSamePcpToTcManyIngressPortsTest")
        # One test case per PCP value. Because both mappings below are
        # identity mappings, the expected queue index on every flooded port
        # equals the PCP.
        test_cases = [{
            'tx_port': self.dev_port0,
            'pcp': pcp,
            'p1_queue': pcp,
            'p2_queue': pcp,
            'p3_queue': pcp
        } for pcp in range(8)]
        ports = [self.port0, self.port1, self.port2, self.port3]
        # Bind the map ids before entering "try" so that the cleanup never
        # fails on an unbound name (and hides the real error) when the map
        # creation itself raises.
        qos_dot1p_to_tc_map_id = 0
        qos_tc_to_queue_map_id = 0
        try:
            print("Create dot1p -> tc qos mapping")
            ingress_dot1p_list = [0, 1, 2, 3, 4, 5, 6, 7]
            ingress_tc_list = [0, 1, 2, 3, 4, 5, 6, 7]
            qos_dot1p_to_tc_map_id = create_and_verify_qos_map(
                self.client, SAI_QOS_MAP_TYPE_DOT1P_TO_TC, ingress_dot1p_list,
                ingress_tc_list)
            self.assertTrue(qos_dot1p_to_tc_map_id != 0,
                            "Failed to create qos_map")
            print("\tok")
            print("Create tc -> queue index qos mapping")
            ingress_tc_list = [0, 1, 2, 3, 4, 5, 6, 7]
            ingress_queue_list = [0, 1, 2, 3, 4, 5, 6, 7]
            qos_tc_to_queue_map_id = create_and_verify_qos_map(
                self.client, SAI_QOS_MAP_TYPE_TC_TO_QUEUE, ingress_tc_list,
                ingress_queue_list)
            self.assertTrue(qos_tc_to_queue_map_id != 0,
                            "Failed to create qos_map")
            print("\tok")
            # configure the ingress ports: every port gets the same
            # dot1p -> tc map, and the attribute is read back to confirm it
            for port in ports:
                status = sai_thrift_set_port_attribute(
                    self.client, port,
                    qos_dot1p_to_tc_map=qos_dot1p_to_tc_map_id)
                self.assertEqual(status, SAI_STATUS_SUCCESS,
                                 "Failed to set port attribute")
                attr = sai_thrift_get_port_attribute(self.client,
                                                     port,
                                                     qos_dot1p_to_tc_map=True)
                self.assertEqual(attr['qos_dot1p_to_tc_map'],
                                 qos_dot1p_to_tc_map_id)
            # configure the egress ports: every port gets the same
            # tc -> queue map, and the attribute is read back to confirm it
            for port in ports:
                status = sai_thrift_set_port_attribute(
                    self.client, port,
                    qos_tc_to_queue_map=qos_tc_to_queue_map_id)
                self.assertEqual(status, SAI_STATUS_SUCCESS,
                                 "Failed to set port attribute")
                attr = sai_thrift_get_port_attribute(self.client,
                                                     port,
                                                     qos_tc_to_queue_map=True)
                self.assertEqual(attr['qos_tc_to_queue_map'],
                                 qos_tc_to_queue_map_id)
            # verify port default_tc (helper defined elsewhere in the class)
            self.verifyPortDefaultDscpToTc(ports)
            for test_port in self.dev_test_port_list:
                print("Testing tx_port=%d" % (test_port))
                # Every port except the sender is expected to receive the
                # packet; this does not depend on the PCP under test.
                flood_port_list = [[port]
                                   for port in self.dev_test_port_list
                                   if port != test_port]
                for test in test_cases:
                    pcp = test['pcp']
                    pkt = simple_eth_raw_packet_with_taglist(
                        pktlen=104,
                        eth_dst='00:33:33:33:33:00',
                        eth_src='00:33:33:33:33:11',
                        dl_taglist_enable=True,
                        dl_vlan_pcp_list=[pcp],
                        dl_vlanid_list=[1])
                    # the same packet is expected on each flooded port
                    flood_pkt_list = [pkt] * len(flood_port_list)
                    # queue counters before sending the packet
                    init_q_cnt = [
                        self.getPortQueueIndexStats(
                            port, test['p1_queue'],
                            ['SAI_QUEUE_STAT_PACKETS'])
                        for port in ports
                    ]
                    # NOTE: the messages below index three flooded ports, so
                    # dev_test_port_list must hold at least four ports.
                    print("Sending packet port %d (pcp=%d) -> "
                          "port %d (queue=%d)"
                          % (test_port, pcp, flood_port_list[0][0],
                             test['p1_queue']))
                    print("\t\t\t\t\tport %d (queue=%d)"
                          % (flood_port_list[1][0], test['p2_queue']))
                    print("\t\t\t\t\tport %d (queue=%d)"
                          % (flood_port_list[2][0], test['p3_queue']))
                    send_packet(self, test_port, pkt)
                    verify_each_packet_on_multiple_port_lists(
                        self, flood_pkt_list, flood_port_list)
                    print("\tPacket flooded to ports %s. ok" %
                          (flood_port_list))
                    print("\tVerify port queues packet counters.")
                    # queue counters after sending the packet
                    post_q_cnt = [
                        self.getPortQueueIndexStats(
                            port, test['p1_queue'],
                            ['SAI_QUEUE_STAT_PACKETS'])
                        for port in ports
                    ]
                    # verify the port queue counters
                    self.verifyPortQueueCountersVariousPcp(
                        test_port, test, init_q_cnt, post_q_cnt)
                    print("\tok")
        finally:
            # Best-effort cleanup: set the port QoS attributes back to 0
            # before removing the maps.
            for port in ports:
                sai_thrift_set_port_attribute(self.client,
                                              port,
                                              qos_tc_to_queue_map=0)
                sai_thrift_set_port_attribute(self.client,
                                              port,
                                              qos_default_tc=0)
                sai_thrift_set_port_attribute(self.client,
                                              port,
                                              qos_dot1p_to_tc_map=0)
            # Remove only the maps that were actually created.
            if qos_tc_to_queue_map_id:
                sai_thrift_remove_qos_map(self.client, qos_tc_to_queue_map_id)
            if qos_dot1p_to_tc_map_id:
                sai_thrift_remove_qos_map(self.client, qos_dot1p_to_tc_map_id)