in ptf/sainat.py [0:0]
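def srcNatAclTranslationDisableTest(self):
    '''
    Verifies that translation doesn't occur when a source NAT entry exists
    but an ACL is configured to disable NAT translation.
    Test is performed for different RIFs (regular L3 port, L3 LAG and SVI)
    and different nexthops (regular L3 port, L3 LAG and SVI)
    with two kinds of packets - TCP and UDP.

    For every ingress/egress combination the test first confirms that the
    translated packet is forwarded, then binds self.ingr_acl_table to the
    source RIF and confirms that the packet leaves with its original
    source IP, and finally detaches the ACL again.
    '''
    print("\nsrcNatAclTranslationDisableTest()")

    src_ip = "20.20.20.1"
    nat_src_ip = "150.10.10.10"
    # Pre-bound so the cleanup in `finally` never trips over an unbound
    # name (hiding the real error) if building the NAT entry object fails.
    snat = None

    def verify_translation(src_rif, src_port_dev):
        '''
        Additional helper function for translation verification.
        Verifies if translation doesn't occur when ACL is configured
        to disable it.

        Args:
            src_rif (oid): object ID of source RIF
            src_port_dev (int): source device port number
        '''
        # Baseline of the ACL packet counter; incremented locally after
        # every packet that is expected to hit the ACL.
        acl_counter = sai_thrift_get_acl_counter_attribute(
            self.client, self.ingr_acl_counter, packets=True)['packets']

        # Packets are built once for the L3 port nexthop and re-targeted
        # in place for the other nexthops below.
        tcp_pkt = simple_tcp_packet(eth_dst=ROUTER_MAC,
                                    ip_src=src_ip,
                                    ip_dst=self.port_nbor_ip,
                                    ip_ttl=64,
                                    pktlen=100,
                                    with_tcp_chksum=True)
        nat_tcp_pkt = simple_tcp_packet(eth_dst=self.port_nbor_mac,
                                        eth_src=ROUTER_MAC,
                                        ip_src=nat_src_ip,
                                        ip_dst=self.port_nbor_ip,
                                        ip_ttl=63,
                                        pktlen=100,
                                        with_tcp_chksum=True)
        no_nat_tcp_pkt = simple_tcp_packet(eth_dst=self.port_nbor_mac,
                                           eth_src=ROUTER_MAC,
                                           ip_src=src_ip,
                                           ip_dst=self.port_nbor_ip,
                                           ip_ttl=63,
                                           pktlen=100,
                                           with_tcp_chksum=True)
        udp_pkt = simple_udp_packet(eth_dst=ROUTER_MAC,
                                    ip_src=src_ip,
                                    ip_dst=self.port_nbor_ip,
                                    ip_ttl=64,
                                    pktlen=100)
        nat_udp_pkt = simple_udp_packet(eth_dst=self.port_nbor_mac,
                                        eth_src=ROUTER_MAC,
                                        ip_src=nat_src_ip,
                                        ip_dst=self.port_nbor_ip,
                                        ip_ttl=63,
                                        pktlen=100)
        no_nat_udp_pkt = simple_udp_packet(eth_dst=self.port_nbor_mac,
                                           eth_src=ROUTER_MAC,
                                           ip_src=src_ip,
                                           ip_dst=self.port_nbor_ip,
                                           ip_ttl=63,
                                           pktlen=100)

        # (label, nexthop IP, nexthop MAC, PTF verifier, egress port(s))
        egress_cases = (
            (" -> Egress L3 port", self.port_nbor_ip, self.port_nbor_mac,
             verify_packet, self.egr_port_dev),
            (" -> Egress L3 LAG", self.lag_nbor_ip, self.lag_nbor_mac,
             verify_packet_any_port, self.egr_lag_dev),
            (" -> Egress SVI", self.svi_nbor_ip, self.svi_nbor_mac,
             verify_packets, self.egr_svi_dev),
        )
        # (protocol, injected packet, expected translated packet,
        #  expected untranslated packet)
        flows = (
            ("TCP", tcp_pkt, nat_tcp_pkt, no_nat_tcp_pkt),
            ("UDP", udp_pkt, nat_udp_pkt, no_nat_udp_pkt),
        )

        for label, nbor_ip, nbor_mac, verify_fn, egr_dev in egress_cases:
            print(label)

            # Point all six packets at the nexthop under test (a no-op
            # for the first case, which matches how they were built).
            for _, pkt, nat_pkt, no_nat_pkt in flows:
                pkt[IP].dst = nbor_ip
                for exp_pkt in (nat_pkt, no_nat_pkt):
                    exp_pkt[Ether].dst = nbor_mac
                    exp_pkt[IP].dst = nbor_ip

            # Without the ACL the source address must be rewritten.
            # NOTE(review): _verifyNatHit is defined outside this block;
            # presumably it checks the entry's packet counter - confirm.
            for proto, pkt, nat_pkt, _ in flows:
                print("Sending %s packet with NAT enabled" % proto)
                send_packet(self, src_port_dev, pkt)
                verify_fn(self, nat_pkt, egr_dev)
                self.assertTrue(self._verifyNatHit(snat))
                print("\tOK")

            print("Disabling NAT on src RIF")
            sai_thrift_set_router_interface_attribute(
                self.client, src_rif, ingress_acl=self.ingr_acl_table)

            # With the ACL bound the packet must egress untranslated and
            # the ACL counter must account for it.
            # NOTE(review): these checks do not assert that the NAT
            # entry's own hit count stayed unchanged; consider adding
            # that if _verifyNatHit (or a sibling helper) can express it.
            for proto, pkt, _, no_nat_pkt in flows:
                print("Sending %s packet with NAT disabled by ACL" % proto)
                send_packet(self, src_port_dev, pkt)
                verify_fn(self, no_nat_pkt, egr_dev)
                self.assertTrue(self._verifyAclCounter(
                    self.ingr_acl_counter, acl_counter))
                acl_counter += 1
                print("\tOK")

            # Detach the ACL so the next nexthop starts with NAT active.
            print("Enabling NAT on src RIF")
            sai_thrift_set_router_interface_attribute(
                self.client, src_rif, ingress_acl=0)

    try:
        # Exact-match (/32 mask) source NAT entry: src_ip -> nat_src_ip.
        nat_data = sai_thrift_nat_entry_data_t(
            key=sai_thrift_nat_entry_key_t(
                src_ip=src_ip),
            mask=sai_thrift_nat_entry_mask_t(
                src_ip='255.255.255.255'))
        snat = sai_thrift_nat_entry_t(vr_id=self.default_vrf,
                                      data=nat_data,
                                      nat_type=SAI_NAT_TYPE_SOURCE_NAT)
        sai_thrift_create_nat_entry(self.client,
                                    snat,
                                    src_ip=nat_src_ip,
                                    nat_type=SAI_NAT_TYPE_SOURCE_NAT,
                                    enable_packet_count=True)

        # Ingress RIFs go to NAT zone 0, egress RIFs to NAT zone 1.
        for rif, zone in ((self.ingr_port_rif, 0),
                          (self.egr_port_rif, 1),
                          (self.ingr_lag_rif, 0),
                          (self.egr_lag_rif, 1),
                          (self.ingr_svi_rif, 0),
                          (self.egr_svi_rif, 1)):
            sai_thrift_set_router_interface_attribute(
                self.client, rif, nat_zone_id=zone)

        print("\n***Ingress L3 port***")
        verify_translation(self.ingr_port_rif, self.ingr_port_dev)

        print("\n***Ingress L3 LAG***")
        for lag_port in self.ingr_lag_dev:
            verify_translation(self.ingr_lag_rif, lag_port)

        print("\n***Ingress SVI***")
        for svi_port in self.ingr_svi_dev:
            verify_translation(self.ingr_svi_rif, svi_port)

    finally:
        # Detach the ACL from every ingress RIF first, so a failure that
        # happened while it was bound does not leave it attached.
        for rif in (self.ingr_svi_rif,
                    self.ingr_lag_rif,
                    self.ingr_port_rif):
            sai_thrift_set_router_interface_attribute(
                self.client, rif, ingress_acl=0)

        # Put every RIF back into NAT zone 0.
        for rif in (self.ingr_svi_rif,
                    self.egr_svi_rif,
                    self.ingr_lag_rif,
                    self.egr_lag_rif,
                    self.ingr_port_rif,
                    self.egr_port_rif):
            sai_thrift_set_router_interface_attribute(
                self.client, rif, nat_zone_id=0)

        # Skip removal when the entry object was never constructed.
        if snat is not None:
            sai_thrift_remove_nat_entry(self.client, snat)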