in ptf/saihash.py [0:0]
def l3IPv4LagPacketTest(self, hash_dict, traffic=True, max_itrs=MAX_ITRS):
"""
Function that performs the IPv4 LAG hash test with L3 hashed traffic
Args:
hash_dict (dict): dictionary with variables that defines the list
of hash test fields and traffic header hashed fields
traffic (boolean): informs if traffic is expected on egress ports
max_itrs (int): maximum number of iterations
Returns:
list: list of numbers of packet egressed on specific test member
"""
count = [0, 0, 0]
src_mac_start = '00:22:22:22:{0}:{1}'
dst_mac_start = '00:99:99:99:{0}:{1}'
dst_mac = '00:99:99:99:99:99'
src_mac = '00:22:22:22:22:22'
dst_ip = int(binascii.hexlify(socket.inet_aton('10.70.70.1')), 16)
src_ip = int(binascii.hexlify(socket.inet_aton('192.168.8.1')), 16)
udp_sport = 7
udp_dport = 7
for i in range(0, max_itrs):
dst_ip_addr = socket.inet_ntoa(
binascii.unhexlify(hex(dst_ip)[2:].zfill(8)))
src_ip_addr = socket.inet_ntoa(
binascii.unhexlify(hex(src_ip)[2:].zfill(8)))
if ('hash_dst_ip' in hash_dict) and (hash_dict['hash_dst_ip']):
dst_ip += i * 7
if ('hash_src_ip' in hash_dict) and (hash_dict['hash_src_ip']):
src_ip += i * 17
if ('hash_src_mac' in hash_dict) and (hash_dict['hash_src_mac']):
src_mac = src_mac_start.format(
str(i).zfill(4)[:2],
str(i).zfill(4)[2:])
if ('hash_dst_mac' in hash_dict) and (hash_dict['hash_dst_mac']):
dst_mac = dst_mac_start.format(
str(i).zfill(4)[:2],
str(i).zfill(4)[2:])
if ('hash_udp_sport' in hash_dict) and \
(hash_dict['hash_udp_sport']):
udp_sport += 13
if ('hash_udp_dport' in hash_dict) and \
(hash_dict['hash_udp_dport']):
udp_dport += 13
pkt = simple_udp_packet(eth_dst=ROUTER_MAC,
eth_src=src_mac,
ip_dst=dst_ip_addr,
ip_src=src_ip_addr,
udp_sport=udp_sport,
udp_dport=udp_dport,
ip_id=106,
ip_ttl=64)
exp_pkt = simple_udp_packet(eth_dst='00:77:77:77:77:77',
eth_src=ROUTER_MAC,
ip_dst=dst_ip_addr,
ip_src=src_ip_addr,
udp_sport=udp_sport,
udp_dport=udp_dport,
ip_id=106,
ip_ttl=63)
send_packet(self, self.dev_port4, pkt)
if traffic:
exp_ports = [self.dev_port4, self.dev_port5, self.dev_port6]
rcv_idx = verify_any_packet_any_port(
self, [exp_pkt], exp_ports)
count[rcv_idx] += 1
if DEBUG:
print("idx:", rcv_idx, "dst_ip:", dst_ip_addr, " src_ip:",
src_ip_addr, " smac:", src_mac, " dmac: ", dst_mac,
" dport", udp_dport, " sport", udp_sport)
else:
verify_no_other_packets(self)
return count