def l3IPv6QosSameDscpToTcMappingManyPortsTest()

in ptf/saiqosmap.py [0:0]


    def l3IPv6QosSameDscpToTcMappingManyPortsTest(self):
        '''
        L3 IPv6 same DSCP to TC mapping for various ingress port test.
        '''
        print("l3IPv6QosSameDscpToTcMappingManyPortsTest")
        test_cases = []
        test_cases.append({
            'tx_port': self.dev_port0,
            'dscp': 0,
            'tc': 0,
            'queue': 0
        })
        test_cases.append({
            'tx_port': self.dev_port0,
            'dscp': 1,
            'tc': 1,
            'queue': 1
        })
        test_cases.append({
            'tx_port': self.dev_port0,
            'dscp': 2,
            'tc': 2,
            'queue': 2
        })
        test_cases.append({
            'tx_port': self.dev_port0,
            'dscp': 3,
            'tc': 3,
            'queue': 3
        })
        test_cases.append({
            'tx_port': self.dev_port0,
            'dscp': 4,
            'tc': 4,
            'queue': 4
        })
        test_cases.append({
            'tx_port': self.dev_port0,
            'dscp': 5,
            'tc': 5,
            'queue': 5
        })
        test_cases.append({
            'tx_port': self.dev_port0,
            'dscp': 6,
            'tc': 6,
            'queue': 6
        })
        test_cases.append({
            'tx_port': self.dev_port0,
            'dscp': 7,
            'tc': 7,
            'queue': 7
        })
        test_cases.append({
            'tx_port': self.dev_port0,
            'dscp': 10,
            'tc': 0,
            'queue': 0
        })  # default
        test_cases.append({
            'tx_port': self.dev_port1,
            'dscp': 0,
            'tc': 0,
            'queue': 0
        })
        test_cases.append({
            'tx_port': self.dev_port1,
            'dscp': 1,
            'tc': 1,
            'queue': 1
        })
        test_cases.append({
            'tx_port': self.dev_port1,
            'dscp': 2,
            'tc': 2,
            'queue': 2
        })
        test_cases.append({
            'tx_port': self.dev_port1,
            'dscp': 3,
            'tc': 3,
            'queue': 3
        })
        test_cases.append({
            'tx_port': self.dev_port1,
            'dscp': 4,
            'tc': 4,
            'queue': 4
        })
        test_cases.append({
            'tx_port': self.dev_port1,
            'dscp': 5,
            'tc': 5,
            'queue': 5
        })
        test_cases.append({
            'tx_port': self.dev_port1,
            'dscp': 6,
            'tc': 6,
            'queue': 6
        })
        test_cases.append({
            'tx_port': self.dev_port1,
            'dscp': 7,
            'tc': 7,
            'queue': 7
        })
        test_cases.append({
            'tx_port': self.dev_port1,
            'dscp': 10,
            'tc': 0,
            'queue': 0
        })  # default
        test_cases.append({
            'tx_port': self.dev_port3,
            'dscp': 0,
            'tc': 0,
            'queue': 0
        })
        test_cases.append({
            'tx_port': self.dev_port3,
            'dscp': 1,
            'tc': 1,
            'queue': 1
        })
        test_cases.append({
            'tx_port': self.dev_port3,
            'dscp': 2,
            'tc': 2,
            'queue': 2
        })
        test_cases.append({
            'tx_port': self.dev_port3,
            'dscp': 3,
            'tc': 3,
            'queue': 3
        })
        test_cases.append({
            'tx_port': self.dev_port3,
            'dscp': 4,
            'tc': 4,
            'queue': 4
        })
        test_cases.append({
            'tx_port': self.dev_port3,
            'dscp': 5,
            'tc': 5,
            'queue': 5
        })
        test_cases.append({
            'tx_port': self.dev_port3,
            'dscp': 6,
            'tc': 6,
            'queue': 6
        })
        test_cases.append({
            'tx_port': self.dev_port3,
            'dscp': 7,
            'tc': 7,
            'queue': 7
        })
        test_cases.append({
            'tx_port': self.dev_port3,
            'dscp': 10,
            'tc': 0,
            'queue': 0
        })  # default
        try:
            # setup dscp to tc mapping
            ingress_dscp_list = [0, 1, 2, 3, 4, 5, 6, 7]
            ingress_tc_list = [0, 1, 2, 3, 4, 5, 6, 7]
            qos_dscp_to_tc_map_id = create_and_verify_qos_map(
                self.client, SAI_QOS_MAP_TYPE_DSCP_TO_TC, ingress_dscp_list,
                ingress_tc_list)
            self.assertTrue(qos_dscp_to_tc_map_id != 0,
                            "Failed to create qos_map")
            # setup tc to queue index mapping
            ingress_tc_list = [0, 1, 2, 3, 4, 5, 6, 7]
            ingress_queue_list = [0, 1, 2, 3, 4, 5, 6, 7]
            qos_tc_to_queue_map_id = create_and_verify_qos_map(
                self.client, SAI_QOS_MAP_TYPE_TC_TO_QUEUE, ingress_tc_list,
                ingress_queue_list)
            self.assertTrue(qos_tc_to_queue_map_id != 0,
                            "Failed to create qos_map")
            ingress_test_ports = [self.port0, self.port1, self.port3]
            egress_test_ports = [self.port2]
            for port in ingress_test_ports:
                status = sai_thrift_set_port_attribute(
                    self.client,
                    port,
                    qos_dscp_to_tc_map=qos_dscp_to_tc_map_id)
                self.assertEqual(status, SAI_STATUS_SUCCESS,
                                 "Failed to set port attribute")
                attr = sai_thrift_get_port_attribute(self.client,
                                                     port,
                                                     qos_dscp_to_tc_map=True)
                self.assertEqual(attr['qos_dscp_to_tc_map'],
                                 qos_dscp_to_tc_map_id)
            for port in egress_test_ports:
                status = sai_thrift_set_port_attribute(
                    self.client,
                    port,
                    qos_tc_to_queue_map=qos_tc_to_queue_map_id)
                self.assertEqual(status, SAI_STATUS_SUCCESS,
                                 "Failed to set port attribute")
                attr = sai_thrift_get_port_attribute(self.client,
                                                     port,
                                                     qos_tc_to_queue_map=True)
                self.assertEqual(attr['qos_tc_to_queue_map'],
                                 qos_tc_to_queue_map_id)
            for test in test_cases:
                ipv6_tc = test['dscp'] << 2
                pkt = simple_tcpv6_packet(
                    eth_dst=ROUTER_MAC,
                    eth_src='00:22:22:22:22:22',
                    ipv6_dst='1234:5678:9abc:def0:4422:1133:5577:1111',
                    ipv6_src='2000::1',
                    ipv6_hlim=64,
                    ipv6_tc=ipv6_tc,
                    pktlen=100)
                exp_pkt = simple_tcpv6_packet(
                    eth_dst='00:11:22:33:44:06',
                    eth_src=ROUTER_MAC,
                    ipv6_dst='1234:5678:9abc:def0:4422:1133:5577:1111',
                    ipv6_src='2000::1',
                    ipv6_hlim=63,
                    ipv6_tc=ipv6_tc,
                    pktlen=100)
                initial_q_cnt = self.getPortQueueIndexStats(
                    self.port2, test['queue'], ['SAI_QUEUE_STAT_PACKETS'])
                print("Sending packet port %d (dscp=%d) ->"
                      " port %d (tc=%d queue=%d)"
                      % (test['tx_port'], test['dscp'], self.dev_port2,
                         test['tc'], test['queue']))
                send_packet(self, test['tx_port'], pkt)
                verify_packet(self, exp_pkt, self.dev_port2)
                post_q_cnt = self.getPortQueueIndexStats(
                    self.port2, test['queue'], ['SAI_QUEUE_STAT_PACKETS'])
                self.assertEqual(
                    initial_q_cnt + 1, post_q_cnt,
                    'queue {} packets counter {} != {}'.format(
                        test['queue'], initial_q_cnt + 1, post_q_cnt))
        finally:
            for port in self.test_ports:
                status = sai_thrift_set_port_attribute(
                    self.client, port, qos_dscp_to_tc_map=int(
                        SAI_NULL_OBJECT_ID))
                self.assertEqual(status, SAI_STATUS_SUCCESS,
                                 "Failed to set port attribute")
                status = sai_thrift_set_port_attribute(
                    self.client, port, qos_tc_to_queue_map=int(
                        SAI_NULL_OBJECT_ID))
                self.assertEqual(status, SAI_STATUS_SUCCESS,
                                 "Failed to set port attribute")
            sai_thrift_remove_qos_map(self.client, qos_tc_to_queue_map_id)
            sai_thrift_remove_qos_map(self.client, qos_dscp_to_tc_map_id)