in ptf/saiqosmap.py [0:0]
def l3IPv6QosSameDscpToTcMappingManyPortsTest(self):
    '''
    L3 IPv6 same DSCP to TC mapping for various ingress port test.

    One DSCP -> TC map (identity mapping for values 0..7) is bound to
    three ingress ports (port0, port1, port3) and one TC -> queue map
    (identity mapping for values 0..7) is bound to the egress port
    (port2). For every ingress port a routed IPv6/TCP packet is sent for
    each mapped DSCP value plus one unmapped value (10), and the test
    checks that:
      * the packet is forwarded to dev_port2 with the hop limit
        decremented and the traffic class unchanged,
      * the SAI_QUEUE_STAT_PACKETS counter of the expected egress queue
        grows by exactly one.

    Cleanup always detaches both maps from every port in self.test_ports
    and removes whichever maps were actually created.
    '''
    print("l3IPv6QosSameDscpToTcMappingManyPortsTest")

    # Build the test matrix: for each ingress port, DSCP n is expected to
    # hit TC n / queue n for n in 0..7. DSCP 10 has no entry in the map
    # and is expected to end up on TC 0 / queue 0 (the "default" case).
    test_cases = []
    for tx_port in (self.dev_port0, self.dev_port1, self.dev_port3):
        for value in range(8):
            test_cases.append({
                'tx_port': tx_port,
                'dscp': value,
                'tc': value,
                'queue': value
            })
        test_cases.append({
            'tx_port': tx_port,
            'dscp': 10,
            'tc': 0,
            'queue': 0
        })  # default

    # Bind the map ids before entering the try block so that the cleanup
    # code never hits an unbound local when map creation itself fails
    # (which would hide the original error behind an UnboundLocalError).
    qos_dscp_to_tc_map_id = 0
    qos_tc_to_queue_map_id = 0

    try:
        # setup dscp to tc mapping
        ingress_dscp_list = [0, 1, 2, 3, 4, 5, 6, 7]
        ingress_tc_list = [0, 1, 2, 3, 4, 5, 6, 7]
        qos_dscp_to_tc_map_id = create_and_verify_qos_map(
            self.client, SAI_QOS_MAP_TYPE_DSCP_TO_TC, ingress_dscp_list,
            ingress_tc_list)
        self.assertTrue(qos_dscp_to_tc_map_id != 0,
                        "Failed to create qos_map")

        # setup tc to queue index mapping
        ingress_tc_list = [0, 1, 2, 3, 4, 5, 6, 7]
        ingress_queue_list = [0, 1, 2, 3, 4, 5, 6, 7]
        qos_tc_to_queue_map_id = create_and_verify_qos_map(
            self.client, SAI_QOS_MAP_TYPE_TC_TO_QUEUE, ingress_tc_list,
            ingress_queue_list)
        self.assertTrue(qos_tc_to_queue_map_id != 0,
                        "Failed to create qos_map")

        ingress_test_ports = [self.port0, self.port1, self.port3]
        egress_test_ports = [self.port2]

        # Attach the DSCP -> TC map to every ingress port and read the
        # attribute back to confirm it was applied.
        for port in ingress_test_ports:
            status = sai_thrift_set_port_attribute(
                self.client,
                port,
                qos_dscp_to_tc_map=qos_dscp_to_tc_map_id)
            self.assertEqual(status, SAI_STATUS_SUCCESS,
                             "Failed to set port attribute")
            attr = sai_thrift_get_port_attribute(self.client,
                                                 port,
                                                 qos_dscp_to_tc_map=True)
            self.assertEqual(attr['qos_dscp_to_tc_map'],
                             qos_dscp_to_tc_map_id)

        # Attach the TC -> queue map to the egress port and read it back.
        for port in egress_test_ports:
            status = sai_thrift_set_port_attribute(
                self.client,
                port,
                qos_tc_to_queue_map=qos_tc_to_queue_map_id)
            self.assertEqual(status, SAI_STATUS_SUCCESS,
                             "Failed to set port attribute")
            attr = sai_thrift_get_port_attribute(self.client,
                                                 port,
                                                 qos_tc_to_queue_map=True)
            self.assertEqual(attr['qos_tc_to_queue_map'],
                             qos_tc_to_queue_map_id)

        for test in test_cases:
            # DSCP is carried in the upper six bits of the IPv6 traffic
            # class octet, hence the shift by two.
            ipv6_tc = test['dscp'] << 2
            pkt = simple_tcpv6_packet(
                eth_dst=ROUTER_MAC,
                eth_src='00:22:22:22:22:22',
                ipv6_dst='1234:5678:9abc:def0:4422:1133:5577:1111',
                ipv6_src='2000::1',
                ipv6_hlim=64,
                ipv6_tc=ipv6_tc,
                pktlen=100)
            # Routed copy: MACs rewritten, hop limit decremented, traffic
            # class left untouched.
            exp_pkt = simple_tcpv6_packet(
                eth_dst='00:11:22:33:44:06',
                eth_src=ROUTER_MAC,
                ipv6_dst='1234:5678:9abc:def0:4422:1133:5577:1111',
                ipv6_src='2000::1',
                ipv6_hlim=63,
                ipv6_tc=ipv6_tc,
                pktlen=100)

            # Snapshot the egress queue counter so the check below is
            # relative and does not depend on earlier traffic.
            initial_q_cnt = self.getPortQueueIndexStats(
                self.port2, test['queue'], ['SAI_QUEUE_STAT_PACKETS'])

            print("Sending packet port %d (dscp=%d) ->"
                  " port %d (tc=%d queue=%d)"
                  % (test['tx_port'], test['dscp'], self.dev_port2,
                     test['tc'], test['queue']))
            send_packet(self, test['tx_port'], pkt)
            verify_packet(self, exp_pkt, self.dev_port2)

            post_q_cnt = self.getPortQueueIndexStats(
                self.port2, test['queue'], ['SAI_QUEUE_STAT_PACKETS'])
            self.assertEqual(
                initial_q_cnt + 1, post_q_cnt,
                'queue {} packets counter {} != {}'.format(
                    test['queue'], initial_q_cnt + 1, post_q_cnt))
    finally:
        try:
            # Detach both maps from every port before deleting them.
            # NOTE(review): self.test_ports is not defined in this method;
            # presumably it is populated by the test class setup - confirm.
            for port in self.test_ports:
                status = sai_thrift_set_port_attribute(
                    self.client, port, qos_dscp_to_tc_map=int(
                        SAI_NULL_OBJECT_ID))
                self.assertEqual(status, SAI_STATUS_SUCCESS,
                                 "Failed to set port attribute")
                status = sai_thrift_set_port_attribute(
                    self.client, port, qos_tc_to_queue_map=int(
                        SAI_NULL_OBJECT_ID))
                self.assertEqual(status, SAI_STATUS_SUCCESS,
                                 "Failed to set port attribute")
        finally:
            # Still attempt to remove the maps when a detach assertion
            # fails, and only touch maps that were really created.
            if qos_tc_to_queue_map_id != 0:
                sai_thrift_remove_qos_map(self.client,
                                          qos_tc_to_queue_map_id)
            if qos_dscp_to_tc_map_id != 0:
                sai_thrift_remove_qos_map(self.client,
                                          qos_dscp_to_tc_map_id)