def l2QosMapPCPToColorDefaultMappingTest()

in ptf/saiqosmap.py [0:0]


    def l2QosMapPCPToColorDefaultMappingTest(self):
        ''' L2 traffic test verifying the default PCP to color mapping.

        No SAI_QOS_MAP_TYPE_DOT1P_TO_COLOR map is created by this test.
        The resulting color is observed indirectly, through the
        TC+color -> dot1p maps bound to the egress ports.

        The verification method:
        1. A TC -> queue identity map (tc N -> queue N) is created and
           bound to ports 0-3, together with the per-port TC+color ->
           dot1p maps (self.pN_qos_tc_color_to_dot1q_id, created outside
           of this method).
        2. For every test case the port default TC is configured, a
           VLAN tagged packet is sent on port 0 and is expected to be
           flooded to ports 1, 2 and 3 with egress pcp 1, 3 and 5
           respectively, whatever the ingress pcp or default TC is.
           NOTE(review): presumably those are the values the per-port
           maps assign to the default color - confirm against the code
           creating the maps.
        3. The SAI_QUEUE_STAT_PACKETS counter of the expected queue must
           grow by exactly one on every egress port.

        All port bindings are reverted and the TC -> queue map is
        removed on exit, also when the test fails.
        '''
        print("l2QosMapPCPToColorDefaultMappingTest")

        # (ingress pcp, port default tc) for every test case.
        # pcp=None builds the ingress packet with vlan_vid=0 and no
        # explicit priority; the other cases use vlan_vid=1.
        # With no DOT1P_TO_TC map the TC is expected to come from the
        # port default TC, hence expected queue == port default tc.
        pcp_and_default_tc = [
            (None, 0), (None, 1), (None, 2), (None, 3),
            (None, 4), (None, 5), (None, 6), (None, 7),
            (1, 5), (2, 0), (3, 0), (4, 0), (5, 0), (6, 0), (7, 0),
            (0, 1), (1, 2), (2, 3), (3, 4), (4, 5), (5, 6), (6, 7),
            (7, 0),
        ]
        # 'color' is kept for parity with the other test case tables,
        # it is not read by this test.
        test_cases = [{
            'pcp': pcp,
            'port_default_tc': default_tc,
            'queue': default_tc,
            'color': 0
        } for pcp, default_tc in pcp_and_default_tc]

        ports = [self.port0, self.port1, self.port2, self.port3]
        egress_ports = ports[1:]
        egress_dev_ports = [self.dev_port1, self.dev_port2, self.dev_port3]
        # expected egress pcp on port1, port2 and port3
        egress_pcps = [1, 3, 5]

        # The expected packets do not depend on the test case, so they
        # are built only once.
        flood_pkt_list = [
            simple_tcp_packet(eth_dst='00:33:33:33:33:00',
                              eth_src='00:33:33:33:33:11',
                              dl_vlan_enable=True,
                              vlan_vid=1,
                              vlan_pcp=egress_pcp,
                              pktlen=104)
            for egress_pcp in egress_pcps]
        flood_port_list = [[dev_port] for dev_port in egress_dev_ports]

        # Bound before the try block so that the cleanup below never
        # hits an unbound name (which would mask the original failure)
        # when the map creation itself raises.
        qos_tc_to_queue_map_id = None
        try:
            # setup tc to queue index mapping
            ingress_tc_list = [0, 1, 2, 3, 4, 5, 6, 7]
            ingress_queue_list = [0, 1, 2, 3, 4, 5, 6, 7]
            qos_tc_to_queue_map_id = create_and_verify_qos_map(
                self.client, SAI_QOS_MAP_TYPE_TC_TO_QUEUE, ingress_tc_list,
                ingress_queue_list)
            self.assertTrue(qos_tc_to_queue_map_id != 0,
                            "Failed to create qos_map")

            # assign to port the tc_queue and tc_color_to_pcp qos map
            # NOTE(review): port0 is bound to the port1 map; port0 is
            # only used as the ingress port here - confirm intended.
            tc_color_maps = [self.p1_qos_tc_color_to_dot1q_id,
                             self.p1_qos_tc_color_to_dot1q_id,
                             self.p2_qos_tc_color_to_dot1q_id,
                             self.p3_qos_tc_color_to_dot1q_id]
            for port, tc_color in zip(ports, tc_color_maps):
                status = sai_thrift_set_port_attribute(
                    self.client, port,
                    qos_tc_to_queue_map=qos_tc_to_queue_map_id)
                self.assertEqual(status, SAI_STATUS_SUCCESS,
                                 "Failed to set port attribute")
                attr = sai_thrift_get_port_attribute(self.client,
                                                     port,
                                                     qos_tc_to_queue_map=True)
                self.assertEqual(attr['qos_tc_to_queue_map'],
                                 qos_tc_to_queue_map_id)

                status = sai_thrift_set_port_attribute(
                    self.client, port, qos_tc_and_color_to_dot1p_map=tc_color)
                self.assertEqual(status, SAI_STATUS_SUCCESS,
                                 "Failed to set port attribute")
                attr = sai_thrift_get_port_attribute(
                    self.client, port, qos_tc_and_color_to_dot1p_map=True)
                self.assertEqual(attr['qos_tc_and_color_to_dot1p_map'],
                                 tc_color)

            # verify port default tc
            self.verifyPortDefaultDot1pToTc(ports)

            for test in test_cases:
                queue = test['queue']

                # setup port default_tc
                if test['port_default_tc'] is not None:
                    # every test port gets the same default_tc =
                    # test['port_default_tc']
                    port_config = [[port, test['port_default_tc']]
                                   for port in ports]
                else:
                    # fixed per port default_tc:
                    # port 0 -> default tc = 5
                    # port 1 -> default tc = 1
                    # port 2 -> default tc = 2
                    # port 3 -> default tc = 3
                    port_config = [[self.port0, 5], [self.port1, 1],
                                   [self.port2, 2], [self.port3, 3]]
                for port, port_default_tc in port_config:
                    sai_thrift_set_port_attribute(
                        self.client, port, qos_default_tc=port_default_tc)
                    attr = sai_thrift_get_port_attribute(self.client,
                                                         port,
                                                         qos_default_tc=True)
                    self.assertEqual(attr['qos_default_tc'], port_default_tc)

                initial_q_cnts = [
                    self.getPortQueueIndexStats(
                        port, queue, ['SAI_QUEUE_STAT_PACKETS'])
                    for port in egress_ports]

                # Log the default tc really configured on the ingress
                # port (port0 is the first port_config entry).
                ingress_default_tc = str(port_config[0][1])
                if test['pcp'] is not None:
                    vlan = 1
                    ingress_pcp = str(test['pcp'])
                else:
                    vlan = 0
                    ingress_pcp = 'None'

                print("Sending packet port %d (port_default_tc=%s"
                      ", pcp=%s) -> port %d (queue=%d, egress_pcp=%d)"
                      % (self.dev_port0, ingress_default_tc,
                         ingress_pcp, egress_dev_ports[0], queue,
                         egress_pcps[0]))
                for dev_port, egress_pcp in list(
                        zip(egress_dev_ports, egress_pcps))[1:]:
                    print("%54s port %d (queue=%d, egress_pcp=%d))" %
                          (" ", dev_port, queue, egress_pcp))

                pkt = simple_tcp_packet(eth_dst='00:33:33:33:33:00',
                                        eth_src='00:33:33:33:33:11',
                                        dl_vlan_enable=True,
                                        vlan_vid=vlan,
                                        vlan_pcp=test['pcp'],
                                        pktlen=104)
                send_packet(self, self.dev_port0, pkt)
                verify_each_packet_on_multiple_port_lists(self, flood_pkt_list,
                                                          flood_port_list)
                print("\tPacket flooded. ok")

                post_q_cnts = [
                    self.getPortQueueIndexStats(
                        port, queue, ['SAI_QUEUE_STAT_PACKETS'])
                    for port in egress_ports]

                # exactly one packet must hit the expected queue of
                # every egress port
                for port_no, (initial_q_cnt, post_q_cnt) in enumerate(
                        zip(initial_q_cnts, post_q_cnts), 1):
                    self.assertEqual(
                        initial_q_cnt + 1, post_q_cnt,
                        'PORT{} queue {} packets counter {} != {}'.format(
                            port_no, queue, initial_q_cnt + 1, post_q_cnt))
        finally:
            for port in ports:
                sai_thrift_set_port_attribute(self.client,
                                              port,
                                              qos_tc_to_queue_map=0)
                sai_thrift_set_port_attribute(self.client,
                                              port,
                                              qos_tc_and_color_to_dot1p_map=0)
                # Do not leak a non-zero default tc to the following
                # tests when a test case fails in the middle of the
                # loop (the last test case already leaves it at 0).
                sai_thrift_set_port_attribute(self.client,
                                              port,
                                              qos_default_tc=0)
            # Nothing to remove when the map was never created.
            if qos_tc_to_queue_map_id:
                sai_thrift_remove_qos_map(self.client, qos_tc_to_queue_map_id)