in ptf/sainat.py [0:0]
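def dstNatAclTranslationDisableTest(self):
    '''
    Verifies that translation does not occur when destination NAT entries
    exist but an ACL is configured to disable NAT translation.
    Test is performed for different RIFs (regular L3 port, L3 LAG and SVI)
    and different nexthops (regular L3 port, L3 LAG and SVI)
    with two kinds of packets - TCP and UDP.

    Cleanup removes only the NAT entries whose create call returned, so a
    failure part-way through setup does not raise a NameError from the
    ``finally`` block (which would hide the original error and skip the
    removal of entries that were already programmed).
    '''
    print("\ndstNatAclTranslationDisableTest()")

    # NAT entries in creation order; the finally block walks this list in
    # reverse, which matches the original removal order when setup succeeds.
    created_nat_entries = []

    def create_dnat_pair(nat_ip, nbor_ip):
        '''
        Creates the DNAT entry and the matching DNAT-pool entry for one
        destination and records both for cleanup.

        Args:
            nat_ip (str): destination IP matched by the NAT entry key
            nbor_ip (str): dst_ip attribute passed to the create calls

        Return:
            sai_thrift_nat_entry_t: the DNAT (non-pool) entry
        '''
        nat_data = sai_thrift_nat_entry_data_t(
            key=sai_thrift_nat_entry_key_t(dst_ip=nat_ip),
            mask=sai_thrift_nat_entry_mask_t(dst_ip='255.255.255.255'))

        dnat_entry = sai_thrift_nat_entry_t(
            vr_id=self.default_vrf, data=nat_data,
            nat_type=SAI_NAT_TYPE_DESTINATION_NAT)
        sai_thrift_create_nat_entry(
            self.client, dnat_entry, dst_ip=nbor_ip,
            nat_type=SAI_NAT_TYPE_DESTINATION_NAT,
            enable_packet_count=True)
        created_nat_entries.append(dnat_entry)

        pool_entry = sai_thrift_nat_entry_t(
            vr_id=self.default_vrf, data=nat_data,
            nat_type=SAI_NAT_TYPE_DESTINATION_NAT_POOL)
        sai_thrift_create_nat_entry(
            self.client, pool_entry, dst_ip=nbor_ip,
            nat_type=SAI_NAT_TYPE_DESTINATION_NAT_POOL)
        created_nat_entries.append(pool_entry)

        return dnat_entry

    def verify_translation(dst_rif):
        '''
        Additional helper function for translation verification.
        Verifies that translation does not occur when the ACL is
        configured to disable it.

        Args:
            dst_rif (oid): object ID of destination RIF
        '''
        # Baseline taken once; it is advanced by hand after every packet
        # that is expected to hit the ACL.
        acl_counter = sai_thrift_get_acl_counter_attribute(
            self.client, self.egr_acl_counter, packets=True)['packets']

        src_ip = "20.20.20.1"

        # Defaults describe the regular L3 port egress; the LAG and SVI
        # cases override them below.
        verify_fn = verify_packet
        dst_port_dev = self.egr_port_dev
        dst_ip = self.nat_ip_to_port
        dst_mac = self.port_nbor_mac
        nat_dst_ip = self.port_nbor_ip
        dnat = port_dnat

        if dst_rif == self.egr_lag_rif:
            verify_fn = verify_packet_any_port
            dst_port_dev = self.egr_lag_dev
            dst_ip = self.nat_ip_to_lag
            dst_mac = self.lag_nbor_mac
            nat_dst_ip = self.lag_nbor_ip
            dnat = lag_dnat
        elif dst_rif == self.egr_svi_rif:
            verify_fn = verify_packets
            dst_port_dev = self.egr_svi_dev
            dst_ip = self.nat_ip_to_svi
            dst_mac = self.svi_nbor_mac
            nat_dst_ip = self.svi_nbor_ip
            dnat = svi_dnat

        # Per protocol: packet sent, packet expected when translated,
        # packet expected when the ACL suppresses translation.
        tcp_pkt = simple_tcp_packet(eth_dst=ROUTER_MAC,
                                    ip_src=src_ip,
                                    ip_dst=dst_ip,
                                    ip_ttl=64,
                                    pktlen=100,
                                    with_tcp_chksum=True)
        nat_tcp_pkt = simple_tcp_packet(eth_dst=dst_mac,
                                        eth_src=ROUTER_MAC,
                                        ip_src=src_ip,
                                        ip_dst=nat_dst_ip,
                                        ip_ttl=63,
                                        pktlen=100,
                                        with_tcp_chksum=True)
        no_nat_tcp_pkt = simple_tcp_packet(eth_dst=self.no_nat_nbor_mac,
                                           eth_src=ROUTER_MAC,
                                           ip_src=src_ip,
                                           ip_dst=dst_ip,
                                           ip_ttl=63,
                                           pktlen=100,
                                           with_tcp_chksum=True)

        udp_pkt = simple_udp_packet(eth_dst=ROUTER_MAC,
                                    ip_src=src_ip,
                                    ip_dst=dst_ip,
                                    ip_ttl=64,
                                    pktlen=100)
        nat_udp_pkt = simple_udp_packet(eth_dst=dst_mac,
                                        eth_src=ROUTER_MAC,
                                        ip_src=src_ip,
                                        ip_dst=nat_dst_ip,
                                        ip_ttl=63,
                                        pktlen=100)
        no_nat_udp_pkt = simple_udp_packet(eth_dst=self.no_nat_nbor_mac,
                                           eth_src=ROUTER_MAC,
                                           ip_src=src_ip,
                                           ip_dst=dst_ip,
                                           ip_ttl=63,
                                           pktlen=100)

        flows = (
            ("TCP", tcp_pkt, nat_tcp_pkt, no_nat_tcp_pkt),
            ("UDP", udp_pkt, nat_udp_pkt, no_nat_udp_pkt),
        )

        # (section header, label used in messages, ingress RIF, ingress ports)
        # NOTE(review): the listing this was rebuilt from had lost its
        # indentation; the per-port body is assumed to run through the
        # re-enable step - confirm against the original file.
        ingress_cases = (
            (" Inress L3 port", "L3 Port",
             self.ingr_port_rif, [self.ingr_port_dev]),
            (" Inress L3 LAG", "L3 LAG",
             self.ingr_lag_rif, self.ingr_lag_dev),
            (" Inress SVI", "SVI",
             self.ingr_svi_rif, self.ingr_svi_dev),
        )

        for header, label, src_rif, src_ports in ingress_cases:
            print(header)
            for src_port in src_ports:
                # Positive control: with no ACL bound the packet must be
                # translated and the NAT entry check must pass.
                for proto, pkt, nat_pkt, _ in flows:
                    print("Sending %s packet with NAT enabled on %s"
                          % (proto, label))
                    send_packet(self, src_port, pkt)
                    verify_fn(self, nat_pkt, dst_port_dev)
                    self.assertTrue(self._verifyNatHit(dnat))
                    print("\tOK")

                print("Disabling NAT on src RIF")
                # NOTE(review): the table is named egr_acl_table but is
                # bound as ingress_acl; its definition is outside this
                # listing - confirm the intended stage.
                sai_thrift_set_router_interface_attribute(
                    self.client,
                    src_rif,
                    ingress_acl=self.egr_acl_table)

                # NOTE(review): only the arrival of the untranslated packet
                # on no_nat_eport and the ACL counter are checked. Nothing
                # here asserts that no translated copy left dst_port_dev or
                # that the NAT entry's hit count stayed unchanged; a
                # negative test for a bypass control is stronger with one.
                for proto, pkt, _, no_nat_pkt in flows:
                    print("Sending %s packet with NAT disabled by ACL on %s"
                          % (proto, label))
                    send_packet(self, src_port, pkt)
                    verify_packet(self, no_nat_pkt, self.no_nat_eport)
                    self.assertTrue(self._verifyAclCounter(
                        self.egr_acl_counter, acl_counter))
                    acl_counter += 1
                    print("\tOK")

                print("Enabling NAT on src RIF")
                # ingress_acl=0 unbinds the table (presumably the null
                # object ID - TODO confirm).
                sai_thrift_set_router_interface_attribute(
                    self.client, src_rif, ingress_acl=0)

    try:
        # NAT configuration
        port_dnat = create_dnat_pair(self.nat_ip_to_port, self.port_nbor_ip)
        lag_dnat = create_dnat_pair(self.nat_ip_to_lag, self.lag_nbor_ip)
        svi_dnat = create_dnat_pair(self.nat_ip_to_svi, self.svi_nbor_ip)

        # Ingress RIFs go to NAT zone 1, egress RIFs to zone 0.
        rif_pairs = (
            (self.ingr_port_rif, self.egr_port_rif),
            (self.ingr_lag_rif, self.egr_lag_rif),
            (self.ingr_svi_rif, self.egr_svi_rif),
        )
        for ingr_rif, egr_rif in rif_pairs:
            sai_thrift_set_router_interface_attribute(
                self.client, ingr_rif, nat_zone_id=1)
            sai_thrift_set_router_interface_attribute(
                self.client, egr_rif, nat_zone_id=0)

        print("\n***Egress L3 port <-***")
        verify_translation(self.egr_port_rif)

        print("\n***Egress L3 LAG <-***")
        verify_translation(self.egr_lag_rif)

        print("\n***Egress SVI <-***")
        verify_translation(self.egr_svi_rif)

    finally:
        # Unbind the ACL first so a failed assertion cannot leave the
        # translation-disable rule attached for later tests.
        for rif in (self.ingr_svi_rif,
                    self.ingr_lag_rif,
                    self.ingr_port_rif):
            sai_thrift_set_router_interface_attribute(
                self.client, rif, ingress_acl=0)

        for rif in (self.ingr_svi_rif, self.egr_svi_rif,
                    self.ingr_lag_rif, self.egr_lag_rif,
                    self.ingr_port_rif, self.egr_port_rif):
            sai_thrift_set_router_interface_attribute(
                self.client, rif, nat_zone_id=0)

        # Remove only what was created, newest first.
        for nat_entry in reversed(created_nat_entries):
            sai_thrift_remove_nat_entry(self.client, nat_entry)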