in ptf/saiqosmap.py [0:0]
def l2QosMapSamePcpToTcManyIngressPortsTest(self):
''' Test verifies the following:
1. L2 traffic same PCP to many TC mapping
The verification method:
- setting the TC to QUEUE mapping
- verification of the queue stats
'''
print("l2QosMapSamePcpToTcManyIngressPortsTest")
test_cases = []
test_cases.append({
'tx_port': self.dev_port0,
'pcp': 0,
'p1_queue': 0,
'p2_queue': 0,
'p3_queue': 0
})
test_cases.append({
'tx_port': self.dev_port0,
'pcp': 1,
'p1_queue': 1,
'p2_queue': 1,
'p3_queue': 1
})
test_cases.append({
'tx_port': self.dev_port0,
'pcp': 2,
'p1_queue': 2,
'p2_queue': 2,
'p3_queue': 2
})
test_cases.append({
'tx_port': self.dev_port0,
'pcp': 3,
'p1_queue': 3,
'p2_queue': 3,
'p3_queue': 3
})
test_cases.append({
'tx_port': self.dev_port0,
'pcp': 4,
'p1_queue': 4,
'p2_queue': 4,
'p3_queue': 4
})
test_cases.append({
'tx_port': self.dev_port0,
'pcp': 5,
'p1_queue': 5,
'p2_queue': 5,
'p3_queue': 5
})
test_cases.append({
'tx_port': self.dev_port0,
'pcp': 6,
'p1_queue': 6,
'p2_queue': 6,
'p3_queue': 6
})
test_cases.append({
'tx_port': self.dev_port0,
'pcp': 7,
'p1_queue': 7,
'p2_queue': 7,
'p3_queue': 7
})
try:
print("Create dot1p -> tc qos mapping")
ingress_dot1p_list = [0, 1, 2, 3, 4, 5, 6, 7]
ingress_tc_list = [0, 1, 2, 3, 4, 5, 6, 7]
qos_dot1p_to_tc_map_id = create_and_verify_qos_map(
self.client, SAI_QOS_MAP_TYPE_DOT1P_TO_TC, ingress_dot1p_list,
ingress_tc_list)
self.assertTrue(qos_dot1p_to_tc_map_id != 0,
"Failed to create qos_map")
print("\tok")
print("Create tc -> queue index qos mapping")
ingress_tc_list = [0, 1, 2, 3, 4, 5, 6, 7]
ingress_queue_list = [0, 1, 2, 3, 4, 5, 6, 7]
qos_tc_to_queue_map_id = create_and_verify_qos_map(
self.client, SAI_QOS_MAP_TYPE_TC_TO_QUEUE, ingress_tc_list,
ingress_queue_list)
self.assertTrue(qos_tc_to_queue_map_id != 0,
"Failed to create qos_map")
print("\tok")
# configure the ingress ports
for port, dot1p_to_tc in [[self.port0, qos_dot1p_to_tc_map_id],
[self.port1, qos_dot1p_to_tc_map_id],
[self.port2, qos_dot1p_to_tc_map_id],
[self.port3, qos_dot1p_to_tc_map_id]]:
status = sai_thrift_set_port_attribute(
self.client, port, qos_dot1p_to_tc_map=dot1p_to_tc)
self.assertEqual(status, SAI_STATUS_SUCCESS,
"Failed to set port attribute")
attr = sai_thrift_get_port_attribute(self.client,
port,
qos_dot1p_to_tc_map=True)
self.assertEqual(attr['qos_dot1p_to_tc_map'], dot1p_to_tc)
# configure the egress ports
for port, tc_to_queue in [[self.port0, qos_tc_to_queue_map_id],
[self.port1, qos_tc_to_queue_map_id],
[self.port2, qos_tc_to_queue_map_id],
[self.port3, qos_tc_to_queue_map_id]]:
status = sai_thrift_set_port_attribute(
self.client, port, qos_tc_to_queue_map=tc_to_queue)
self.assertEqual(status, SAI_STATUS_SUCCESS,
"Failed to set port attribute")
attr = sai_thrift_get_port_attribute(self.client,
port,
qos_tc_to_queue_map=True)
self.assertEqual(attr['qos_tc_to_queue_map'], tc_to_queue)
# verify port default_tc
ports = [self.port0, self.port1, self.port2, self.port3]
self.verifyPortDefaultDscpToTc(ports)
for test_port in self.dev_test_port_list:
print("Testing tx_port=%d" % (test_port))
for test in test_cases:
pcp = test['pcp']
pkt = simple_eth_raw_packet_with_taglist(
pktlen=104,
eth_dst='00:33:33:33:33:00',
eth_src='00:33:33:33:33:11',
dl_taglist_enable=True,
dl_vlan_pcp_list=[pcp],
dl_vlanid_list=[1])
flood_port_list = []
flood_pkt_list = []
for port in self.dev_test_port_list:
if port == test_port:
continue
flood_port_list.append([port])
flood_pkt_list.append(pkt)
init_q_cnt = []
for port in [
self.port0, self.port1, self.port2, self.port3
]:
init_q_cnt.append(
self.getPortQueueIndexStats(
port, test['p1_queue'],
['SAI_QUEUE_STAT_PACKETS']))
print("Sending packet port %d (pcp=%d) -> "
"port %d (queue=%d)"
% (test_port, pcp, flood_port_list[0][0],
test['p1_queue']))
print("\t\t\t\t\tport %d (queue=%d)"
% (flood_port_list[1][0], test['p2_queue']))
print("\t\t\t\t\tport %d (queue=%d)"
% (flood_port_list[2][0], test['p3_queue']))
send_packet(self, test_port, pkt)
verify_each_packet_on_multiple_port_lists(
self, flood_pkt_list, flood_port_list)
print("\tPacket flooded to ports %s. ok" %
(flood_port_list))
print("\tVerify port queues packet counters.")
post_q_cnt = []
for port in [
self.port0, self.port1, self.port2, self.port3
]:
post_q_cnt.append(
self.getPortQueueIndexStats(
port, test['p1_queue'],
['SAI_QUEUE_STAT_PACKETS']))
# verify the port queue counters
print("Test")
print(type(test))
self.verifyPortQueueCountersVariousPcp(
test_port, test, init_q_cnt, post_q_cnt)
print("\tok")
finally:
for port in [self.port0, self.port1, self.port2, self.port3]:
sai_thrift_set_port_attribute(self.client,
port,
qos_tc_to_queue_map=0)
sai_thrift_set_port_attribute(self.client,
port,
qos_default_tc=0)
sai_thrift_set_port_attribute(self.client,
port,
qos_dot1p_to_tc_map=0)
sai_thrift_remove_qos_map(self.client, qos_tc_to_queue_map_id)
sai_thrift_remove_qos_map(self.client, qos_dot1p_to_tc_map_id)