# ptf/sainat.py (1,240 lines of code) (raw):
# Copyright 2021-present Intel Corporation.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
'''
Thrift SAI interface NAT tests
'''
from sai_thrift.sai_headers import *
from sai_base_test import *
@group("draft")
class NatTranslationTest(SaiHelper):
'''
NAT translation tests including ACL rules coexistence.
'''
def setUp(self):
    '''
    Build the topology shared by the NAT/ACL test cases.

    On top of the base SaiHelper setup this creates:
      * a nexthop, neighbor and /32 route behind the regular L3 port RIF
        (port11), the L3 LAG RIF (lag4) and the egress SVI (VLAN 200),
      * VLAN 100 (port24, port25) and VLAN 200 (port26, port27) with
        untagged members and one SVI RIF per VLAN,
      * two ingress-stage ACL tables carrying the NO_NAT action: one
        matching the neighbor IPs, one matching the NAT-ed IPs,
      * a 30.30.30.0/24 route through port12 used when NAT is disabled.

    Every created object is recorded in the matching self.*_list so
    that tearDown() can remove it.
    '''
    super(NatTranslationTest, self).setUp()

    # object lists
    for list_name in ("route_list", "nbor_list", "nhop_list",
                      "acl_entry_list", "acl_cntr_list", "acl_table_list",
                      "rif_list", "vlan_mbr_list", "vlan_list", "bp_list"):
        setattr(self, list_name, [])

    def install_neighbor_path(nbor_ip, nbor_mac, rif, prefix_suffix):
        '''
        Create nexthop, neighbor and route (in that order) for nbor_ip
        reachable through rif and register them for cleanup.

        Return:
            tuple: (nexthop oid, neighbor entry, route entry)
        '''
        nhop = sai_thrift_create_next_hop(
            self.client,
            ip=sai_ipaddress(nbor_ip),
            router_interface_id=rif,
            type=SAI_NEXT_HOP_TYPE_IP)
        self.nhop_list.append(nhop)

        nbor = sai_thrift_neighbor_entry_t(
            rif_id=rif,
            ip_address=sai_ipaddress(nbor_ip))
        sai_thrift_create_neighbor_entry(self.client,
                                         nbor,
                                         dst_mac_address=nbor_mac,
                                         no_host_route=True)
        self.nbor_list.append(nbor)

        route = sai_thrift_route_entry_t(
            vr_id=self.default_vrf,
            destination=sai_ipprefix(nbor_ip + prefix_suffix))
        sai_thrift_create_route_entry(
            self.client, route, next_hop_id=nhop)
        self.route_list.append(route)
        return nhop, nbor, route

    def build_untagged_vlan(vlan_id, ports):
        '''
        Create bridge ports for the given ports, the VLAN, its untagged
        members and finally set the port VLAN ID on every port.

        Return:
            tuple: (bridge port oids, vlan oid, vlan member oids)
        '''
        bridge_ports = []
        for port in ports:
            bridge_port = sai_thrift_create_bridge_port(
                self.client,
                bridge_id=self.default_1q_bridge,
                port_id=port,
                type=SAI_BRIDGE_PORT_TYPE_PORT,
                admin_state=True)
            self.bp_list.append(bridge_port)
            bridge_ports.append(bridge_port)

        vlan = sai_thrift_create_vlan(self.client, vlan_id=vlan_id)
        self.vlan_list.append(vlan)

        members = []
        for bridge_port in bridge_ports:
            member = sai_thrift_create_vlan_member(
                self.client,
                vlan_id=vlan,
                bridge_port_id=bridge_port,
                vlan_tagging_mode=SAI_VLAN_TAGGING_MODE_UNTAGGED)
            self.vlan_mbr_list.append(member)
            members.append(member)

        for port in ports:
            sai_thrift_set_port_attribute(
                self.client, port, port_vlan_id=vlan_id)
        return bridge_ports, vlan, members

    # regular L3 ports configuration
    self.ingr_port_rif, self.egr_port_rif = self.port10_rif, self.port11_rif
    self.ingr_port, self.egr_port = self.port10, self.port11
    self.ingr_port_dev, self.egr_port_dev = self.dev_port10, self.dev_port11
    self.port_nbor_ip = "10.10.10.1"
    self.port_nbor_mac = "00:11:11:11:11:11"
    self.nat_ip_to_port = "30.30.30.10"

    (self.nat_port_nhop,
     self.nat_port_nbor,
     self.nat_port_route) = install_neighbor_path(
         self.port_nbor_ip, self.port_nbor_mac, self.egr_port_rif, '/32')

    # L3 LAGs configuration
    self.ingr_lag_rif, self.egr_lag_rif = self.lag3_rif, self.lag4_rif
    self.ingr_lag = [self.port14, self.port15, self.port16]
    self.egr_lag = [self.port17, self.port18, self.port19]
    self.ingr_lag_dev = [self.dev_port14, self.dev_port15, self.dev_port16]
    self.egr_lag_dev = [self.dev_port17, self.dev_port18, self.dev_port19]
    self.lag_nbor_ip = "10.10.20.1"
    self.lag_nbor_mac = "00:22:22:22:22:22"
    self.nat_ip_to_lag = "30.30.30.20"

    (self.nat_lag_nhop,
     self.nat_lag_nbor,
     self.nat_lag_route) = install_neighbor_path(
         self.lag_nbor_ip, self.lag_nbor_mac, self.egr_lag_rif, '/32')

    # SVI configuration
    bridge_ports, self.vlan100, members = build_untagged_vlan(
        100, (self.port24, self.port25))
    self.port24_bp, self.port25_bp = bridge_ports
    self.vlan100_member0, self.vlan100_member1 = members

    bridge_ports, self.vlan200, members = build_untagged_vlan(
        200, (self.port26, self.port27))
    self.port26_bp, self.port27_bp = bridge_ports
    self.vlan200_member0, self.vlan200_member1 = members

    # one SVI RIF per VLAN: VLAN 100 is the ingress side, VLAN 200 egress
    svi_rifs = []
    for vlan in (self.vlan100, self.vlan200):
        svi_rif = sai_thrift_create_router_interface(
            self.client,
            type=SAI_ROUTER_INTERFACE_TYPE_VLAN,
            virtual_router_id=self.default_vrf,
            vlan_id=vlan)
        self.rif_list.append(svi_rif)
        svi_rifs.append(svi_rif)
    self.ingr_svi_rif, self.egr_svi_rif = svi_rifs

    self.ingr_svi = [self.port24, self.port25]
    self.egr_svi = [self.port26, self.port27]
    self.ingr_svi_dev = [self.dev_port24, self.dev_port25]
    self.egr_svi_dev = [self.dev_port26, self.dev_port27]
    self.svi_nbor_ip = "10.10.30.1"
    self.svi_nbor_mac = "00:33:33:33:33:33"
    self.nat_ip_to_svi = "30.30.30.30"

    (self.nat_svi_nhop,
     self.nat_svi_nbor,
     self.nat_svi_route) = install_neighbor_path(
         self.svi_nbor_ip, self.svi_nbor_mac, self.egr_svi_rif, '/32')

    # no-NAT ACL configuration
    action_types = [SAI_ACL_ACTION_TYPE_NO_NAT]
    action_types_list = sai_thrift_s32_list_t(count=len(action_types),
                                              int32list=action_types)
    acl_action = sai_thrift_acl_action_data_t(
        enable=True,
        parameter=sai_thrift_acl_action_parameter_t(booldata=True))
    bind_points = [SAI_ACL_BIND_POINT_TYPE_ROUTER_INTERFACE]
    bind_points_list = sai_thrift_s32_list_t(count=len(bind_points),
                                             int32list=bind_points)

    def build_no_nat_acl(dst_ips):
        '''
        Create an ingress-stage ACL table with a packet counter and one
        NO_NAT entry per exact-match destination IP in dst_ips.

        Return:
            tuple: (table oid, counter oid, list of entry oids)
        '''
        table = sai_thrift_create_acl_table(
            self.client,
            acl_stage=SAI_ACL_STAGE_INGRESS,
            acl_bind_point_type_list=bind_points_list,
            acl_action_type_list=action_types_list,
            field_dst_ip=True)
        self.acl_table_list.append(table)

        counter = sai_thrift_create_acl_counter(
            self.client,
            table,
            enable_packet_count=True)
        self.acl_cntr_list.append(counter)

        count_action = sai_thrift_acl_action_data_t(
            enable=True,
            parameter=sai_thrift_acl_action_parameter_t(oid=counter))

        entries = []
        for dst_ip in dst_ips:
            dst_match = sai_thrift_acl_field_data_t(
                data=sai_thrift_acl_field_data_data_t(ip4=dst_ip),
                mask=sai_thrift_acl_field_data_mask_t(
                    ip4="255.255.255.255"))
            entry = sai_thrift_create_acl_entry(
                self.client,
                table_id=table,
                action_no_nat=acl_action,
                action_counter=count_action,
                field_dst_ip=dst_match)
            self.acl_entry_list.append(entry)
            entries.append(entry)
        return table, counter, entries

    # table matching the real neighbor addresses
    (self.ingr_acl_table,
     self.ingr_acl_counter,
     neighbor_entries) = build_no_nat_acl(
         (self.port_nbor_ip, self.lag_nbor_ip, self.svi_nbor_ip))
    (self.ingr_acl_port_entry,
     self.ingr_acl_lag_entry,
     self.ingr_acl_svi_entry) = neighbor_entries

    # table matching the NAT-ed addresses; it is also created with the
    # ingress stage, exactly like the first one
    (self.egr_acl_table,
     self.egr_acl_counter,
     nat_ip_entries) = build_no_nat_acl(
         (self.nat_ip_to_port, self.nat_ip_to_lag, self.nat_ip_to_svi))
    (self.egr_acl_port_entry,
     self.egr_acl_lag_entry,
     self.egr_acl_svi_entry) = nat_ip_entries

    # no-NAT route configuration
    no_nat_rif = self.port12_rif
    self.no_nat_eport = self.dev_port12
    no_nat_ip = "30.30.30.0"
    self.no_nat_nbor_mac = "00:33:33:33:33:33"

    (self.no_nat_nhop,
     self.no_nat_nbor,
     self.no_nat_route) = install_neighbor_path(
         no_nat_ip, self.no_nat_nbor_mac, no_nat_rif, '/24')
def runTest(self):
    '''
    Run the source NAT case first, then the destination NAT case.
    '''
    self.srcNatAclTranslationDisableTest()
    return self.dstNatAclTranslationDisableTest()
def tearDown(self):
    '''
    Remove everything setUp() created and run the base class teardown.

    Removal order: routes, neighbors, nexthops, ACL entries, ACL
    counters, ACL tables, SVI RIFs; then the port VLAN ID of ports 24-27
    is set to 0 before VLAN members, VLANs and bridge ports are removed.
    '''
    client = self.client

    l3_and_acl_cleanup = (
        (self.route_list, sai_thrift_remove_route_entry),
        (self.nbor_list, sai_thrift_remove_neighbor_entry),
        (self.nhop_list, sai_thrift_remove_next_hop),
        (self.acl_entry_list, sai_thrift_remove_acl_entry),
        (self.acl_cntr_list, sai_thrift_remove_acl_counter),
        (self.acl_table_list, sai_thrift_remove_acl_table),
        (self.rif_list, sai_thrift_remove_router_interface),
    )
    for objects, remove in l3_and_acl_cleanup:
        for obj in objects:
            remove(client, obj)

    # set the port VLAN ID of the SVI member ports to 0 before the VLAN
    # members are removed
    for port in (self.port24, self.port25, self.port26, self.port27):
        sai_thrift_set_port_attribute(client, port, port_vlan_id=0)

    l2_cleanup = (
        (self.vlan_mbr_list, sai_thrift_remove_vlan_member),
        (self.vlan_list, sai_thrift_remove_vlan),
        (self.bp_list, sai_thrift_remove_bridge_port),
    )
    for objects, remove in l2_cleanup:
        for obj in objects:
            remove(client, obj)

    super(NatTranslationTest, self).tearDown()
def _verifyAclCounter(self, acl_counter, prev_value):
    '''
    Check that an ACL packet counter advanced by exactly one.

    Args:
        acl_counter (oid): object ID of ACL counter to be read
        prev_value (int): previous counter value

    Return:
        bool: True if the counter equals prev_value + 1; False otherwise
    '''
    attrs = sai_thrift_get_acl_counter_attribute(
        self.client, acl_counter, packets=True)
    incremented = attrs['packets'] == prev_value + 1
    if incremented:
        print("ACL hit")
    return incremented
def _verifyNatHit(self, nat_entry):
    '''
    Check that a NAT entry was hit exactly once and reset its counter.

    Args:
        nat_entry (oid): object ID of NAT entry for which counter is
                         to be read

    Return:
        bool: True if the packet counter was 1 (it is then set back
              to 0); False otherwise (counter left untouched)
    '''
    attrs = sai_thrift_get_nat_entry_attribute(
        self.client, nat_entry, packet_count=True)

    if attrs['packet_count'] == 1:
        print("NAT hit")
        # clear the counter so the next packet can be checked against 1
        sai_thrift_set_nat_entry_attribute(
            self.client, nat_entry, packet_count=0)
        return True

    return False
def srcNatAclTranslationDisableTest(self):
    '''
    Verifies if translation doesn't occur when source NAT entry exists
    but ACL is configured to disable NAT translation.
    Test is performed for different RIFs (regular L3 port, L3 LAG and SVI)
    and different nexthops (regular L3 port, L3 LAG and SVI)
    with two kinds of packets - TCP and UDP.
    '''
    print("\nsrcNatAclTranslationDisableTest()")

    src_ip = "20.20.20.1"
    nat_src_ip = "150.10.10.10"

    # (banner, neighbor IP, neighbor MAC, PTF verify function, egress
    # device port(s)) for every egress type exercised per ingress port
    egress_targets = (
        (" -> Egress L3 port", self.port_nbor_ip, self.port_nbor_mac,
         verify_packet, self.egr_port_dev),
        (" -> Egress L3 LAG", self.lag_nbor_ip, self.lag_nbor_mac,
         verify_packet_any_port, self.egr_lag_dev),
        (" -> Egress SVI", self.svi_nbor_ip, self.svi_nbor_mac,
         verify_packets, self.egr_svi_dev),
    )

    # NAT entries whose create call returned; cleanup removes only these,
    # so an early failure cannot trigger an unbound-name error in the
    # finally block and hide the real exception
    nat_entries = []

    def verify_translation(src_rif, src_port_dev):
        '''
        Additional helper function for translation verification.
        Verifies if translation doesn't occur when ACL is configured
        to disable it.

        Args:
            src_rif (oid): object ID of source RIF
            src_port_dev (int): source device port number
        '''
        acl_counter = sai_thrift_get_acl_counter_attribute(
            self.client, self.ingr_acl_counter, packets=True)['packets']

        for banner, nbor_ip, nbor_mac, verify_fn, egr_ports in \
                egress_targets:
            print(banner)

            # header fields common to all packets expected on egress
            routed = dict(eth_dst=nbor_mac,
                          eth_src=ROUTER_MAC,
                          ip_dst=nbor_ip,
                          ip_ttl=63,
                          pktlen=100)

            tcp_pkt = simple_tcp_packet(eth_dst=ROUTER_MAC,
                                        ip_src=src_ip,
                                        ip_dst=nbor_ip,
                                        ip_ttl=64,
                                        pktlen=100,
                                        with_tcp_chksum=True)
            nat_tcp_pkt = simple_tcp_packet(ip_src=nat_src_ip,
                                            with_tcp_chksum=True,
                                            **routed)
            no_nat_tcp_pkt = simple_tcp_packet(ip_src=src_ip,
                                               with_tcp_chksum=True,
                                               **routed)
            udp_pkt = simple_udp_packet(eth_dst=ROUTER_MAC,
                                        ip_src=src_ip,
                                        ip_dst=nbor_ip,
                                        ip_ttl=64,
                                        pktlen=100)
            nat_udp_pkt = simple_udp_packet(ip_src=nat_src_ip, **routed)
            no_nat_udp_pkt = simple_udp_packet(ip_src=src_ip, **routed)

            # NAT active: source address must be rewritten, entry hit
            for proto, pkt, exp_pkt in (("TCP", tcp_pkt, nat_tcp_pkt),
                                        ("UDP", udp_pkt, nat_udp_pkt)):
                print("Sending %s packet with NAT enabled" % proto)
                send_packet(self, src_port_dev, pkt)
                verify_fn(self, exp_pkt, egr_ports)
                self.assertTrue(self._verifyNatHit(snat))
                print("\tOK")

            print("Disabling NAT on src RIF")
            sai_thrift_set_router_interface_attribute(
                self.client, src_rif, ingress_acl=self.ingr_acl_table)

            # ACL bound: packet is routed untranslated, ACL counter +1
            for proto, pkt, exp_pkt in (("TCP", tcp_pkt, no_nat_tcp_pkt),
                                        ("UDP", udp_pkt, no_nat_udp_pkt)):
                print("Sending %s packet with NAT disabled by ACL" % proto)
                send_packet(self, src_port_dev, pkt)
                verify_fn(self, exp_pkt, egr_ports)
                self.assertTrue(self._verifyAclCounter(
                    self.ingr_acl_counter, acl_counter))
                acl_counter += 1
                print("\tOK")

            print("Enabling NAT on src RIF")
            sai_thrift_set_router_interface_attribute(
                self.client, src_rif, ingress_acl=0)

    try:
        nat_data = sai_thrift_nat_entry_data_t(
            key=sai_thrift_nat_entry_key_t(
                src_ip=src_ip),
            mask=sai_thrift_nat_entry_mask_t(
                src_ip='255.255.255.255'))
        snat = sai_thrift_nat_entry_t(vr_id=self.default_vrf,
                                      data=nat_data,
                                      nat_type=SAI_NAT_TYPE_SOURCE_NAT)
        sai_thrift_create_nat_entry(self.client,
                                    snat,
                                    src_ip=nat_src_ip,
                                    nat_type=SAI_NAT_TYPE_SOURCE_NAT,
                                    enable_packet_count=True)
        nat_entries.append(snat)

        # ingress RIFs in zone 0, egress RIFs in zone 1
        for rif, zone in ((self.ingr_port_rif, 0),
                          (self.egr_port_rif, 1),
                          (self.ingr_lag_rif, 0),
                          (self.egr_lag_rif, 1),
                          (self.ingr_svi_rif, 0),
                          (self.egr_svi_rif, 1)):
            sai_thrift_set_router_interface_attribute(
                self.client, rif, nat_zone_id=zone)

        print("\n***Ingress L3 port***")
        verify_translation(self.ingr_port_rif, self.ingr_port_dev)

        print("\n***Ingress L3 LAG***")
        for lag_port in self.ingr_lag_dev:
            verify_translation(self.ingr_lag_rif, lag_port)

        print("\n***Ingress SVI***")
        for svi_port in self.ingr_svi_dev:
            verify_translation(self.ingr_svi_rif, svi_port)

    finally:
        # unbind the ACL in case a failure left it attached
        for rif in (self.ingr_svi_rif,
                    self.ingr_lag_rif,
                    self.ingr_port_rif):
            sai_thrift_set_router_interface_attribute(
                self.client, rif, ingress_acl=0)

        for rif in (self.ingr_svi_rif, self.egr_svi_rif,
                    self.ingr_lag_rif, self.egr_lag_rif,
                    self.ingr_port_rif, self.egr_port_rif):
            sai_thrift_set_router_interface_attribute(
                self.client, rif, nat_zone_id=0)

        for nat_entry in reversed(nat_entries):
            sai_thrift_remove_nat_entry(self.client, nat_entry)
def dstNatAclTranslationDisableTest(self):
    '''
    Verifies if translation doesn't occur when destination NAT entries
    exist but ACL is configured to disable NAT translation.
    Test is performed for different RIFs (regular L3 port, L3 LAG and SVI)
    and different nexthops (regular L3 port, L3 LAG and SVI)
    with two kinds of packets - TCP and UDP.
    '''
    print("\ndstNatAclTranslationDisableTest()")

    # NAT entries whose create call returned, in creation order; cleanup
    # removes only these (in reverse), so an early failure cannot trigger
    # an unbound-name error in the finally block and hide the real one
    nat_entries = []

    # (banner, label used in the messages, ingress RIF, ingress device
    # ports); printed strings are kept exactly as they were
    ingress_cases = (
        (" Inress L3 port", "L3 Port",
         self.ingr_port_rif, [self.ingr_port_dev]),
        (" Inress L3 LAG", "L3 LAG",
         self.ingr_lag_rif, self.ingr_lag_dev),
        (" Inress SVI", "SVI",
         self.ingr_svi_rif, self.ingr_svi_dev),
    )

    def add_dnat(nat_ip, nbor_ip):
        '''
        Create a destination NAT entry (with packet counting) and the
        matching destination NAT pool entry translating nat_ip to
        nbor_ip.

        Return:
            sai_thrift_nat_entry_t: the destination NAT entry
        '''
        nat_data = sai_thrift_nat_entry_data_t(
            key=sai_thrift_nat_entry_key_t(
                dst_ip=nat_ip),
            mask=sai_thrift_nat_entry_mask_t(
                dst_ip='255.255.255.255'))

        dnat_entry = sai_thrift_nat_entry_t(
            vr_id=self.default_vrf, data=nat_data,
            nat_type=SAI_NAT_TYPE_DESTINATION_NAT)
        sai_thrift_create_nat_entry(
            self.client, dnat_entry, dst_ip=nbor_ip,
            nat_type=SAI_NAT_TYPE_DESTINATION_NAT,
            enable_packet_count=True)
        nat_entries.append(dnat_entry)

        dnat_pool_entry = sai_thrift_nat_entry_t(
            vr_id=self.default_vrf, data=nat_data,
            nat_type=SAI_NAT_TYPE_DESTINATION_NAT_POOL)
        sai_thrift_create_nat_entry(
            self.client, dnat_pool_entry, dst_ip=nbor_ip,
            nat_type=SAI_NAT_TYPE_DESTINATION_NAT_POOL)
        nat_entries.append(dnat_pool_entry)

        return dnat_entry

    def verify_translation(dst_rif):
        '''
        Additional helper function for translation verification.
        Verifies if translation doesn't occur when ACL is configured
        to disable it.

        Args:
            dst_rif (oid): object ID of destination RIF
        '''
        acl_counter = sai_thrift_get_acl_counter_attribute(
            self.client, self.egr_acl_counter, packets=True)['packets']

        src_ip = "20.20.20.1"

        # pick the expectations for the egress type; anything that is not
        # the LAG or SVI RIF is treated as the regular L3 port
        if dst_rif == self.egr_lag_rif:
            verify_fn = verify_packet_any_port
            dst_port_dev = self.egr_lag_dev
            dst_ip = self.nat_ip_to_lag
            dst_mac = self.lag_nbor_mac
            nat_dst_ip = self.lag_nbor_ip
            dnat = lag_dnat
        elif dst_rif == self.egr_svi_rif:
            verify_fn = verify_packets
            dst_port_dev = self.egr_svi_dev
            dst_ip = self.nat_ip_to_svi
            dst_mac = self.svi_nbor_mac
            nat_dst_ip = self.svi_nbor_ip
            dnat = svi_dnat
        else:
            verify_fn = verify_packet
            dst_port_dev = self.egr_port_dev
            dst_ip = self.nat_ip_to_port
            dst_mac = self.port_nbor_mac
            nat_dst_ip = self.port_nbor_ip
            dnat = port_dnat

        # translated packets leave towards the real neighbor ...
        translated = dict(eth_dst=dst_mac,
                          eth_src=ROUTER_MAC,
                          ip_src=src_ip,
                          ip_dst=nat_dst_ip,
                          ip_ttl=63,
                          pktlen=100)
        # ... untranslated ones follow the no-NAT /24 route instead
        untranslated = dict(eth_dst=self.no_nat_nbor_mac,
                            eth_src=ROUTER_MAC,
                            ip_src=src_ip,
                            ip_dst=dst_ip,
                            ip_ttl=63,
                            pktlen=100)

        tcp_pkt = simple_tcp_packet(eth_dst=ROUTER_MAC,
                                    ip_src=src_ip,
                                    ip_dst=dst_ip,
                                    ip_ttl=64,
                                    pktlen=100,
                                    with_tcp_chksum=True)
        nat_tcp_pkt = simple_tcp_packet(with_tcp_chksum=True,
                                        **translated)
        no_nat_tcp_pkt = simple_tcp_packet(with_tcp_chksum=True,
                                           **untranslated)
        udp_pkt = simple_udp_packet(eth_dst=ROUTER_MAC,
                                    ip_src=src_ip,
                                    ip_dst=dst_ip,
                                    ip_ttl=64,
                                    pktlen=100)
        nat_udp_pkt = simple_udp_packet(**translated)
        no_nat_udp_pkt = simple_udp_packet(**untranslated)

        for banner, label, src_rif, src_ports in ingress_cases:
            print(banner)
            for src_port in src_ports:
                # NAT active: destination rewritten, NAT entry hit
                for proto, pkt, exp_pkt in (
                        ("TCP", tcp_pkt, nat_tcp_pkt),
                        ("UDP", udp_pkt, nat_udp_pkt)):
                    print("Sending %s packet with NAT enabled on %s"
                          % (proto, label))
                    send_packet(self, src_port, pkt)
                    verify_fn(self, exp_pkt, dst_port_dev)
                    self.assertTrue(self._verifyNatHit(dnat))
                    print("\tOK")

                print("Disabling NAT on src RIF")
                sai_thrift_set_router_interface_attribute(
                    self.client,
                    src_rif,
                    ingress_acl=self.egr_acl_table)

                # ACL bound: packet leaves untranslated on the no-NAT
                # port and the ACL counter advances by one
                for proto, pkt, exp_pkt in (
                        ("TCP", tcp_pkt, no_nat_tcp_pkt),
                        ("UDP", udp_pkt, no_nat_udp_pkt)):
                    print("Sending %s packet with NAT disabled by ACL "
                          "on %s" % (proto, label))
                    send_packet(self, src_port, pkt)
                    verify_packet(self, exp_pkt, self.no_nat_eport)
                    self.assertTrue(self._verifyAclCounter(
                        self.egr_acl_counter, acl_counter))
                    acl_counter += 1
                    print("\tOK")

                print("Enabling NAT on src RIF")
                sai_thrift_set_router_interface_attribute(
                    self.client, src_rif, ingress_acl=0)

    try:
        # NAT configuration
        port_dnat = add_dnat(self.nat_ip_to_port, self.port_nbor_ip)
        lag_dnat = add_dnat(self.nat_ip_to_lag, self.lag_nbor_ip)
        svi_dnat = add_dnat(self.nat_ip_to_svi, self.svi_nbor_ip)

        # ingress RIFs in zone 1, egress RIFs in zone 0
        for rif, zone in ((self.ingr_port_rif, 1),
                          (self.egr_port_rif, 0),
                          (self.ingr_lag_rif, 1),
                          (self.egr_lag_rif, 0),
                          (self.ingr_svi_rif, 1),
                          (self.egr_svi_rif, 0)):
            sai_thrift_set_router_interface_attribute(
                self.client, rif, nat_zone_id=zone)

        print("\n***Egress L3 port <-***")
        verify_translation(self.egr_port_rif)

        print("\n***Egress L3 LAG <-***")
        verify_translation(self.egr_lag_rif)

        print("\n***Egress SVI <-***")
        verify_translation(self.egr_svi_rif)

    finally:
        # unbind the ACL in case a failure left it attached
        for rif in (self.ingr_svi_rif,
                    self.ingr_lag_rif,
                    self.ingr_port_rif):
            sai_thrift_set_router_interface_attribute(
                self.client, rif, ingress_acl=0)

        for rif in (self.ingr_svi_rif, self.egr_svi_rif,
                    self.ingr_lag_rif, self.egr_lag_rif,
                    self.ingr_port_rif, self.egr_port_rif):
            sai_thrift_set_router_interface_attribute(
                self.client, rif, nat_zone_id=0)

        # reverse creation order: SVI pool, SVI, LAG pool, LAG, port
        # pool, port
        for nat_entry in reversed(nat_entries):
            sai_thrift_remove_nat_entry(self.client, nat_entry)
@group("draft")
class NatTest(SaiHelper):
'''
Basic NAT configuration test
Test topology:
+--------------DUT--------------+
| zone 0 zone 1 |
Nbr2 | server_port wan_port | Nbr1
(192.168.0.1) ---| (port24) (port25) |--- (20.20.20.1)
00:33:44:55:66:77 | | 00:11:22:33:44:55
| 192.168.0.1 <-> 200.200.200.1 |
+-------------------------------+
'''
def setUp(self):
    '''
    Builds the routed topology and the NAT entries used by the sub-tests.

    Routing: port24 becomes the server-side router interface (NAT zone 0)
    and port25 the WAN-side router interface (NAT zone 1). Each side gets
    a neighbor, a next hop and /24 route(s).

    NAT: three destination entries keyed on translate_server_ip and two
    source entries keyed on server_ip are created. They are appended to
    self.nat_list in creation order, and the sub-tests index into that
    list, so the order below must not change.
    '''
    super(NatTest, self).setUp()

    # Bookkeeping lists drained by tearDown()
    self.rif_list = []
    self.nhop_list = []
    self.nbor_list = []
    self.route_list = []
    self.nat_list = []

    # Addresses shared with the sub-tests
    self.nbor_mac = '00:11:22:33:44:55'
    self.dmac = '00:22:22:22:22:22'
    self.src_ip = '192.168.0.10'
    self.dst_ip = '10.10.10.1'
    self.nhop_ip = '20.20.20.1'
    self.server_ip = '192.168.0.1'
    self.server_dmac = '00:33:44:55:66:77'
    self.translate_server_ip = '200.200.200.1'
    prefix_len = '/24'

    def make_port_rif(port, zone):
        # Port-type RIF in the default VRF, tagged with a NAT zone.
        rif = sai_thrift_create_router_interface(
            client=self.client, virtual_router_id=self.default_vrf,
            type=SAI_ROUTER_INTERFACE_TYPE_PORT, port_id=port,
            nat_zone_id=zone)
        self.rif_list.append(rif)
        return rif

    def make_neighbor(rif, ip, mac):
        # Neighbor entry for `ip` behind `rif`, resolved to `mac`.
        print("Create nieghbor with %s ip address, %d router interface id and "
              "%s destination mac" % (ip, rif, mac))
        entry = sai_thrift_neighbor_entry_t(
            rif_id=rif,
            ip_address=sai_ipaddress(ip))
        sai_thrift_create_neighbor_entry(client=self.client,
                                         neighbor_entry=entry,
                                         dst_mac_address=mac)
        self.nbor_list.append(entry)

    def make_nhop(rif, ip):
        # IP next hop towards `ip` through `rif`.
        print("Create nhop with %s ip address and %d router interface id"
              % (ip, rif))
        nhop = sai_thrift_create_next_hop(
            self.client, ip=sai_ipaddress(ip),
            router_interface_id=rif,
            type=SAI_NEXT_HOP_TYPE_IP)
        self.nhop_list.append(nhop)
        return nhop

    def make_route(network_ip, target):
        # /24 route in the default VRF pointing at `target`.
        entry = sai_thrift_route_entry_t(
            vr_id=self.default_vrf,
            destination=sai_ipprefix(network_ip + prefix_len))
        sai_thrift_create_route_entry(client=self.client,
                                      route_entry=entry,
                                      next_hop_id=target)
        self.route_list.append(entry)

    def make_nat(kind, key_fields, **rewrite):
        # `key_fields` populate the entry key; `rewrite` holds the
        # attributes passed at creation (the values the sub-tests expect
        # to see in the translated packet).
        data = sai_thrift_nat_entry_data_t(
            key=sai_thrift_nat_entry_key_t(**key_fields))
        entry = sai_thrift_nat_entry_t(vr_id=self.default_vrf,
                                       data=data,
                                       nat_type=kind)
        sai_thrift_create_nat_entry(self.client,
                                    nat_entry=entry,
                                    nat_type=kind,
                                    **rewrite)
        self.nat_list.append(entry)

    # Route configuration
    server_port = self.port24
    wan_port = self.port25
    server_rif = make_port_rif(server_port, 0)
    wan_rif = make_port_rif(wan_port, 1)

    # WAN side: 10.10.10.0/24 --> next hop 20.20.20.1,
    #           20.20.20.0/24 --> WAN router interface
    make_neighbor(wan_rif, self.nhop_ip, self.nbor_mac)
    wan_nhop = make_nhop(wan_rif, self.nhop_ip)
    make_route(self.dst_ip, wan_nhop)
    make_route(self.nhop_ip, wan_rif)

    # Server side: 192.168.0.0/24 --> next hop 192.168.0.1
    make_neighbor(server_rif, self.server_ip, self.server_dmac)
    server_nhop = make_nhop(server_rif, self.server_ip)
    make_route(self.server_ip, server_nhop)

    # NAT configuration
    # Private server IP - self.server_ip
    # Public (translated) server IP - self.translate_server_ip
    self.l4_src_port = 100
    self.l4_dst_port = 1234
    proto = 6  # TCP; the sub-tests send simple_tcp_packet()
    self.translate_sport = 500
    self.translate_dport = 2000

    # nat_list[0]: DNAT keyed on public IP + proto + L4 dst port
    make_nat(SAI_NAT_TYPE_DESTINATION_NAT,
             dict(dst_ip=self.translate_server_ip,
                  proto=proto,
                  l4_dst_port=self.l4_dst_port),
             dst_ip=self.server_ip,
             l4_dst_port=self.translate_dport)
    # nat_list[1]: DNAT keyed on public IP only
    make_nat(SAI_NAT_TYPE_DESTINATION_NAT,
             dict(dst_ip=self.translate_server_ip),
             dst_ip=self.server_ip)
    # nat_list[2]: DNAT pool entry keyed on public IP only
    make_nat(SAI_NAT_TYPE_DESTINATION_NAT_POOL,
             dict(dst_ip=self.translate_server_ip),
             dst_ip=self.server_ip)
    # nat_list[3]: SNAT keyed on server IP + proto + L4 src port
    make_nat(SAI_NAT_TYPE_SOURCE_NAT,
             dict(src_ip=self.server_ip,
                  proto=proto,
                  l4_src_port=self.l4_src_port),
             src_ip=self.translate_server_ip,
             l4_src_port=self.translate_sport)
    # nat_list[4]: SNAT keyed on server IP only
    make_nat(SAI_NAT_TYPE_SOURCE_NAT,
             dict(src_ip=self.server_ip),
             src_ip=self.translate_server_ip)
def runTest(self):
    '''
    Runs the NAT sub-tests one after another in a fixed order:
    routing, trap, source NAT, destination NAT.
    '''
    stages = (
        self.natRouteTest,
        self.natTrapTest,
        self.srcNatTest,
        self.dstNatTest,
    )
    for stage in stages:
        stage()
def tearDown(self):
    '''
    Removes everything setUp() recorded, then runs the base tearDown.

    Removal order: routes, next hops, neighbors, router interfaces and
    finally the NAT entries.
    '''
    removal_plan = (
        # Routing
        (self.route_list, sai_thrift_remove_route_entry),
        (self.nhop_list, sai_thrift_remove_next_hop),
        (self.nbor_list, sai_thrift_remove_neighbor_entry),
        (self.rif_list, sai_thrift_remove_router_interface),
        # NAT
        (self.nat_list, sai_thrift_remove_nat_entry),
    )
    for tracked, remove in removal_plan:
        # Walk a snapshot of the bookkeeping list, as the original did.
        for obj in list(tracked):
            remove(self.client, obj)

    super(NatTest, self).tearDown()
def natTrapTest(self):
    '''
    Verifies trap configuration.

    Creates a host interface trap group on queue 4 and binds DNAT-miss and
    SNAT-miss traps (packet action TRAP) to it. One TCP packet from
    self.src_ip to self.dst_ip is then sent into dev_port24, and the
    SAI_QUEUE_STAT_PACKETS counter of self.cpu_queue4 must grow by exactly
    one.

    The traps and the trap group are removed again in all cases.
    '''
    print("\nnatTrapTest()")

    # Pre-bind the handles: if one of the create calls raises, the cleanup
    # in `finally` must not fail with an unbound-name error, which would
    # hide the real failure and skip the remaining removals.
    trap_group = None
    dnat_trap = None
    snat_trap = None

    try:
        trap_group = sai_thrift_create_hostif_trap_group(
            self.client, queue=4)

        dnat_trap = sai_thrift_create_hostif_trap(
            client=self.client,
            trap_type=SAI_HOSTIF_TRAP_TYPE_DNAT_MISS,
            packet_action=SAI_PACKET_ACTION_TRAP,
            trap_group=trap_group)

        snat_trap = sai_thrift_create_hostif_trap(
            client=self.client,
            trap_type=SAI_HOSTIF_TRAP_TYPE_SNAT_MISS,
            packet_action=SAI_PACKET_ACTION_TRAP,
            trap_group=trap_group)

        pkt = simple_tcp_packet(eth_dst=ROUTER_MAC,
                                eth_src=self.dmac,
                                ip_dst=self.dst_ip,
                                ip_src=self.src_ip,
                                ip_id=105,
                                ip_ttl=64)

        pre_stats = query_counter(
            self, sai_thrift_get_queue_stats, self.cpu_queue4)
        send_packet(self, self.dev_port24, pkt)
        # Wait before re-reading the queue counter.
        # NOTE(review): presumably gives the CPU queue statistics time to
        # update - confirm the required delay.
        time.sleep(4)
        post_stats = query_counter(
            self, sai_thrift_get_queue_stats, self.cpu_queue4)
        self.assertEqual(
            post_stats["SAI_QUEUE_STAT_PACKETS"],
            pre_stats["SAI_QUEUE_STAT_PACKETS"] + 1)

    finally:
        # Remove only what was actually created; both traps go before the
        # group they are bound to.
        if dnat_trap is not None:
            sai_thrift_remove_hostif_trap(self.client, dnat_trap)
        if snat_trap is not None:
            sai_thrift_remove_hostif_trap(self.client, snat_trap)
        if trap_group is not None:
            sai_thrift_remove_hostif_trap_group(self.client, trap_group)
def natRouteTest(self):
    '''
    Verifies routing configuration.

    Sends three TCP packets whose addresses are not expected to be
    rewritten and checks plain L3 forwarding (MAC rewrite, TTL 64 -> 63):
    two from dev_port24 towards dev_port25 and one in the opposite
    direction.
    '''
    print("\nnatRouteTest()")

    server_side = self.dev_port24
    wan_side = self.dev_port25

    # (ip_src, ip_dst, expected egress dst MAC, ingress port, egress port)
    flows = (
        (self.src_ip, self.dst_ip, self.nbor_mac,
         server_side, wan_side),
        (self.src_ip, self.nhop_ip, self.nbor_mac,
         server_side, wan_side),
        (self.nhop_ip, self.server_ip, self.server_dmac,
         wan_side, server_side),
    )

    # send the test packet(s)
    for src, dst, out_dmac, in_port, out_port in flows:
        l3_fields = dict(ip_dst=dst, ip_src=src, ip_id=105)
        tx_pkt = simple_tcp_packet(eth_dst=ROUTER_MAC,
                                   eth_src=self.dmac,
                                   ip_ttl=64,
                                   **l3_fields)
        rx_pkt = simple_tcp_packet(eth_dst=out_dmac,
                                   eth_src=ROUTER_MAC,
                                   ip_ttl=63,
                                   **l3_fields)
        send_packet(self, in_port, tx_pkt)
        verify_packets(self, rx_pkt, [out_port])
def srcNatTest(self):
    '''
    Verifies source NAT.

    Two packets travel from dev_port24 to dev_port25 with the server IP as
    source. The first leaves the TCP source port at the packet helper's
    default and must have only its source IP rewritten. The second uses
    self.l4_src_port and must have both source IP and source port
    rewritten. Afterwards the packet counter and hit bit of
    self.nat_list[3] (the fourth NAT entry appended in setUp) are checked,
    and the counter is reset to zero.
    '''
    print("\nsrcNatTest()")

    # Fields common to both ingress / both expected egress packets
    rx_side = dict(eth_dst=ROUTER_MAC,
                   eth_src=self.dmac,
                   ip_dst=self.dst_ip,
                   ip_src=self.server_ip,
                   ip_id=105,
                   ip_ttl=64)
    tx_side = dict(eth_dst=self.nbor_mac,
                   eth_src=ROUTER_MAC,
                   ip_dst=self.dst_ip,
                   ip_src=self.translate_server_ip,
                   ip_id=105,
                   ip_ttl=63)

    # Validate server to WAN
    # Translate server-ip to public ip
    ingress = simple_tcp_packet(**rx_side)
    expected = simple_tcp_packet(**tx_side)
    send_packet(self, self.dev_port24, ingress)
    verify_packets(self, expected, [self.dev_port25])

    # Translate server-ip + port to public ip + port
    ingress = simple_tcp_packet(tcp_sport=self.l4_src_port, **rx_side)
    expected = simple_tcp_packet(tcp_sport=self.translate_sport, **tx_side)
    send_packet(self, self.dev_port24, ingress)
    verify_packets(self, expected, [self.dev_port25])

    snat_entry = self.nat_list[3]

    def read_attr(**wanted):
        # Fetch the requested attribute(s) of the SNAT entry under test.
        return sai_thrift_get_nat_entry_attribute(
            client=self.client,
            nat_entry=snat_entry,
            **wanted)

    # Exactly one packet is expected to be counted on this entry.
    counters = read_attr(packet_count=True)
    self.assertEqual(counters["packet_count"], 1)
    print("Packet count %d" % (counters["packet_count"]))

    # The hit bit is expected set on the first read and clear on the next.
    first_read = read_attr(hit_bit=True)
    self.assertEqual(first_read["hit_bit"], True)
    print("1st query - Hit bit %r" % (first_read["hit_bit"]))

    second_read = read_attr(hit_bit=True)
    self.assertEqual(second_read["hit_bit"], False)
    print("2nd query - Hit bit %r" % (second_read["hit_bit"]))

    # Writing 0 to packet_count must reset the counter.
    sai_thrift_set_nat_entry_attribute(
        client=self.client,
        nat_entry=snat_entry,
        packet_count=0)
    counters = read_attr(packet_count=True)
    self.assertEqual(counters["packet_count"], 0)
    print("Packet count %d" % (counters["packet_count"]))
def dstNatTest(self):
'''
Verifies destination NAT

Sends two TCP packets from dev_port25 addressed to
self.translate_server_ip and expects them on dev_port24 with the
destination IP rewritten to self.server_ip (and, for the second packet,
the TCP destination port rewritten to self.translate_dport). Then reads
the packet counter of self.nat_list[0].
'''
print("\ndstNatTest()")
# Translate public IP to server IP
# tcp_dport is left at simple_tcp_packet's default here
pkt = simple_tcp_packet(eth_dst=ROUTER_MAC,
eth_src=self.dmac,
ip_dst=self.translate_server_ip,
ip_src=self.dst_ip,
ip_id=105,
ip_ttl=64)
# Expected on the server side: new dst IP, rewritten MACs, TTL 64 -> 63
exp_pkt = simple_tcp_packet(eth_dst=self.server_dmac,
eth_src=ROUTER_MAC,
ip_dst=self.server_ip,
ip_src=self.dst_ip,
ip_id=105,
ip_ttl=63)
send_packet(self, self.dev_port25, pkt)
verify_packets(self, exp_pkt, [self.dev_port24])
# Translate public IP + port to server IP + port
pkt = simple_tcp_packet(eth_dst=ROUTER_MAC,
eth_src=self.dmac,
ip_dst=self.translate_server_ip,
ip_src=self.dst_ip,
ip_id=105,
ip_ttl=64,
tcp_dport=self.l4_dst_port)
# Same as above, plus the destination port becomes translate_dport
exp_pkt = simple_tcp_packet(eth_dst=self.server_dmac,
eth_src=ROUTER_MAC,
ip_dst=self.server_ip,
ip_src=self.dst_ip,
ip_id=105,
ip_ttl=63,
tcp_dport=self.translate_dport)
send_packet(self, self.dev_port25, pkt)
verify_packets(self, exp_pkt, [self.dev_port24])
# nat_list[0] is the first NAT entry appended in setUp (keyed on
# translate_server_ip + proto + l4_dst_port). Exactly one hit is expected.
# NOTE(review): presumably only the second packet matches this entry,
# which requires simple_tcp_packet's default tcp_dport to differ from
# l4_dst_port - confirm.
ret_attr = sai_thrift_get_nat_entry_attribute(
client=self.client,
nat_entry=self.nat_list[0],
packet_count=True)
self.assertEqual(ret_attr["packet_count"], 1)
print("Packet count %d" % (ret_attr["packet_count"]))