in ptf/saihash.py [0:0]
def lagIPv6PacketTest(self, hash_dict):
    """
    Function that performs the IPv6 Lag test with L3 hashed traffic

    For each of MAX_ITRS iterations the header fields enabled in hash_dict
    are changed, one IPv6/UDP packet is sent on dev_port4 and the
    forwarded packet (source MAC ROUTER_MAC, hop limit 63) is expected on
    any of dev_port4, dev_port5 or dev_port6.

    Args:
        hash_dict (dict): dictionary with variables that defines the list
            of hash test fields and traffic header hashed fields.
            Recognized keys (a missing key is treated as disabled):
            'hash_src_mac', 'hash_dst_ip', 'hash_src_ip',
            'hash_udp_sport', 'hash_udp_dport', 'hash_flow_label'.
    Returns:
        list: list of numbers of packets egressed on specific LAG member
            port, in the order dev_port4, dev_port5, dev_port6
    """
    # The candidate egress ports never change, so build the list once.
    ports_to_verify = [self.dev_port4, self.dev_port5, self.dev_port6]
    # One counter per candidate port. The value returned by
    # verify_any_packet_any_port() is used as the index into this list
    # (presumably the position of the receiving port in ports_to_verify).
    member_count = [0] * len(ports_to_verify)

    src_mac_template = '00:22:22:22:{0}:{1}'
    src_mac = '00:22:22:22:22:22'
    dst_mac = ROUTER_MAC
    udp_sport = 7
    udp_dport = 7

    # Mutable packed (16 byte) forms of the addresses, so the last byte
    # can be stepped in place.
    dst_ip_bytes = bytearray(
        socket.inet_pton(socket.AF_INET6, '8000:1:1:0:0:0:0:1'))
    src_ip_bytes = bytearray(
        socket.inet_pton(socket.AF_INET6, '6000:1:1:0:0:0:0:100'))

    # Read the test configuration once instead of on every iteration.
    vary_src_mac = bool(hash_dict.get('hash_src_mac'))
    vary_dst_ip = bool(hash_dict.get('hash_dst_ip'))
    vary_src_ip = bool(hash_dict.get('hash_src_ip'))
    vary_udp_sport = bool(hash_dict.get('hash_udp_sport'))
    vary_udp_dport = bool(hash_dict.get('hash_udp_dport'))
    # The flow label is constant for the whole run: 10 when enabled.
    ipv6_flow_label = 10 if hash_dict.get('hash_flow_label') else 0

    for i in range(MAX_ITRS):
        if vary_src_mac:
            # NOTE(review): the two octets are built from *decimal*
            # digits; the first one can only be '00', '01' or '02' and
            # the second one is the last two decimal digits. Kept as-is
            # so that the generated traffic does not change.
            src_mac = src_mac_template.format(
                str(0xFF & (i * 117)).zfill(4)[:2],
                str(0xFF & (i * 13)).zfill(4)[2:])
        if vary_dst_ip:
            dst_ip_bytes[15] = 0xFF & (dst_ip_bytes[15] + 17)
        if vary_src_ip:
            src_ip_bytes[15] = 0xFF & (src_ip_bytes[15] + 13)
        dst_ip_addr = socket.inet_ntop(socket.AF_INET6, bytes(dst_ip_bytes))
        src_ip_addr = socket.inet_ntop(socket.AF_INET6, bytes(src_ip_bytes))
        # Mask to 16 bits so the ports stay valid UDP port numbers even
        # for a very large MAX_ITRS; no effect below 65536.
        if vary_udp_sport:
            udp_sport = (udp_sport + 13) & 0xFFFF
        if vary_udp_dport:
            udp_dport = (udp_dport + 13) & 0xFFFF

        # Fields shared by the sent and the expected packet.
        l3_l4_fields = {
            'ipv6_dst': dst_ip_addr,
            'ipv6_src': src_ip_addr,
            'udp_sport': udp_sport,
            'udp_dport': udp_dport,
            'ipv6_fl': ipv6_flow_label,
        }
        pkt = simple_udpv6_packet(eth_dst=dst_mac,
                                  eth_src=src_mac,
                                  ipv6_hlim=64,
                                  **l3_l4_fields)
        # Expected packet: new MACs and hop limit decremented by one.
        exp_pkt = simple_udpv6_packet(eth_dst='00:88:88:88:88:88',
                                      eth_src=ROUTER_MAC,
                                      ipv6_hlim=63,
                                      **l3_l4_fields)

        send_packet(self, self.dev_port4, pkt)
        rcv_idx = verify_any_packet_any_port(
            self, [exp_pkt], ports_to_verify)
        if DEBUG:
            print("idx:", rcv_idx, "dst_ip:", dst_ip_addr, " src_ip:",
                  src_ip_addr, " smac:", src_mac, " dmac: ", dst_mac,
                  "ipv6_flow_label: ", ipv6_flow_label)
        member_count[rcv_idx] += 1

    return member_count