ptf/saiqueue.py (1,156 lines of code) (raw):

# Copyright 2021-present Intel Corporation. # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. """ Thrift SAI interface queue tests """ from sai_thrift.sai_headers import * from sai_base_test import * TEST_PASSED = False class QueueConfigDataHelper(PlatformSaiHelper): """ Queue Configuration Class """ def setUp(self): super(QueueConfigDataHelper, self).setUp() attr = sai_thrift_get_port_attribute(self.client, self.port25, qos_number_of_queues=True) self.num_queues = attr["qos_number_of_queues"] self.num_ports = len(self.port_list) self.q_list = sai_thrift_object_list_t(count=self.num_queues) # L3 layer configuration self.vr_id = sai_thrift_create_virtual_router(self.client, admin_v4_state=True, admin_v6_state=True) self.assertTrue(self.vr_id != 0) # RIFs self.rif_id25 = sai_thrift_create_router_interface( self.client, virtual_router_id=self.vr_id, type=SAI_ROUTER_INTERFACE_TYPE_PORT, port_id=self.port25, admin_v4_state=True, admin_v6_state=True) self.assertTrue(self.rif_id25 != 0) self.rif_id26 = sai_thrift_create_router_interface( self.client, virtual_router_id=self.vr_id, type=SAI_ROUTER_INTERFACE_TYPE_PORT, port_id=self.port26, admin_v4_state=True, admin_v6_state=True) self.assertTrue(self.rif_id26 != 0) # Neighbor dst_ip = "172.16.1.1" dst_mask_ip = "172.16.1.1/24" self.neighbor_dmac = "00:11:22:33:44:55" self.neighbor_entry1 = sai_thrift_neighbor_entry_t( self.switch_id, self.rif_id26, sai_ipaddress(dst_ip)) sai_thrift_create_neighbor_entry(self.client, self.neighbor_entry1, dst_mac_address=self.neighbor_dmac) # Nexthop self.nhop = sai_thrift_create_next_hop( self.client, type=SAI_NEXT_HOP_TYPE_IP, router_interface_id=self.rif_id26, ip=sai_ipaddress(dst_ip)) # Route self.route = sai_thrift_route_entry_t( switch_id=self.switch_id, destination=sai_ipprefix(dst_mask_ip), vr_id=self.vr_id) status = sai_thrift_create_route_entry(self.client, self.route, next_hop_id=self.nhop) self.assertEqual(status, SAI_STATUS_SUCCESS) def tearDown(self): sai_thrift_remove_neighbor_entry(self.client, self.neighbor_entry1) sai_thrift_remove_route_entry(self.client, self.route) sai_thrift_remove_next_hop(self.client, self.nhop) sai_thrift_remove_router_interface(self.client, self.rif_id26) sai_thrift_remove_router_interface(self.client, self.rif_id25) sai_thrift_remove_virtual_router(self.client, self.vr_id) super(QueueConfigDataHelper, self).tearDown() class queueCreateTest(QueueConfigDataHelper): """ The test verifies a queue creation. """ def setUp(self): super(queueCreateTest, self).setUp() def runTest(self): print("\nQueue Create Test") test_port = self.port1 try: ignore_api_errors() queue = sai_thrift_create_queue( self.client, type=SAI_QUEUE_TYPE_ALL, index=1) # port is missing, creation should fail self.assertTrue(queue == 0) queue = sai_thrift_create_queue( self.client, port=test_port, index=1) # type is missing, creation should fail self.assertTrue(queue == 0) queue = sai_thrift_create_queue( self.client, type=SAI_QUEUE_TYPE_ALL, port=test_port, index=1) # type is wrong, creation should fail self.assertTrue(queue == 0) queue = sai_thrift_create_queue( self.client, type=SAI_QUEUE_TYPE_ALL, port=test_port) # index is missing, creation should fail self.assertTrue(queue == 0) queue = sai_thrift_create_queue( self.client, type=SAI_QUEUE_TYPE_ALL, port=test_port, index=60) # index is wrong, creation should fail self.assertTrue(queue == 0) queue = sai_thrift_create_queue( self.client, type=SAI_QUEUE_TYPE_ALL, port=test_port, index=1) # queue with such index already exists self.assertTrue(queue == 0) print("\tTest completed successfully") finally: pass def tearDown(self): super(queueCreateTest, self).tearDown() class portQueueQueryTest(QueueConfigDataHelper): """ The test queries SAI_QUEUE_ATTR_PORT and SAI_QUEUE_ATTR_INDEX attributes for all physical ports configured by default. NOTE: If you modify this test, remember to apply the changes to cpuPortQueueObjectTest too. """ def setUp(self): super(portQueueQueryTest, self).setUp() def runTest(self): print("Port Queue Query Test") attr = sai_thrift_get_port_attribute(self.client, self.port1, qos_number_of_queues=True) num_queues = attr["qos_number_of_queues"] num_ports = len(self.port_list) q_list = sai_thrift_object_list_t(count=num_queues) try: for port in range(0, num_ports): attr = sai_thrift_get_port_attribute(self.client, self.port_list[port], qos_queue_list=q_list) for queue in range(0, num_queues): queue_id = attr["qos_queue_list"].idlist[queue] q_attr = sai_thrift_get_queue_attribute( self.client, queue_id, port=True, index=True, parent_scheduler_node=True) self.assertTrue(queue == q_attr["index"]) self.assertTrue(self.port_list[port] == q_attr["port"]) # It should return port handle, # because SDK does not support hierarchical QoS # skip this check, SDK dependent # self.assertTrue(self.port_list[port] == # q_attr["parent_scheduler_node"]) print("\tTest completed successfully") finally: pass def tearDown(self): super(portQueueQueryTest, self).tearDown() class bufferQueueTest(QueueConfigDataHelper): """ The test is dedicated to working buffers with queues. After creating buffer pool and buffer profile, the last one is assigned to a queue. Later the buffer profile is being removed and the default buffer profile should be assigned to the queue. NOTE: If you modify this test, remember to apply the changes to cpuPortQueueObjectTest too. """ def setUp(self): super(bufferQueueTest, self).setUp() def runTest(self): print("Buffer Queue Test") mode = SAI_BUFFER_POOL_THRESHOLD_MODE_DYNAMIC buff_pool = sai_thrift_create_buffer_pool( self.client, type=SAI_BUFFER_POOL_TYPE_EGRESS, size=1024, threshold_mode=mode) self.assertTrue(buff_pool != 0) buff_prof = sai_thrift_create_buffer_profile(self.client, pool_id=buff_pool, reserved_buffer_size=1024, threshold_mode=mode, shared_dynamic_th=1, xoff_th=100, xon_th=10) self.assertTrue(buff_prof != 0) try: attr = sai_thrift_get_port_attribute( self.client, self.port26, qos_queue_list=self.q_list) queue_id = attr["qos_queue_list"].idlist attr = sai_thrift_get_queue_attribute(self.client, queue_id[0], buffer_profile_id=True) default_buff_prof = attr["buffer_profile_id"] status = sai_thrift_set_queue_attribute( self.client, queue_id[0], buffer_profile_id=buff_prof) self.assertEqual(status, SAI_STATUS_SUCCESS) clear_counter( self, sai_thrift_clear_queue_stats, queue_id[0]) pkt = simple_tcp_packet(eth_dst=ROUTER_MAC, eth_src="00:00:00:00:00:22", ip_dst="172.16.1.1", ip_src="192.168.0.1", ip_id=105, ip_ttl=64) exp_pkt = simple_tcp_packet(eth_dst="00:11:22:33:44:55", eth_src=ROUTER_MAC, ip_dst="172.16.1.1", ip_src="192.168.0.1", ip_id=105, ip_ttl=63) send_packet(self, self.dev_port25, pkt) verify_packet(self, exp_pkt, self.dev_port26) print("\tPacket received on PORT26") stats = query_counter( self, sai_thrift_get_queue_stats, queue_id[0]) cnt = stats["SAI_QUEUE_STAT_PACKETS"] self.assertEqual(cnt, 1) # Now the buffer profile is being detached. status = sai_thrift_set_queue_attribute( self.client, queue_id[0], buffer_profile_id=int(SAI_NULL_OBJECT_ID)) self.assertEqual(status, SAI_STATUS_SUCCESS) status = sai_thrift_remove_buffer_profile(self.client, buff_prof) self.assertEqual(status, SAI_STATUS_SUCCESS) status = sai_thrift_remove_buffer_pool(self.client, buff_pool) self.assertEqual(status, SAI_STATUS_SUCCESS) # The queue should be assigned to the default buffer profile. attr = sai_thrift_get_queue_attribute(self.client, queue_id[0], buffer_profile_id=True) self.assertEqual(attr["buffer_profile_id"], default_buff_prof) clear_counter( self, sai_thrift_clear_queue_stats, queue_id[0]) pkt = simple_tcp_packet(eth_dst=ROUTER_MAC, eth_src="00:00:00:00:00:22", ip_dst="172.16.1.1", ip_src="192.168.0.1", ip_id=105, ip_ttl=64) exp_pkt = simple_tcp_packet(eth_dst="00:11:22:33:44:55", eth_src=ROUTER_MAC, ip_dst="172.16.1.1", ip_src="192.168.0.1", ip_id=105, ip_ttl=63) send_packet(self, self.dev_port25, pkt) verify_packet(self, exp_pkt, self.dev_port26) print("\tPacket received on PORT26") stats = query_counter( self, sai_thrift_get_queue_stats, queue_id[0]) cnt = stats["SAI_QUEUE_STAT_PACKETS"] self.assertEqual(cnt, 1) print("\tTest completed successfully") finally: stats = query_counter( self, sai_thrift_get_queue_stats_ext, queue_id[0], SAI_STATS_MODE_READ_AND_CLEAR) cnt = stats["SAI_QUEUE_STAT_PACKETS"] self.assertEqual(cnt, 1) stats = query_counter( self, sai_thrift_get_queue_stats, queue_id[0]) cnt = stats["SAI_QUEUE_STAT_PACKETS"] self.assertEqual(cnt, 0) print("\tTest completed successfully") def tearDown(self): super(bufferQueueTest, self).tearDown() class schedulerQueueTest(QueueConfigDataHelper): """ The test scenario consists of attaching scheduler profile to a queue and validate following parameters: queue priority, weight, min and max rate. NOTE: If you modify this test, remember to apply the changes to cpuPortQueueObjectTest too. """ def setUp(self): super(schedulerQueueTest, self).setUp() def runTest(self): print("Scheduler Queue Test") weight = 4 min_rate = 100 max_rate = 1000 min_burst = 200 max_burst = 800 sched1 = sai_thrift_create_scheduler( self.client, meter_type=SAI_METER_TYPE_PACKETS, scheduling_type=SAI_SCHEDULING_TYPE_DWRR, scheduling_weight=weight) self.assertTrue(sched1 != 0) sched2 = sai_thrift_create_scheduler( self.client, meter_type=SAI_METER_TYPE_PACKETS, scheduling_type=SAI_SCHEDULING_TYPE_STRICT, min_bandwidth_rate=min_rate, max_bandwidth_rate=max_rate) self.assertTrue(sched2 != 0) sched3 = sai_thrift_create_scheduler( self.client, meter_type=SAI_METER_TYPE_PACKETS, scheduling_type=SAI_SCHEDULING_TYPE_STRICT, min_bandwidth_burst_rate=min_burst, max_bandwidth_burst_rate=max_burst) self.assertTrue(sched3 != 0) print("\tScheduler profiles created successfully") try: attr = sai_thrift_get_port_attribute( self.client, self.port25, qos_queue_list=self.q_list) queue_id = attr["qos_queue_list"].idlist status = sai_thrift_set_queue_attribute( self.client, queue_id[1], scheduler_profile_id=sched1) self.assertEqual(status, SAI_STATUS_SUCCESS) q_attr = sai_thrift_get_queue_attribute( self.client, queue_id[1], scheduler_profile_id=True) self.assertEqual(q_attr["scheduler_profile_id"], sched1) s_attr = sai_thrift_get_scheduler_attribute( self.client, sched1, scheduling_weight=True) self.assertEqual(s_attr["scheduling_weight"], weight) print("\tScheduling weight validated successfully") status = sai_thrift_set_queue_attribute( self.client, queue_id[2], scheduler_profile_id=sched2) self.assertEqual(status, SAI_STATUS_SUCCESS) q_attr = sai_thrift_get_queue_attribute( self.client, queue_id[2], scheduler_profile_id=True) self.assertEqual(q_attr["scheduler_profile_id"], sched2) s_attr = sai_thrift_get_scheduler_attribute( self.client, sched2, min_bandwidth_rate=True, max_bandwidth_rate=True) self.assertEqual(s_attr["min_bandwidth_rate"], min_rate) self.assertEqual(s_attr["max_bandwidth_rate"], max_rate) print("\tBandwidth rates validated successfully") status = sai_thrift_set_queue_attribute( self.client, queue_id[3], scheduler_profile_id=sched3) self.assertEqual(status, SAI_STATUS_SUCCESS) q_attr = sai_thrift_get_queue_attribute( self.client, queue_id[3], scheduler_profile_id=True) self.assertEqual(q_attr["scheduler_profile_id"], sched3) s_attr = sai_thrift_get_scheduler_attribute( self.client, sched3, min_bandwidth_burst_rate=True, max_bandwidth_burst_rate=True) self.assertEqual(s_attr["min_bandwidth_burst_rate"], min_burst) self.assertEqual(s_attr["max_bandwidth_burst_rate"], max_burst) print("\tBandwidth burst rates validated successfully") print("\tNow the parameters will be modified") weight = 3 min_rate = 120 max_rate = 980 min_burst = 300 max_burst = 800 status = sai_thrift_set_scheduler_attribute( self.client, sched1, scheduling_weight=weight) self.assertEqual(status, SAI_STATUS_SUCCESS) s_attr = sai_thrift_get_scheduler_attribute( self.client, sched1, scheduling_weight=True) self.assertEqual(s_attr["scheduling_weight"], weight) print("\tScheduling weight validated successfully") status = sai_thrift_set_scheduler_attribute( self.client, sched2, min_bandwidth_rate=min_rate) self.assertEqual(status, SAI_STATUS_SUCCESS) status = sai_thrift_set_scheduler_attribute( self.client, sched2, max_bandwidth_rate=max_rate) self.assertEqual(status, SAI_STATUS_SUCCESS) s_attr = sai_thrift_get_scheduler_attribute( self.client, sched2, min_bandwidth_rate=True, max_bandwidth_rate=True) self.assertEqual(s_attr["min_bandwidth_rate"], min_rate) self.assertEqual(s_attr["max_bandwidth_rate"], max_rate) print("\tBandwidth rates validated successfully") status = sai_thrift_set_scheduler_attribute( self.client, sched3, min_bandwidth_burst_rate=min_burst) self.assertEqual(status, SAI_STATUS_SUCCESS) status = sai_thrift_set_scheduler_attribute( self.client, sched3, max_bandwidth_burst_rate=max_burst) self.assertEqual(status, SAI_STATUS_SUCCESS) s_attr = sai_thrift_get_scheduler_attribute( self.client, sched3, min_bandwidth_burst_rate=True, max_bandwidth_burst_rate=True) self.assertEqual(s_attr["min_bandwidth_burst_rate"], min_burst) self.assertEqual(s_attr["max_bandwidth_burst_rate"], max_burst) print("\tBandwidth burst rates validated successfully") print("\tTest completed successfully") finally: status = sai_thrift_set_queue_attribute( self.client, queue_id[3], scheduler_profile_id=0) status = sai_thrift_set_queue_attribute( self.client, queue_id[2], scheduler_profile_id=0) status = sai_thrift_set_queue_attribute( self.client, queue_id[1], scheduler_profile_id=0) self.assertEqual(status, SAI_STATUS_SUCCESS) self.assertEqual(status, SAI_STATUS_SUCCESS) self.assertEqual(status, SAI_STATUS_SUCCESS) sai_thrift_remove_scheduler(self.client, sched3) sai_thrift_remove_scheduler(self.client, sched2) sai_thrift_remove_scheduler(self.client, sched1) def tearDown(self): super(schedulerQueueTest, self).tearDown() class pfcPriorityQueueTest(QueueConfigDataHelper): """ The test verifies the correct configuration of mapping between Priority-based Flow Control (PFC) and a queue. NOTE: If you modify this test, remember to apply the changes to cpuPortQueueObjectTest too. """ def setUp(self): super(pfcPriorityQueueTest, self).setUp() def runTest(self): print("PFC Priority Queue Test") p_attr = sai_thrift_get_port_attribute(self.client, self.port25, qos_queue_list=self.q_list) queue_id = p_attr["qos_queue_list"].idlist q_attr = sai_thrift_get_queue_attribute( self.client, queue_id[4], index=True) q_idx = q_attr["index"] try: # The PFC Priority -> Queue map table configuration. pfc_to_queue_map = sai_thrift_qos_map_t( key=sai_thrift_qos_map_params_t(prio=4), value=sai_thrift_qos_map_params_t(queue_index=q_idx)) qos_map_list = sai_thrift_qos_map_list_t( count=1, maplist=[pfc_to_queue_map]) qos_map = sai_thrift_create_qos_map( self.client, type=SAI_QOS_MAP_TYPE_PFC_PRIORITY_TO_QUEUE, map_to_value_list=qos_map_list) self.assertTrue(qos_map != 0) # The queue index modification in the previously created QoS map. p_attr = sai_thrift_get_port_attribute(self.client, self.port26, qos_queue_list=self.q_list) new_queue_id = p_attr["qos_queue_list"].idlist q_attr = sai_thrift_get_queue_attribute( self.client, new_queue_id[3], index=True) new_q_idx = q_attr["index"] new_pfc_to_queue_map = sai_thrift_qos_map_t( key=sai_thrift_qos_map_params_t(prio=4), value=sai_thrift_qos_map_params_t(queue_index=new_q_idx)) new_qos_map_list = sai_thrift_qos_map_list_t( count=1, maplist=[new_pfc_to_queue_map]) status = sai_thrift_set_qos_map_attribute( self.client, qos_map, map_to_value_list=new_qos_map_list) self.assertEqual(status, SAI_STATUS_SUCCESS) # Verify if the correct queue is applied to the QoS map. map_lists = sai_thrift_qos_map_list_t(count=1) attr = sai_thrift_get_qos_map_attribute( self.client, qos_map, map_to_value_list=map_lists) queue_idx = attr["map_to_value_list"].maplist[0].value.queue_index self.assertEqual(queue_idx, new_q_idx) print("\tTest completed successfully") finally: sai_thrift_remove_qos_map(self.client, qos_map) def tearDown(self): super(pfcPriorityQueueTest, self).tearDown() class cpuPortQueueObjectTest(QueueConfigDataHelper): """ The test uses CPU port instead of external port for all test cases above: portQueueQueryTest, bufferQueueTest and schedulerQueueTest, pfcPriorityQueueTest. WRED is not supported on CPU queues. """ def setUp(self): super(cpuPortQueueObjectTest, self).setUp() def runTest(self): print("CPU Port Queue Object Test") attr = sai_thrift_get_switch_attribute(self.client, cpu_port=True) cpu_port_id = attr["cpu_port"] print("\tPART1: Port Queue Query Test") global TEST_PASSED try: for port in self.port_list: attr = sai_thrift_get_port_attribute(self.client, port, qos_number_of_queues=True) num_queues = attr["qos_number_of_queues"] q_list = sai_thrift_object_list_t(count=num_queues) attr = sai_thrift_get_port_attribute(self.client, port, qos_queue_list=q_list) for queue in range(0, num_queues): queue_id = attr["qos_queue_list"].idlist[queue] q_attr = sai_thrift_get_queue_attribute( self.client, queue_id, port=True, index=True, parent_scheduler_node=True) self.assertTrue(queue == q_attr["index"]) self.assertTrue(port == q_attr["port"]) # It should return port handle, # because SDK does not support hierarchical QoS # skip this test, platform dependent #self.assertTrue(port == q_attr["parent_scheduler_node"]) print("\t\tPART1 completed successfully") print("\tPART2: Buffer Queue Test") # Get the queue for the CPU port - for the buffer queue test. attr = sai_thrift_get_port_attribute(self.client, cpu_port_id, qos_number_of_queues=True) num_queues = attr["qos_number_of_queues"] q_list = sai_thrift_object_list_t(count=num_queues) attr = sai_thrift_get_port_attribute( self.client, cpu_port_id, qos_queue_list=q_list) queue_id = attr["qos_queue_list"].idlist mode = SAI_BUFFER_POOL_THRESHOLD_MODE_DYNAMIC buff_pool = sai_thrift_create_buffer_pool( self.client, type=SAI_BUFFER_POOL_TYPE_EGRESS, size=1024, threshold_mode=mode) self.assertTrue(buff_pool != 0) buff_prof = sai_thrift_create_buffer_profile( self.client, pool_id=buff_pool, reserved_buffer_size=1024, threshold_mode=mode, shared_dynamic_th=1, xoff_th=100, xon_th=10) self.assertTrue(buff_prof != 0) attr = sai_thrift_get_queue_attribute(self.client, queue_id[0], buffer_profile_id=True) default_buff_prof = attr["buffer_profile_id"] status = sai_thrift_set_queue_attribute( self.client, queue_id[0], buffer_profile_id=buff_prof) self.assertEqual(status, SAI_STATUS_SUCCESS) # Now the buffer profile is being detached. status = sai_thrift_set_queue_attribute( self.client, queue_id[0], buffer_profile_id=int(SAI_NULL_OBJECT_ID)) self.assertEqual(status, SAI_STATUS_SUCCESS) status = sai_thrift_remove_buffer_profile(self.client, buff_prof) self.assertEqual(status, SAI_STATUS_SUCCESS) status = sai_thrift_remove_buffer_pool(self.client, buff_pool) self.assertEqual(status, SAI_STATUS_SUCCESS) # The queue should be assigned to the default buffer profile. attr = sai_thrift_get_queue_attribute(self.client, queue_id[0], buffer_profile_id=True) self.assertEqual(attr["buffer_profile_id"], default_buff_prof) print("\t\tPART2 completed successfully") print("\tPART3: Scheduler Queue Test") weight = 4 min_rate = 100 max_rate = 1000 min_burst = 200 max_burst = 800 # Get the queue for the CPU port - for the scheduler queue test. attr = sai_thrift_get_port_attribute( self.client, cpu_port_id, qos_queue_list=q_list) queue_id = attr["qos_queue_list"].idlist sched1 = sai_thrift_create_scheduler( self.client, meter_type=SAI_METER_TYPE_PACKETS, scheduling_type=SAI_SCHEDULING_TYPE_DWRR, scheduling_weight=weight) self.assertTrue(sched1 != 0) sched2 = sai_thrift_create_scheduler( self.client, meter_type=SAI_METER_TYPE_PACKETS, scheduling_type=SAI_SCHEDULING_TYPE_STRICT, min_bandwidth_rate=min_rate, max_bandwidth_rate=max_rate) self.assertTrue(sched2 != 0) sched3 = sai_thrift_create_scheduler( self.client, meter_type=SAI_METER_TYPE_PACKETS, scheduling_type=SAI_SCHEDULING_TYPE_STRICT, min_bandwidth_burst_rate=min_burst, max_bandwidth_burst_rate=max_burst) self.assertTrue(sched3 != 0) print("\t\tScheduler profiles created successfully") status = sai_thrift_set_queue_attribute( self.client, queue_id[1], scheduler_profile_id=sched1) self.assertEqual(status, SAI_STATUS_SUCCESS) q_attr = sai_thrift_get_queue_attribute( self.client, queue_id[1], scheduler_profile_id=True) self.assertEqual(q_attr["scheduler_profile_id"], sched1) s_attr = sai_thrift_get_scheduler_attribute( self.client, sched1, scheduling_weight=True) self.assertEqual(s_attr["scheduling_weight"], weight) print("\t\tScheduling weight validated successfully") status = sai_thrift_set_queue_attribute( self.client, queue_id[2], scheduler_profile_id=sched2) self.assertEqual(status, SAI_STATUS_SUCCESS) q_attr = sai_thrift_get_queue_attribute( self.client, queue_id[2], scheduler_profile_id=True) self.assertEqual(q_attr["scheduler_profile_id"], sched2) s_attr = sai_thrift_get_scheduler_attribute( self.client, sched2, min_bandwidth_rate=True, max_bandwidth_rate=True) self.assertEqual(s_attr["min_bandwidth_rate"], min_rate) self.assertEqual(s_attr["max_bandwidth_rate"], max_rate) print("\t\tBandwidth rates validated successfully") status = sai_thrift_set_queue_attribute( self.client, queue_id[3], scheduler_profile_id=sched3) self.assertEqual(status, SAI_STATUS_SUCCESS) q_attr = sai_thrift_get_queue_attribute( self.client, queue_id[3], scheduler_profile_id=True) self.assertEqual(q_attr["scheduler_profile_id"], sched3) s_attr = sai_thrift_get_scheduler_attribute( self.client, sched3, min_bandwidth_burst_rate=True, max_bandwidth_burst_rate=True) self.assertEqual(s_attr["min_bandwidth_burst_rate"], min_burst) self.assertEqual(s_attr["max_bandwidth_burst_rate"], max_burst) print("\t\tBandwidth burst rates validated successfully") print("\t\tNow the parameters will be modified") weight = 3 min_rate = 120 max_rate = 980 min_burst = 300 max_burst = 800 status = sai_thrift_set_scheduler_attribute( self.client, sched1, scheduling_weight=weight) self.assertEqual(status, SAI_STATUS_SUCCESS) s_attr = sai_thrift_get_scheduler_attribute( self.client, sched1, scheduling_weight=True) self.assertEqual(s_attr["scheduling_weight"], weight) print("\t\tScheduling weight validated successfully") status = sai_thrift_set_scheduler_attribute( self.client, sched2, min_bandwidth_rate=min_rate) self.assertEqual(status, SAI_STATUS_SUCCESS) status = sai_thrift_set_scheduler_attribute( self.client, sched2, max_bandwidth_rate=max_rate) self.assertEqual(status, SAI_STATUS_SUCCESS) s_attr = sai_thrift_get_scheduler_attribute( self.client, sched2, min_bandwidth_rate=True, max_bandwidth_rate=True) self.assertEqual(s_attr["min_bandwidth_rate"], min_rate) self.assertEqual(s_attr["max_bandwidth_rate"], max_rate) print("\t\tBandwidth rates validated successfully") status = sai_thrift_set_scheduler_attribute( self.client, sched3, min_bandwidth_burst_rate=min_burst) self.assertEqual(status, SAI_STATUS_SUCCESS) status = sai_thrift_set_scheduler_attribute( self.client, sched3, max_bandwidth_burst_rate=max_burst) self.assertEqual(status, SAI_STATUS_SUCCESS) s_attr = sai_thrift_get_scheduler_attribute( self.client, sched3, min_bandwidth_burst_rate=True, max_bandwidth_burst_rate=True) self.assertEqual(s_attr["min_bandwidth_burst_rate"], min_burst) self.assertEqual(s_attr["max_bandwidth_burst_rate"], max_burst) print("\t\tBandwidth burst rates validated successfully") print("\t\tPART3 completed successfully") print("\tPART4: PFC Priority Queue Test") # Get the queue for the CPU port. p_attr = sai_thrift_get_port_attribute( self.client, cpu_port_id, qos_queue_list=q_list) queue_id = p_attr["qos_queue_list"].idlist q_attr = sai_thrift_get_queue_attribute( self.client, queue_id[4], index=True) q_idx = q_attr["index"] # The PFC Priority -> Queue map table configuration. pfc_to_queue_map = sai_thrift_qos_map_t( key=sai_thrift_qos_map_params_t(prio=4), value=sai_thrift_qos_map_params_t(queue_index=q_idx)) qos_map_list = sai_thrift_qos_map_list_t( count=1, maplist=[pfc_to_queue_map]) qos_map = sai_thrift_create_qos_map( self.client, type=SAI_QOS_MAP_TYPE_PFC_PRIORITY_TO_QUEUE, map_to_value_list=qos_map_list) self.assertTrue(qos_map != 0) # The queue index modification in the previously created QoS map. q_attr = sai_thrift_get_queue_attribute( self.client, queue_id[3], index=True) new_q_idx = q_attr["index"] new_pfc_to_queue_map = sai_thrift_qos_map_t( key=sai_thrift_qos_map_params_t(prio=4), value=sai_thrift_qos_map_params_t(queue_index=new_q_idx)) new_qos_map_list = sai_thrift_qos_map_list_t( count=1, maplist=[new_pfc_to_queue_map]) status = sai_thrift_set_qos_map_attribute( self.client, qos_map, map_to_value_list=new_qos_map_list) self.assertEqual(status, SAI_STATUS_SUCCESS) # The verification if correct queue is applied to QoS map map_lists = sai_thrift_qos_map_list_t(count=1) attr = sai_thrift_get_qos_map_attribute( self.client, qos_map, map_to_value_list=map_lists) queue_idx = attr["map_to_value_list"].maplist[0].value.queue_index self.assertEqual(queue_idx, new_q_idx) print("\t\tPART4 completed successfully") print("\tTest completed successfully") TEST_PASSED = True finally: if not TEST_PASSED: print("\tTest FAILED, skip tear down for un-expected operation.") else: sai_thrift_remove_qos_map(self.client, qos_map) status = sai_thrift_set_queue_attribute( self.client, queue_id[3], scheduler_profile_id=0) self.assertEqual(status, SAI_STATUS_SUCCESS) status = sai_thrift_set_queue_attribute( self.client, queue_id[2], scheduler_profile_id=0) self.assertEqual(status, SAI_STATUS_SUCCESS) status = sai_thrift_set_queue_attribute( self.client, queue_id[1], scheduler_profile_id=0) self.assertEqual(status, SAI_STATUS_SUCCESS) sai_thrift_remove_scheduler(self.client, sched3) sai_thrift_remove_scheduler(self.client, sched2) sai_thrift_remove_scheduler(self.client, sched1) def tearDown(self): super(cpuPortQueueObjectTest, self).tearDown() @group("queue-wred") class wredQueueTest(QueueConfigDataHelper): """ The test verifies WRED attaching and removing for a particular queue. """ def setUp(self): super(wredQueueTest, self).setUp() def runTest(self): print("\nWRED Queue Test") wred_id = sai_thrift_create_wred(self.client, green_enable=True, green_min_threshold=0, green_max_threshold=4000, green_drop_probability=100) self.assertTrue(wred_id != 0) attr = sai_thrift_get_port_attribute( self.client, self.port25, qos_queue_list=self.q_list) queue_id = attr["qos_queue_list"].idlist status = sai_thrift_set_queue_attribute( self.client, queue_id[0], wred_profile_id=wred_id) self.assertEqual(status, SAI_STATUS_SUCCESS) pkt = simple_tcp_packet(eth_dst=ROUTER_MAC, eth_src="00:00:00:00:00:22", ip_dst="172.16.1.1", ip_src="192.168.0.1", ip_id=105, ip_ttl=64) exp_pkt = simple_tcp_packet(eth_dst="00:11:22:33:44:55", eth_src=ROUTER_MAC, ip_dst="172.16.1.1", ip_src="192.168.0.1", ip_id=105, ip_ttl=63) try: print("\tSending packet PORT25 -> PORT26") send_packet(self, self.dev_port25, pkt) verify_packet(self, exp_pkt, self.dev_port26) print("\tPacket received on PORT26") print("\tTest completed successfully") finally: status = sai_thrift_set_queue_attribute(self.client, queue_id[0], wred_profile_id=0) self.assertEqual(status, SAI_STATUS_SUCCESS) status = sai_thrift_remove_wred(self.client, wred_id) self.assertEqual(status, SAI_STATUS_SUCCESS) def tearDown(self): super(wredQueueTest, self).tearDown() @group("queue-hw") @group("draft") class dwrrBandwidthDistributionTest(QueueConfigDataHelper): # noqa pylint: disable=too-many-branches """ The configuration of scheduler to a given queue is done. Then DWRR bandwidth distribution according to DWRR weight is being validated. The last step is to modify weights in previously created scheduler profile and the analogous validation is repeated. """ def setUp(self): super(dwrrBandwidthDistributionTest, self).setUp() def runTest(self): print("DWRR Bandwidth Distribution Test") # Create configuration # 1. QoS map configuration for egress port to redirect traffic # on queues. # DSCP -> TC egress, PORT 25 (sender) map_list = [] dscp_list = [0, 1, 2, 3, 4, 5, 6, 7] tc_list = [0, 1, 2, 3, 4, 5, 6, 7] for traffic_class, dscp in zip(tc_list, dscp_list): dscp_to_tc = sai_thrift_qos_map_t( key=sai_thrift_qos_map_params_t(dscp=dscp), value=sai_thrift_qos_map_params_t(tc=traffic_class)) map_list.append(dscp_to_tc) qos_map_list = sai_thrift_qos_map_list_t( count=len(map_list), maplist=map_list) indirect_qos_map = sai_thrift_create_qos_map( self.client, type=SAI_QOS_MAP_TYPE_DSCP_TO_TC, map_to_value_list=qos_map_list) status = sai_thrift_set_port_attribute( self.client, self.port25, qos_dscp_to_tc_map=indirect_qos_map) self.assertEqual(status, SAI_STATUS_SUCCESS) # TC -> Queue on egress, PORT 25 (sender) map_list2 = [] egress_queue_indices = [] # Port 25 queue list (sender) port25_attr = sai_thrift_get_port_attribute( self.client, self.port25, qos_number_of_queues=True, qos_queue_list=self.q_list) num_queues = port25_attr["qos_number_of_queues"] queue_id_list_port25 = port25_attr["qos_queue_list"].idlist # Port 26 queue list (receiver) port26_attr = sai_thrift_get_port_attribute( self.client, self.port26, qos_queue_list=self.q_list) queue_id_list_port26 = port26_attr["qos_queue_list"].idlist for i in tc_list: egress_queue_attr = sai_thrift_get_queue_attribute( self.client, port25_attr["qos_queue_list"].idlist[i], index=True) queue_index = egress_queue_attr["index"] egress_queue_indices.append(queue_index) tc_to_queue = sai_thrift_qos_map_t( key=sai_thrift_qos_map_params_t(tc=i), value=sai_thrift_qos_map_params_t(queue_index=queue_index)) map_list2.append(tc_to_queue) # Clear statistics for each queue in PORT25 clear_counter( self, sai_thrift_clear_queue_stats, queue_id_list_port25[i]) # Clear statistics for each queue in PORT26 clear_counter( self, sai_thrift_clear_queue_stats, queue_id_list_port26[i]) qos_map_list2 = sai_thrift_qos_map_list_t( count=len(map_list2), maplist=map_list2) target_qos_map = sai_thrift_create_qos_map( self.client, type=SAI_QOS_MAP_TYPE_TC_TO_QUEUE, map_to_value_list=qos_map_list2) status = sai_thrift_set_port_attribute( self.client, self.port25, qos_tc_to_queue_map=target_qos_map) self.assertEqual(status, SAI_STATUS_SUCCESS) # 2. Create scheduler's structre # Scheduler profiles for queues schedulers = [] weight = 8 for queue in range(0, int(num_queues / 2)): sched = sai_thrift_create_scheduler( self.client, meter_type=SAI_METER_TYPE_PACKETS, scheduling_type=SAI_SCHEDULING_TYPE_DWRR, scheduling_weight=weight) self.assertTrue(sched != 0) schedulers.append(sched) weight //= 2 # Scheduler profile for port shape shaper = sai_thrift_create_scheduler( self.client, meter_type=SAI_METER_TYPE_PACKETS, scheduling_type=SAI_SCHEDULING_TYPE_DWRR, max_bandwidth_rate=750) self.assertTrue(shaper != 0) status = sai_thrift_set_port_attribute( self.client, self.port26, qos_scheduler_profile_id=shaper) self.assertEqual(status, SAI_STATUS_SUCCESS) # 3. Create queue's structure for queue in range(0, num_queues): # weights for: queue.0 = 8, queue.1 = 4, queue.2 = 2 if num_queues < 3: status = sai_thrift_set_queue_attribute( self.client, queue_id_list_port26[queue], scheduler_profile_id=schedulers[queue % 3]) # weights for others: queue.x = 1 else: status = sai_thrift_set_queue_attribute( self.client, queue_id_list_port26[queue], scheduler_profile_id=schedulers[3]) self.assertEqual(status, SAI_STATUS_SUCCESS) print("\tConfiguration completed") # Build packets with different type of service pkts = [] exp_pkts = [] for i in range(0, num_queues): tos = i << 2 pkt = simple_tcp_packet(eth_dst=ROUTER_MAC, eth_src="00:00:00:00:00:22", ip_dst="172.16.1.1", ip_src="192.168.0.1", ip_id=105, ip_tos=tos, ip_ttl=64) pkts.append(pkt) exp_pkt = simple_tcp_packet(eth_dst="00:11:22:33:44:55", eth_src=ROUTER_MAC, ip_dst="172.16.1.1", ip_src="192.168.0.1", ip_id=105, ip_tos=tos, ip_ttl=63) exp_pkts.append(exp_pkt) try: print("\tSending 19 packets PORT25 -> PORT26") rec_pkt = 0 congestion_rate = 2 pkt_cnt = congestion_rate * num_queues for _ in range(0, congestion_rate): for i in range(0, num_queues): send_packet(self, self.dev_port25, pkts[i]) # To increase packet rate, disable verification # verify_packet(self, exp_pkts[i], self.dev_port26) rec_pkt += 1 send_packet(self, self.dev_port25, pkts[2]) print("\tChecking if received packet number equals sent one") print("Expected: {}, Received: {}".format(pkt_cnt, rec_pkt)) self.assertTrue(rec_pkt == pkt_cnt) print("\tPackets received on PORT26") print("\tDelay for statistics read") time.sleep(5) print("\tChecking received packets on particular queues") exp_pkt_num = 8 for queue in range(0, num_queues): time.sleep(5) q_stats = query_counter( self, sai_thrift_get_queue_stats, queue_id_list_port26[queue]) rec_pkt_num = q_stats["SAI_QUEUE_STAT_PACKETS"] print("Queue:", queue) print("Received no. packets:", rec_pkt_num) print("Expected no. packets:", exp_pkt_num) # self.assertTrue(rec_pkt_num == exp_pkt_num) if exp_pkt_num != 1: exp_pkt_num //= 2 print("\tThe packet distribution is correct.") print("\tTest completed successfully") finally: stats = query_counter( self, sai_thrift_get_queue_stats_ext, queue_id_list_port26[0], SAI_STATS_MODE_READ_AND_CLEAR) cnt = stats["SAI_QUEUE_STAT_PACKETS"] self.assertEqual(cnt, 2) stats = query_counter( self, sai_thrift_get_queue_stats, queue_id_list_port26[0]) cnt = stats["SAI_QUEUE_STAT_PACKETS"] self.assertEqual(cnt, 0) print("\tTest completed successfully") status = sai_thrift_set_port_attribute( self.client, self.port26, qos_scheduler_profile_id=0) self.assertEqual(status, SAI_STATUS_SUCCESS) sai_thrift_remove_scheduler(self.client, shaper) status = sai_thrift_set_port_attribute( self.client, self.port25, qos_tc_to_queue_map=0) self.assertEqual(status, SAI_STATUS_SUCCESS) status = sai_thrift_set_port_attribute( self.client, self.port25, qos_dscp_to_tc_map=0) self.assertEqual(status, SAI_STATUS_SUCCESS) sai_thrift_remove_qos_map(self.client, indirect_qos_map) sai_thrift_remove_qos_map(self.client, target_qos_map) for queue in range(0, num_queues): status = sai_thrift_set_queue_attribute( self.client, queue_id_list_port26[queue], scheduler_profile_id=0) self.assertEqual(status, SAI_STATUS_SUCCESS) for i in range(0, int(num_queues / 2)): sai_thrift_remove_scheduler(self.client, schedulers[i]) def tearDown(self): super(dwrrBandwidthDistributionTest, self).tearDown() class strictPriorityQueueTest(QueueConfigDataHelper): """ After the creation of strict priority type for scheduler profile, it has been attached to a queue. Then, overwriting the port number causes forwarding the traffic to other port. """ def setUp(self): super(strictPriorityQueueTest, self).setUp() def runTest(self): print("Strict Priority Queue Test") # 1. QoS map configuration for egress port to redirect traffic # on queues. # DSCP -> TC egress, PORT 25 (sender) map_list = [] dscp_list = [0, 1, 2, 3, 4, 5, 6, 7] tc_list = [0, 1, 2, 3, 4, 5, 6, 7] for traffic_class, dscp in zip(tc_list, dscp_list): dscp_to_tc = sai_thrift_qos_map_t( key=sai_thrift_qos_map_params_t(dscp=dscp), value=sai_thrift_qos_map_params_t(tc=traffic_class)) map_list.append(dscp_to_tc) qos_map_list = sai_thrift_qos_map_list_t( count=len(map_list), maplist=map_list) indirect_qos_map = sai_thrift_create_qos_map( self.client, type=SAI_QOS_MAP_TYPE_DSCP_TO_TC, map_to_value_list=qos_map_list) status = sai_thrift_set_port_attribute( self.client, self.port25, qos_dscp_to_tc_map=indirect_qos_map) self.assertEqual(status, SAI_STATUS_SUCCESS) # TC -> Queue on egress, PORT 25 (sender) map_list2 = [] egress_queue_indices = [] # Port 25 queue list (sender) port25_attr = sai_thrift_get_port_attribute( self.client, self.port25, qos_number_of_queues=True, qos_queue_list=self.q_list) queue_id_list_port25 = port25_attr["qos_queue_list"].idlist # Port 26 queue list (receiver) port26_attr = sai_thrift_get_port_attribute( self.client, self.port26, qos_queue_list=self.q_list) queue_id_list_port26 = port26_attr["qos_queue_list"].idlist for i in tc_list: egress_queue_attr = sai_thrift_get_queue_attribute( self.client, port25_attr["qos_queue_list"].idlist[i], index=True) queue_index = egress_queue_attr["index"] egress_queue_indices.append(queue_index) tc_to_queue = sai_thrift_qos_map_t( key=sai_thrift_qos_map_params_t(tc=i), value=sai_thrift_qos_map_params_t(queue_index=queue_index)) map_list2.append(tc_to_queue) # Clear statistics for each queue in PORT25 clear_counter( self, sai_thrift_clear_queue_stats, queue_id_list_port25[i]) # Clear statistics for each queue in PORT26 clear_counter( self, sai_thrift_clear_queue_stats, queue_id_list_port26[i]) qos_map_list2 = sai_thrift_qos_map_list_t( count=len(map_list2), maplist=map_list2) target_qos_map = sai_thrift_create_qos_map( self.client, type=SAI_QOS_MAP_TYPE_TC_TO_QUEUE, map_to_value_list=qos_map_list2) status = sai_thrift_set_port_attribute( self.client, self.port25, qos_tc_to_queue_map=target_qos_map) self.assertEqual(status, SAI_STATUS_SUCCESS) # Port 27 queue list (oversubscribed port) port27_attr = sai_thrift_get_port_attribute( self.client, self.port27, qos_queue_list=self.q_list) queue_id_list_port27 = port27_attr["qos_queue_list"].idlist # Attach the scheduler profile to queue[1] of port26 sched = sai_thrift_create_scheduler( self.client, meter_type=SAI_METER_TYPE_PACKETS, scheduling_type=SAI_SCHEDULING_TYPE_STRICT) self.assertTrue(sched != 0) status = sai_thrift_set_queue_attribute( self.client, queue_id_list_port26[1], scheduler_profile_id=sched) self.assertEqual(status, SAI_STATUS_SUCCESS) port27_attr = sai_thrift_get_port_attribute( self.client, self.port27, qos_number_of_scheduler_groups=True) num_sched_grps = port27_attr["qos_number_of_scheduler_groups"] sched_grp_list = sai_thrift_object_list_t(count=num_sched_grps) port27_attr = sai_thrift_get_port_attribute( self.client, self.port27, qos_scheduler_group_list=sched_grp_list) sch_grps_port27 = port27_attr["qos_scheduler_group_list"].idlist # Oversubscribe port - attach scheduler to PORT27 status = sai_thrift_set_scheduler_group_attribute( self.client, sch_grps_port27[0], scheduler_profile_id=sched) self.assertEqual(status, SAI_STATUS_SUCCESS) print("\tConfiguration completed") pkt = simple_tcp_packet(eth_dst=ROUTER_MAC, eth_src="00:00:00:00:00:22", ip_dst="172.16.1.1", ip_src="192.168.0.1", ip_id=105, ip_tos=4, ip_ttl=64) exp_pkt = simple_tcp_packet(eth_dst="00:11:22:33:44:55", eth_src=ROUTER_MAC, ip_dst="172.16.1.1", ip_src="192.168.0.1", ip_id=105, ip_tos=4, ip_ttl=63) try: print("\tSending 19 packets PORT25 -> PORT26") pkt_cnt = 19 for _ in range(0, pkt_cnt): send_packet(self, self.dev_port25, pkt) time.sleep(0.1) verify_packet(self, exp_pkt, self.dev_port26) verify_no_other_packets(self) print("\tReceived all packets") # Sleep added to enable statistics reading time.sleep(4) # Statistics read q_stats_port26 = query_counter( self, sai_thrift_get_queue_stats, queue_id_list_port26[1]) received_pkt_num = q_stats_port26["SAI_QUEUE_STAT_PACKETS"] print("\tqueue[1] of PORT26, no. packets:", received_pkt_num) # queue[0] q_stats_port27 = query_counter( self, sai_thrift_get_queue_stats, queue_id_list_port27[0]) forwarded_pkt_num = q_stats_port27["SAI_QUEUE_STAT_PACKETS"] print("\tqueue[0] of PORT27, no. packets:", forwarded_pkt_num) # queue[1] q_stats_port27 = query_counter( self, sai_thrift_get_queue_stats, queue_id_list_port27[1]) forwarded_pkt_num = q_stats_port27["SAI_QUEUE_STAT_PACKETS"] print("\tqueue[1] of PORT27, no. packets:", forwarded_pkt_num) print("\tChecking if received packet number equals sent one") self.assertTrue(received_pkt_num == pkt_cnt) print("\tTest completed successfully") finally: status = sai_thrift_set_scheduler_group_attribute( self.client, sch_grps_port27[0], scheduler_profile_id=0) self.assertEqual(status, SAI_STATUS_SUCCESS) status = sai_thrift_set_queue_attribute( self.client, queue_id_list_port26[1], scheduler_profile_id=0) self.assertEqual(status, SAI_STATUS_SUCCESS) sai_thrift_remove_scheduler(self.client, sched) status = sai_thrift_set_port_attribute( self.client, self.port25, qos_tc_to_queue_map=0) self.assertEqual(status, SAI_STATUS_SUCCESS) status = sai_thrift_set_port_attribute( self.client, self.port25, qos_dscp_to_tc_map=0) self.assertEqual(status, SAI_STATUS_SUCCESS) sai_thrift_remove_qos_map(self.client, indirect_qos_map) sai_thrift_remove_qos_map(self.client, target_qos_map) def tearDown(self): super(strictPriorityQueueTest, self).tearDown()