def l2QosMapMultiplePCPToOneTcMappingTest()

in ptf/saiqosmap.py [0:0]


    def l2QosMapMultiplePCPToOneTcMappingTest(self):
        ''' Test verifies the following:
            1. L2 traffic multiple PCP to TC mapping
            The verification method:
                - setting the TC to QUEUE mapping
                - verification of the queue stats
        '''
        print("l2QosMapMultiplePCPToOneTcMappingTest")
        test_cases = []
        # port default_tc Test Case
        test_cases.append({
            'pcp': 0,
            'port_default_tc': 0,
            'p1_queue': 3,
            'p2_queue': 3,
            'p3_queue': 3
        })
        test_cases.append({
            'pcp': 1,
            'port_default_tc': 0,
            'p1_queue': 3,
            'p2_queue': 3,
            'p3_queue': 3
        })
        test_cases.append({
            'pcp': 2,
            'port_default_tc': 0,
            'p1_queue': 3,
            'p2_queue': 3,
            'p3_queue': 3
        })
        test_cases.append({
            'pcp': 3,
            'port_default_tc': 0,
            'p1_queue': 3,
            'p2_queue': 3,
            'p3_queue': 3
        })
        test_cases.append({
            'pcp': 4,
            'port_default_tc': 0,
            'p1_queue': 7,
            'p2_queue': 7,
            'p3_queue': 7
        })
        test_cases.append({
            'pcp': 5,
            'port_default_tc': 0,
            'p1_queue': 7,
            'p2_queue': 7,
            'p3_queue': 7
        })
        test_cases.append({
            'pcp': 6,
            'port_default_tc': 0,
            'p1_queue': 7,
            'p2_queue': 7,
            'p3_queue': 7
        })
        test_cases.append({
            'pcp': 7,
            'port_default_tc': 0,
            'p1_queue': 7,
            'p2_queue': 7,
            'p3_queue': 7
        })
        try:
            print("Create dot1p -> tc qos mapping")
            ingress_dot1p_list = [0, 1, 2, 3, 4, 5, 6, 7]
            ingress_tc_list = [3, 3, 3, 3, 7, 7, 7, 7]
            qos_dot1p_to_tc_map_id = create_and_verify_qos_map(
                self.client, SAI_QOS_MAP_TYPE_DOT1P_TO_TC, ingress_dot1p_list,
                ingress_tc_list)
            self.assertTrue(qos_dot1p_to_tc_map_id != 0,
                            "Failed to create qos_map")
            print("\tok")
            print("Create tc -> queue index qos mapping")
            ingress_tc_list = [0, 1, 2, 3, 4, 5, 6, 7]
            ingress_queue_list = [3, 3, 3, 3, 7, 7, 7, 7]
            qos_tc_to_queue_map_id = create_and_verify_qos_map(
                self.client, SAI_QOS_MAP_TYPE_TC_TO_QUEUE, ingress_tc_list,
                ingress_queue_list)
            self.assertTrue(qos_tc_to_queue_map_id != 0,
                            "Failed to create qos_map")
            print("\tok")
            # configure the ingress ports
            for port, dot1p_to_tc in [[self.port0, qos_dot1p_to_tc_map_id]]:
                status = sai_thrift_set_port_attribute(
                    self.client, port, qos_dot1p_to_tc_map=dot1p_to_tc)
                self.assertEqual(status, SAI_STATUS_SUCCESS,
                                 "Failed to set port attribute")
                attr = sai_thrift_get_port_attribute(self.client,
                                                     port,
                                                     qos_dot1p_to_tc_map=True)
                self.assertEqual(attr['qos_dot1p_to_tc_map'], dot1p_to_tc)
            # configure the egress ports
            for port, tc_to_queue in [[self.port1, qos_tc_to_queue_map_id],
                                      [self.port2, qos_tc_to_queue_map_id],
                                      [self.port3, qos_tc_to_queue_map_id]]:
                status = sai_thrift_set_port_attribute(
                    self.client, port, qos_tc_to_queue_map=tc_to_queue)
                self.assertEqual(status, SAI_STATUS_SUCCESS,
                                 "Failed to set port attribute")
                attr = sai_thrift_get_port_attribute(self.client,
                                                     port,
                                                     qos_tc_to_queue_map=True)
                self.assertEqual(attr['qos_tc_to_queue_map'], tc_to_queue)
            # verify port default tc
            for port in [self.port0, self.port1, self.port2, self.port3]:
                attr = sai_thrift_get_port_attribute(self.client,
                                                     port,
                                                     qos_dscp_to_tc_map=True,
                                                     qos_default_tc=True)
                self.assertEqual(attr['qos_dscp_to_tc_map'], 0)
                self.assertEqual(attr['qos_default_tc'], 0)
            for test in test_cases:
                pcp = test['pcp']
                # setup port default_tc
                port_default_tc = 0
                if test['port_default_tc'] is not None:
                    port_default_tc = test['port_default_tc']
                for port in [self.port0]:
                    sai_thrift_set_port_attribute(
                        self.client, port, qos_default_tc=port_default_tc)
                    attr = sai_thrift_get_port_attribute(self.client,
                                                         port,
                                                         qos_default_tc=True)
                    self.assertEqual(attr['qos_default_tc'], port_default_tc)
                p1_initial_q_cnt = self.getPortQueueIndexStats(
                    self.port1, test['p1_queue'], ['SAI_QUEUE_STAT_PACKETS'])
                p2_initial_q_cnt = self.getPortQueueIndexStats(
                    self.port2, test['p2_queue'], ['SAI_QUEUE_STAT_PACKETS'])
                p3_initial_q_cnt = self.getPortQueueIndexStats(
                    self.port3, test['p3_queue'], ['SAI_QUEUE_STAT_PACKETS'])
                print("Sending packet port %d (pcp=%d) -> "
                      "port %d (port_default_tc=%d queue=%d)"
                      % (self.dev_port0, pcp, self.dev_port1,
                         test['port_default_tc'], test['p1_queue']))
                pkt = simple_eth_raw_packet_with_taglist(
                    pktlen=104,
                    eth_dst='00:33:33:33:33:00',
                    eth_src='00:33:33:33:33:11',
                    dl_taglist_enable=True,
                    dl_vlan_pcp_list=[pcp],
                    dl_vlanid_list=[1])
                test_port = self.dev_port0
                flood_port_list = [[self.dev_port1], [self.dev_port2],
                                   [self.dev_port3]]
                flood_pkt_list = [pkt, pkt, pkt]
                send_packet(self, test_port, pkt)
                verify_each_packet_on_multiple_port_lists(self, flood_pkt_list,
                                                          flood_port_list)
                print("\tPacket flooded to ports 1,2,3. ok")
                p1_post_q_cnt = self.getPortQueueIndexStats(
                    self.port1, test['p1_queue'], ['SAI_QUEUE_STAT_PACKETS'])
                p2_post_q_cnt = self.getPortQueueIndexStats(
                    self.port2, test['p2_queue'], ['SAI_QUEUE_STAT_PACKETS'])
                p3_post_q_cnt = self.getPortQueueIndexStats(
                    self.port3, test['p3_queue'], ['SAI_QUEUE_STAT_PACKETS'])
                self.assertEqual(
                    p1_initial_q_cnt + 1, p1_post_q_cnt,
                    'PORT1 queue {} packets counter {} != {}'.format(
                        test['p1_queue'], p1_initial_q_cnt + 1, p1_post_q_cnt))
                self.assertEqual(
                    p2_initial_q_cnt + 1, p2_post_q_cnt,
                    'PORT2 queue {} packets counter {} != {}'.format(
                        test['p2_queue'], p2_initial_q_cnt + 1, p2_post_q_cnt))
                self.assertEqual(
                    p3_initial_q_cnt + 1, p3_post_q_cnt,
                    'PORT3 queue {} packets counter {} != {}'.format(
                        test['p3_queue'], p3_initial_q_cnt + 1, p3_post_q_cnt))
        finally:
            for port in [self.port0, self.port1, self.port2, self.port3]:
                sai_thrift_set_port_attribute(self.client,
                                              port,
                                              qos_tc_to_queue_map=0)
                sai_thrift_set_port_attribute(self.client,
                                              port,
                                              qos_default_tc=0)
                sai_thrift_set_port_attribute(self.client,
                                              port,
                                              qos_dot1p_to_tc_map=0)
            sai_thrift_remove_qos_map(self.client, qos_tc_to_queue_map_id)
            sai_thrift_remove_qos_map(self.client, qos_dot1p_to_tc_map_id)