def l3IPv6QosVariousDscpToTcMappingManyPortsTest()

in ptf/saiqosmap.py [0:0]


    def l3IPv6QosVariousDscpToTcMappingManyPortsTest(self):
        '''
        L3 IPv6 various DSCP to TC mapping for various ingress port test.

        A separate DSCP -> TC map is created for each of ports 0, 1 and 3
        and an identity TC -> queue map is attached to egress port 2.
        For every test case an IPv6 TCP packet carrying the given DSCP is
        sent on the ingress port, the routed packet is verified on port 2
        and the SAI_QUEUE_STAT_PACKETS counter of the expected queue on
        port 2 must grow by exactly one.

        Cleanup always runs: both QoS map attributes are reset on every
        port in self.test_ports and only the maps that were actually
        created are removed, so an early setup failure is reported as
        itself instead of as a NameError raised from the cleanup code.
        '''
        print("l3IPv6QosVariousDscpToTcMappingManyPortsTest")

        # DSCP values present in every per-port DSCP -> TC map.
        mapped_dscps = [0, 1, 2, 3, 4, 5, 6, 7]
        # DSCP value absent from the maps; expected on TC 0 / queue 0.
        unmapped_dscp = 10
        # (ingress dev port, TC assigned to each entry of mapped_dscps)
        ingress_profiles = [
            (self.dev_port0, [1, 1, 1, 1, 2, 2, 2, 2]),
            (self.dev_port1, [3, 3, 3, 3, 4, 4, 4, 4]),
            (self.dev_port3, [5, 5, 5, 5, 6, 6, 6, 6]),
        ]

        # The TC -> queue map created below is the identity, so the
        # expected queue index always equals the expected TC.
        test_cases = []
        for dev_port, tc_list in ingress_profiles:
            test_cases.extend(
                {'tx_port': dev_port, 'dscp': dscp, 'tc': tc, 'queue': tc}
                for dscp, tc in zip(mapped_dscps, tc_list))
            test_cases.append({
                'tx_port': dev_port,
                'dscp': unmapped_dscp,
                'tc': 0,
                'queue': 0
            })  # default

        # Bound before the try block so that the finally clause can tell
        # which maps exist even when setup fails part-way through.
        dscp_to_tc_map_ids = []
        qos_tc_to_queue_map_id = None

        try:
            # setup dscp to tc mapping (one map per ingress port)
            for _, tc_list in ingress_profiles:
                # Copies are passed because the helper is opaque here and
                # the lists are shared between iterations.
                map_id = create_and_verify_qos_map(
                    self.client, SAI_QOS_MAP_TYPE_DSCP_TO_TC,
                    list(mapped_dscps), list(tc_list))
                self.assertTrue(map_id != 0,
                                "Failed to create qos_map")
                dscp_to_tc_map_ids.append(map_id)

            # setup tc to queue index mapping
            ingress_tc_list = [0, 1, 2, 3, 4, 5, 6, 7]
            ingress_queue_list = [0, 1, 2, 3, 4, 5, 6, 7]
            map_id = create_and_verify_qos_map(
                self.client, SAI_QOS_MAP_TYPE_TC_TO_QUEUE, ingress_tc_list,
                ingress_queue_list)
            self.assertTrue(map_id != 0,
                            "Failed to create qos_map")
            qos_tc_to_queue_map_id = map_id

            # set/get: bind each DSCP -> TC map to its ingress port and
            # read the attribute back.
            ingress_ports = [self.port0, self.port1, self.port3]
            for port, map_id in zip(ingress_ports, dscp_to_tc_map_ids):
                status = sai_thrift_set_port_attribute(
                    self.client,
                    port,
                    qos_dscp_to_tc_map=map_id)
                self.assertEqual(status, SAI_STATUS_SUCCESS,
                                 "Failed to set port attribute")
                attr = sai_thrift_get_port_attribute(
                    self.client,
                    port,
                    qos_dscp_to_tc_map=True)
                self.assertEqual(attr['qos_dscp_to_tc_map'], map_id)

            # bind the TC -> queue map to the egress port(s)
            egress_test_ports = [self.port2]
            for port in egress_test_ports:
                status = sai_thrift_set_port_attribute(
                    self.client,
                    port,
                    qos_tc_to_queue_map=qos_tc_to_queue_map_id)
                self.assertEqual(status, SAI_STATUS_SUCCESS,
                                 "Failed to set port attribute")
                attr = sai_thrift_get_port_attribute(
                    self.client,
                    port,
                    qos_tc_to_queue_map=True)
                self.assertEqual(attr['qos_tc_to_queue_map'],
                                 qos_tc_to_queue_map_id)

            for test in test_cases:
                # DSCP is shifted left by two bits to form the IPv6
                # traffic class value; the low two bits stay zero.
                ipv6_tc = test['dscp'] << 2
                pkt = simple_tcpv6_packet(
                    eth_dst=ROUTER_MAC,
                    eth_src='00:22:22:22:22:22',
                    ipv6_dst='1234:5678:9abc:def0:4422:1133:5577:1111',
                    ipv6_src='2000::1',
                    ipv6_hlim=64,
                    ipv6_tc=ipv6_tc,
                    pktlen=100)
                # Expected routed packet: rewritten MACs, hop limit
                # decremented by one, traffic class unchanged.
                exp_pkt = simple_tcpv6_packet(
                    eth_dst='00:11:22:33:44:06',
                    eth_src=ROUTER_MAC,
                    ipv6_dst='1234:5678:9abc:def0:4422:1133:5577:1111',
                    ipv6_src='2000::1',
                    ipv6_hlim=63,
                    ipv6_tc=ipv6_tc,
                    pktlen=100)
                initial_q_cnt = self.getPortQueueIndexStats(
                    self.port2, test['queue'], ['SAI_QUEUE_STAT_PACKETS'])
                print("Sending packet port %d (dscp=%d) ->"
                      " port %d (tc=%d, queue=%d)"
                      % (test['tx_port'], test['dscp'], self.dev_port2,
                         test['tc'], test['queue']))
                send_packet(self, test['tx_port'], pkt)
                verify_packet(self, exp_pkt, self.dev_port2)
                post_q_cnt = self.getPortQueueIndexStats(
                    self.port2, test['queue'], ['SAI_QUEUE_STAT_PACKETS'])
                self.assertEqual(
                    initial_q_cnt + 1, post_q_cnt,
                    'queue {} packets counter {} != {}'.format(
                        test['queue'], initial_q_cnt + 1, post_q_cnt))
        finally:
            # Detach the maps from every test port before removing them.
            for port in self.test_ports:
                status = sai_thrift_set_port_attribute(
                    self.client, port, qos_dscp_to_tc_map=int(
                        SAI_NULL_OBJECT_ID))
                self.assertEqual(status, SAI_STATUS_SUCCESS,
                                 "Failed to set port attribute")
                status = sai_thrift_set_port_attribute(
                    self.client, port, qos_tc_to_queue_map=int(
                        SAI_NULL_OBJECT_ID))
                self.assertEqual(status, SAI_STATUS_SUCCESS,
                                 "Failed to set port attribute")
            # Remove only the maps that were successfully created.
            if qos_tc_to_queue_map_id is not None:
                sai_thrift_remove_qos_map(self.client,
                                          qos_tc_to_queue_map_id)
            for map_id in dscp_to_tc_map_ids:
                sai_thrift_remove_qos_map(self.client, map_id)