in ptf/saiqosmap.py [0:0]
def l2QosMapMultiplePCPToOneTcMappingTest(self):
    '''
    Test verifies the following:
    1. L2 traffic with multiple PCP values mapped to one TC

    The verification method:
    - create a dot1p -> TC map (PCP 0-3 -> TC 3, PCP 4-7 -> TC 7) and
      attach it to the ingress port (port0)
    - create a TC -> queue map (TC 0-3 -> queue 3, TC 4-7 -> queue 7)
      and attach it to the egress ports (port1, port2, port3)
    - for every PCP value send one VLAN-tagged packet into port0,
      expect it on ports 1, 2 and 3, and check that the expected
      queue packet counter of each egress port grew by exactly one

    The port attributes are reset and the created QoS maps are removed
    when the test ends, whether it passed or failed.
    '''
    print("l2QosMapMultiplePCPToOneTcMappingTest")

    # Expected egress queue per PCP value; the list index is the PCP.
    expected_queue_by_pcp = [3, 3, 3, 3, 7, 7, 7, 7]
    test_cases = [
        {
            'pcp': pcp,
            'port_default_tc': 0,
            'p1_queue': queue,
            'p2_queue': queue,
            'p3_queue': queue,
        }
        for pcp, queue in enumerate(expected_queue_by_pcp)
    ]

    ingress_port = self.port0
    egress_ports = [self.port1, self.port2, self.port3]
    all_ports = [ingress_port] + egress_ports

    # Bind the map ids before entering the try block so that the cleanup
    # in "finally" never hits an unbound local (which would mask the
    # original failure) when map creation itself raises.
    qos_dot1p_to_tc_map_id = 0
    qos_tc_to_queue_map_id = 0

    try:
        print("Create dot1p -> tc qos mapping")
        ingress_dot1p_list = [0, 1, 2, 3, 4, 5, 6, 7]
        ingress_tc_list = [3, 3, 3, 3, 7, 7, 7, 7]
        qos_dot1p_to_tc_map_id = create_and_verify_qos_map(
            self.client, SAI_QOS_MAP_TYPE_DOT1P_TO_TC, ingress_dot1p_list,
            ingress_tc_list)
        self.assertTrue(qos_dot1p_to_tc_map_id != 0,
                        "Failed to create qos_map")
        print("\tok")

        print("Create tc -> queue index qos mapping")
        ingress_tc_list = [0, 1, 2, 3, 4, 5, 6, 7]
        ingress_queue_list = [3, 3, 3, 3, 7, 7, 7, 7]
        qos_tc_to_queue_map_id = create_and_verify_qos_map(
            self.client, SAI_QOS_MAP_TYPE_TC_TO_QUEUE, ingress_tc_list,
            ingress_queue_list)
        self.assertTrue(qos_tc_to_queue_map_id != 0,
                        "Failed to create qos_map")
        print("\tok")

        # configure the ingress port
        status = sai_thrift_set_port_attribute(
            self.client, ingress_port,
            qos_dot1p_to_tc_map=qos_dot1p_to_tc_map_id)
        self.assertEqual(status, SAI_STATUS_SUCCESS,
                         "Failed to set port attribute")
        attr = sai_thrift_get_port_attribute(self.client,
                                             ingress_port,
                                             qos_dot1p_to_tc_map=True)
        self.assertEqual(attr['qos_dot1p_to_tc_map'],
                         qos_dot1p_to_tc_map_id)

        # configure the egress ports
        for port in egress_ports:
            status = sai_thrift_set_port_attribute(
                self.client, port,
                qos_tc_to_queue_map=qos_tc_to_queue_map_id)
            self.assertEqual(status, SAI_STATUS_SUCCESS,
                             "Failed to set port attribute")
            attr = sai_thrift_get_port_attribute(self.client,
                                                 port,
                                                 qos_tc_to_queue_map=True)
            self.assertEqual(attr['qos_tc_to_queue_map'],
                             qos_tc_to_queue_map_id)

        # verify that no DSCP -> TC map is attached and that the port
        # default tc is 0 on every port used by the test
        for port in all_ports:
            attr = sai_thrift_get_port_attribute(self.client,
                                                 port,
                                                 qos_dscp_to_tc_map=True,
                                                 qos_default_tc=True)
            self.assertEqual(attr['qos_dscp_to_tc_map'], 0)
            self.assertEqual(attr['qos_default_tc'], 0)

        for test in test_cases:
            pcp = test['pcp']

            # setup port default_tc (a missing value falls back to 0)
            port_default_tc = test['port_default_tc']
            if port_default_tc is None:
                port_default_tc = 0
            status = sai_thrift_set_port_attribute(
                self.client, ingress_port, qos_default_tc=port_default_tc)
            self.assertEqual(status, SAI_STATUS_SUCCESS,
                             "Failed to set port attribute")
            attr = sai_thrift_get_port_attribute(self.client,
                                                 ingress_port,
                                                 qos_default_tc=True)
            self.assertEqual(attr['qos_default_tc'], port_default_tc)

            # snapshot the queue counters before sending traffic
            p1_initial_q_cnt = self.getPortQueueIndexStats(
                self.port1, test['p1_queue'], ['SAI_QUEUE_STAT_PACKETS'])
            p2_initial_q_cnt = self.getPortQueueIndexStats(
                self.port2, test['p2_queue'], ['SAI_QUEUE_STAT_PACKETS'])
            p3_initial_q_cnt = self.getPortQueueIndexStats(
                self.port3, test['p3_queue'], ['SAI_QUEUE_STAT_PACKETS'])

            print("Sending packet port %d (pcp=%d) -> "
                  "port %d (port_default_tc=%d queue=%d)"
                  % (self.dev_port0, pcp, self.dev_port1,
                     port_default_tc, test['p1_queue']))
            pkt = simple_eth_raw_packet_with_taglist(
                pktlen=104,
                eth_dst='00:33:33:33:33:00',
                eth_src='00:33:33:33:33:11',
                dl_taglist_enable=True,
                dl_vlan_pcp_list=[pcp],
                dl_vlanid_list=[1])

            # the same packet is expected once on each egress port
            test_port = self.dev_port0
            flood_port_list = [[self.dev_port1], [self.dev_port2],
                               [self.dev_port3]]
            flood_pkt_list = [pkt, pkt, pkt]
            send_packet(self, test_port, pkt)
            verify_each_packet_on_multiple_port_lists(self, flood_pkt_list,
                                                      flood_port_list)
            print("\tPacket flooded to ports 1,2,3. ok")

            p1_post_q_cnt = self.getPortQueueIndexStats(
                self.port1, test['p1_queue'], ['SAI_QUEUE_STAT_PACKETS'])
            p2_post_q_cnt = self.getPortQueueIndexStats(
                self.port2, test['p2_queue'], ['SAI_QUEUE_STAT_PACKETS'])
            p3_post_q_cnt = self.getPortQueueIndexStats(
                self.port3, test['p3_queue'], ['SAI_QUEUE_STAT_PACKETS'])

            # exactly one packet must have landed in each expected queue
            self.assertEqual(
                p1_initial_q_cnt + 1, p1_post_q_cnt,
                'PORT1 queue {} packets counter {} != {}'.format(
                    test['p1_queue'], p1_initial_q_cnt + 1, p1_post_q_cnt))
            self.assertEqual(
                p2_initial_q_cnt + 1, p2_post_q_cnt,
                'PORT2 queue {} packets counter {} != {}'.format(
                    test['p2_queue'], p2_initial_q_cnt + 1, p2_post_q_cnt))
            self.assertEqual(
                p3_initial_q_cnt + 1, p3_post_q_cnt,
                'PORT3 queue {} packets counter {} != {}'.format(
                    test['p3_queue'], p3_initial_q_cnt + 1, p3_post_q_cnt))

    finally:
        # detach the maps from the ports before removing the map objects
        for port in all_ports:
            sai_thrift_set_port_attribute(self.client,
                                          port,
                                          qos_tc_to_queue_map=0)
            sai_thrift_set_port_attribute(self.client,
                                          port,
                                          qos_default_tc=0)
            sai_thrift_set_port_attribute(self.client,
                                          port,
                                          qos_dot1p_to_tc_map=0)
        # remove only the maps that were actually created
        if qos_tc_to_queue_map_id:
            sai_thrift_remove_qos_map(self.client, qos_tc_to_queue_map_id)
        if qos_dot1p_to_tc_map_id:
            sai_thrift_remove_qos_map(self.client, qos_dot1p_to_tc_map_id)