ptf/saiswitch.py (2,653 lines of code) (raw):
# Copyright 2021-present Intel Corporation.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
Thrift SAI interface Switch tests
"""
from ipaddress import ip_address
from sai_thrift.sai_headers import * # pylint: disable=wildcard-import; lgtm[py/polluting-import]
from ptf.mask import Mask
from lpm import LpmDict
from sai_base_test import * # pylint: disable=wildcard-import; lgtm[py/polluting-import]
THIS_DIR = os.path.dirname(os.path.abspath(__file__))
sys.path.append(os.path.join(THIS_DIR, '..'))
CATCH_EXCEPTIONS = True
# SAI_STATUS_NOT_IMPLEMENTED
ACCEPTED_ERROR_CODE = [SAI_STATUS_NOT_IMPLEMENTED]
#SAI_STATUS_ATTR_NOT_IMPLEMENTED
ACCEPTED_ERROR_CODE += range(SAI_STATUS_ATTR_NOT_IMPLEMENTED_MAX, SAI_STATUS_ATTR_NOT_IMPLEMENTED_0+1)
#unknow attribute
ACCEPTED_ERROR_CODE += range(SAI_STATUS_UNKNOWN_ATTRIBUTE_MAX, SAI_STATUS_UNKNOWN_ATTRIBUTE_0+1)
#SAI_STATUS_ATTR_NOT_SUPPORTED
ACCEPTED_ERROR_CODE += range(SAI_STATUS_ATTR_NOT_SUPPORTED_MAX, SAI_STATUS_ATTR_NOT_SUPPORTED_0+1)
def set_accepted_exception():
"""
Set accepted exceptions.
"""
adapter.CATCH_EXCEPTIONS = CATCH_EXCEPTIONS
adapter.EXPECTED_ERROR_CODE = ACCEPTED_ERROR_CODE
print("setting accepted exception")
def generate_ip_addr(no_of_addr, ipv6=False):
'''
An IP address generator using generic way of addresses randomization
and ensuring address is unique
Args:
no_of_addr (int): number of requested IP addresses
ipv6 (bool): IP version indicator
Yield:
str: unique IP address
'''
if not ipv6:
ip_range = [addr for addr in
["1.0.0.1", "254.255.255.254"]]
else:
ip_range = [addr for addr in
["2001::0", "2001:0db8::ffff:ffff:ffff"]]
ip_interval = LpmDict.IpInterval(
ip_address(ip_range[0]), ip_address(ip_range[1]))
for _ in range(no_of_addr):
ip_addr = ip_interval.get_random_ip()
yield ip_addr
def generate_mac_list(no_of_addr):
'''
Generate list of different mac addresses
Args:
no_of_addr (int): number of requested MAC addresses (max 256^4)
Return:
list: mac_list with generated MAC addresses
'''
mac_list = []
i = 0
for first_grp in range(1, 256):
for second_grp in range(1, 256):
for third_grp in range(1, 256):
for fourth_grp in range(1, 256):
mac_list.append('00:00:' +
('%02x' % first_grp) + ':' +
('%02x' % second_grp) + ':' +
('%02x' % third_grp) + ':' +
('%02x' % fourth_grp))
i += 1
if i == no_of_addr:
return mac_list
return mac_list
class AvailableResourceTestHelper(PlatformSaiHelper):
def setUp(self):
super(AvailableResourceTestHelper, self).setUp()
# values required by neighbor entries tests set by route entries test
# Note: routes entries tests should be run as first
self.available_v4_host_routes = None
self.available_v6_host_routes = None
def availableIPv4RouteEntryTest(self):
"""
Verifies creation of maximum number of IPv4 route entries.
"""
print("\navailableIPv4RouteEntryTest()")
attr = sai_thrift_get_switch_attribute(
self.client, available_ipv4_route_entry=True)
self.max_route_entry = attr["available_ipv4_route_entry"]
print("Available IPv4 route entries: %d" % self.max_route_entry)
self.routes = dict()
mask = '/32'
ip_add = generate_ip_addr(self.max_route_entry + 100)
try:
self.neigh_entry = sai_thrift_neighbor_entry_t(
self.switch_id, self.port10_rif, sai_ipaddress('10.10.10.1'))
self.neigh = sai_thrift_create_neighbor_entry(
self.client, self.neigh_entry, dst_mac_address='00:11:22:33:44:55')
self.nhop = sai_thrift_create_next_hop(
self.client,
ip=sai_ipaddress('10.10.10.1'),
router_interface_id=self.port10_rif,
type=SAI_NEXT_HOP_TYPE_IP)
self.assertNotEqual(self.nhop, SAI_NULL_OBJECT_ID)
self.route_number = 0
max_host_route = 0
while self.route_number < self.max_route_entry:
ip_p_m = sai_ipprefix(next(ip_add) + mask)
# check if ip repeat, then get next ip
if str(ip_p_m) in self.routes:
print("ip repeat")
continue
route_entry = sai_thrift_route_entry_t(
vr_id=self.default_vrf,
destination=ip_p_m)
status = sai_thrift_create_route_entry(
self.client, route_entry, next_hop_id=self.nhop)
# print(self.route_number)
if status == SAI_STATUS_SUCCESS:
self.routes.update({str(ip_p_m): route_entry})
self.route_number += 1
elif status == SAI_STATUS_ITEM_ALREADY_EXISTS:
print("SAI_STATUS_ITEM_ALREADY_EXISTS")
continue
elif mask == '/32': # when host table is full change to LPM
print("%s host routes have been created" % self.route_number)
max_host_route = self.route_number
self.available_v4_host_routes = max_host_route
mask = '/30'
continue
else:
self.fail("Route creation failed after creating %d "
"entries, status %u" % (self.route_number, status))
attr = sai_thrift_get_switch_attribute(
self.client, available_ipv4_route_entry=True)
self.assertEqual(attr["available_ipv4_route_entry"],
self.max_route_entry - self.route_number)
ip_add.close()
self.available_v4_host_routes = self.route_number
attr = sai_thrift_get_switch_attribute(
self.client, available_ipv4_route_entry=True)
self.assertEqual(attr["available_ipv4_route_entry"], 0)
finally:
for ip_p_m in self.routes:
sai_thrift_remove_route_entry(self.client, self.routes.get(ip_p_m))
sai_thrift_remove_next_hop(self.client, self.nhop)
def availableIPv6RouteEntryTest(self):
"""
Verifies creation of maximum number of IPv6 route entries.
"""
print("\navailableIPv6RouteEntryTest()")
attr = sai_thrift_get_switch_attribute(
self.client, available_ipv6_route_entry=True)
max_route_entry = attr["available_ipv6_route_entry"]
print("Available IPv6 route entries: %d" % max_route_entry)
self.available_v6_host_routes = max_route_entry
self.routes = dict()
mask = '/128'
ip_add = generate_ip_addr(max_route_entry + 100, ipv6=True)
try:
self.neigh_entry = sai_thrift_neighbor_entry_t(
self.switch_id, self.port10_rif, sai_ipaddress('2001:0db8:1::1'))
self.neigh = sai_thrift_create_neighbor_entry(
self.client, self.neigh_entry, dst_mac_address='00:11:22:33:44:55')
self.nhop = sai_thrift_create_next_hop(
self.client,
ip=sai_ipaddress('2001:0db8:1::1'),
router_interface_id=self.port10_rif,
type=SAI_NEXT_HOP_TYPE_IP)
self.assertNotEqual(self.nhop, SAI_NULL_OBJECT_ID)
route_number = 0
max_host_route = 0
while route_number < max_route_entry:
ip_p_m = sai_ipprefix(next(ip_add) + mask)
# check if ip repeat, then get next ip
if str(ip_p_m) in self.routes:
continue
route_entry = sai_thrift_route_entry_t(
switch_id=self.switch_id,
vr_id=self.default_vrf,
destination=ip_p_m)
status = sai_thrift_create_route_entry(
self.client, route_entry, next_hop_id=self.nhop)
if status == SAI_STATUS_SUCCESS:
self.routes.update({str(ip_p_m): route_entry})
route_number += 1
elif status == SAI_STATUS_ITEM_ALREADY_EXISTS:
continue
elif mask == '/128': # when host table is full change to LPM
print("%s host routes have been created" % route_number)
max_host_route = route_number
self.available_v6_host_routes = max_host_route
mask = '/120'
continue
elif mask == '/120': # when LPM table is full change to LPM64
print("%s host + LPM routes have been created so far"
% route_number)
mask = '/64'
continue
else:
self.fail("Route creation failed after creating %d "
"entries, status %u" % (route_number, status))
attr = sai_thrift_get_switch_attribute(
self.client, available_ipv6_route_entry=True)
self.assertEqual(attr["available_ipv6_route_entry"],
max_route_entry - route_number)
ip_add.close()
self.available_v6_host_routes = route_number
attr = sai_thrift_get_switch_attribute(
self.client, available_ipv6_route_entry=True)
self.assertEqual(attr["available_ipv6_route_entry"], 0)
finally:
for ip_p_m in self.routes:
sai_thrift_remove_route_entry(self.client, self.routes.get(ip_p_m))
sai_thrift_remove_next_hop(self.client, self.nhop)
def availableIPv4NeighborEntryTest(self):
"""
Verifies creation of maximum number of IPv4 neighbor entries.
"""
print("\navailableIPv4NeighborEntryTest()")
if self.available_v4_host_routes is None:
print("availableIPv4RouteEntryTest must be run first")
return
attr = sai_thrift_get_switch_attribute(
self.client, available_ipv4_neighbor_entry=True)
available_nbr_entry = attr["available_ipv4_neighbor_entry"]
print("Available IPv4 neighbor entries: %d" % available_nbr_entry)
if available_nbr_entry > self.available_v4_host_routes:
print("Cannot create more neighbor entries than available host "
"routes which is %d" % self.available_v4_host_routes)
max_nbr_entry = self.available_v4_host_routes
else:
max_nbr_entry = available_nbr_entry
nbrs = dict()
ip_add = generate_ip_addr(max_nbr_entry + 100)
try:
nbr_number = 0
while nbr_number < max_nbr_entry:
ip_p = sai_ipaddress(next(ip_add))
# check if ip repeat, then get next ip
if str(ip_p) in nbrs:
continue
nbr_entry = sai_thrift_neighbor_entry_t(
rif_id=self.port10_rif,
ip_address=ip_p)
status = sai_thrift_create_neighbor_entry(
self.client,
nbr_entry,
dst_mac_address='00:00:00:00:00:01',
no_host_route=False)
self.assertEqual(status, SAI_STATUS_SUCCESS)
nbrs.update({str(ip_p): nbr_entry})
nbr_number += 1
attr = sai_thrift_get_switch_attribute(
self.client, available_ipv4_neighbor_entry=True)
self.assertEqual(attr["available_ipv4_neighbor_entry"],
available_nbr_entry - nbr_number,
"Failed after %d entries" % nbr_number)
finally:
for ip_p in nbrs:
sai_thrift_remove_neighbor_entry(self.client, nbrs.get(ip_p))
def availableIPv6NeighborEntryTest(self):
"""
Verifies creation of maximum number of IPv6 neighbor entries.
"""
print("\navailableIPv6NeighborEntryTest()")
if self.available_v6_host_routes is None:
print("availableIPv6RouteEntryTest must be run first")
return
attr = sai_thrift_get_switch_attribute(
self.client, available_ipv6_neighbor_entry=True)
available_nbr_entry = attr["available_ipv6_neighbor_entry"]
print("Available IPv6 neighbor entries: %d" % available_nbr_entry)
if available_nbr_entry > self.available_v6_host_routes:
print("Cannot create more neighbor entries than available host "
"routes which is %d" % self.available_v6_host_routes)
max_nbr_entry = self.available_v6_host_routes
else:
max_nbr_entry = available_nbr_entry
nbrs = dict()
ip_add = generate_ip_addr(max_nbr_entry + 100, ipv6=True)
try:
nbr_number = 0
while nbr_number < max_nbr_entry:
ip_p = sai_ipaddress(next(ip_add))
# check if ip repeat, then get next ip
if str(ip_p) in nbrs:
continue
nbr_entry = sai_thrift_neighbor_entry_t(
rif_id=self.port10_rif,
ip_address=ip_p)
status = sai_thrift_create_neighbor_entry(
self.client,
nbr_entry,
dst_mac_address='00:00:00:00:00:01',
no_host_route=False)
nbrs.update({str(ip_p): nbr_entry})
nbr_number += 1
self.assertEqual(status, SAI_STATUS_SUCCESS)
attr = sai_thrift_get_switch_attribute(
self.client, available_ipv6_neighbor_entry=True)
self.assertEqual(attr["available_ipv6_neighbor_entry"],
available_nbr_entry - nbr_number,
"Failed after %d entries" % nbr_number)
finally:
for ip_p in nbrs:
sai_thrift_remove_neighbor_entry(self.client, nbrs.get(ip_p))
class AvailableIPv4RouteEntryTest(AvailableResourceTestHelper):
def runTest(self):
self.availableIPv4RouteEntryTest()
class AvailableIPv6RouteEntryTest(AvailableResourceTestHelper):
def runTest(self):
self.availableIPv6RouteEntryTest()
class AvailableIPv4NeighborEntryTest(AvailableResourceTestHelper):
def runTest(self):
self.availableIPv4RouteEntryTest()
self.availableIPv4NeighborEntryTest()
class AvailableIPv6NeighborEntryTest(AvailableResourceTestHelper):
def runTest(self):
self.availableIPv6RouteEntryTest()
self.availableIPv6NeighborEntryTest()
class AvailableFdbEntryTest(PlatformSaiHelper):
"""
Verifies creation of maximum number of FDB entries.
"""
def runTest(self):
print("\navailableFdbEntryTest()")
attr = sai_thrift_get_switch_attribute(
self.client, available_fdb_entry=True)
max_fdb_entry = attr["available_fdb_entry"]
print("Available FDB entries: %d" % max_fdb_entry)
# Verifying only up to 90% of FDB table capacity
available_fdb_entry = int(max_fdb_entry * 0.9)
mac_list = generate_mac_list(max_fdb_entry)
fdb = []
try:
for fdb_number in range(1, available_fdb_entry + 1):
fdb_entry = sai_thrift_fdb_entry_t(
switch_id=self.switch_id,
mac_address=mac_list[fdb_number - 1],
bv_id=self.vlan10)
status = sai_thrift_create_fdb_entry(
self.client,
fdb_entry,
type=SAI_FDB_ENTRY_TYPE_STATIC,
bridge_port_id=self.port0_bp,
packet_action=SAI_PACKET_ACTION_FORWARD)
self.assertEqual(status, SAI_STATUS_SUCCESS)
fdb.append(fdb_entry)
attr = sai_thrift_get_switch_attribute(
self.client, available_fdb_entry=True)
self.assertEqual(attr["available_fdb_entry"],
max_fdb_entry - fdb_number)
finally:
for fdb_id in fdb:
sai_thrift_remove_fdb_entry(self.client, fdb_id)
class ReadOnlyAttributesTest(PlatformSaiHelper):
"""
Verifies get on read only attributes.
"""
def setUp(self):
set_accepted_exception()
super(ReadOnlyAttributesTest, self).setUp()
def readOnlyAttributesTest(self):
print("\nreadOnlyAttributesTest()")
attr = sai_thrift_get_switch_attribute(self.client,
number_of_active_ports=True)
if attr:
print(attr)
self.assertNotEqual(attr["number_of_active_ports"], 0)
self.assertNotEqual(attr["SAI_SWITCH_ATTR_NUMBER_OF_ACTIVE_PORTS"], 0)
active_ports = attr["number_of_active_ports"]
else:
print("get number_of_active_ports failed: {}".format(self.status()))
attr = sai_thrift_get_switch_attribute(
self.client, max_number_of_supported_ports=True)
if attr:
print(attr)
self.assertNotEqual(attr["max_number_of_supported_ports"], 0)
self.assertNotEqual(
attr["SAI_SWITCH_ATTR_MAX_NUMBER_OF_SUPPORTED_PORTS"], 0)
else:
# Broadcom return -196608
print("get max_number_of_supported_ports failed: {}".format(self.status()))
attr = sai_thrift_get_switch_attribute(
self.client,
port_list=sai_thrift_object_list_t(idlist=[], count=active_ports))
if attr:
print(attr)
self.assertNotEqual(attr["port_list"].count, 0)
self.assertNotEqual(attr["SAI_SWITCH_ATTR_PORT_LIST"].count, 0)
else:
print("get port_list failed: {}".format(self.status()))
attr = sai_thrift_get_switch_attribute(self.client, port_max_mtu=True)
if attr:
print(attr)
self.assertNotEqual(attr["port_max_mtu"], 0)
self.assertNotEqual(attr["SAI_SWITCH_ATTR_PORT_MAX_MTU"], 0)
else:
# Broadcom return -196608
print("get port_max_mtu failed: {}".format(self.status()))
attr = sai_thrift_get_switch_attribute(self.client, cpu_port=True)
if attr:
print(attr)
self.assertNotEqual(attr["cpu_port"], 0)
self.assertNotEqual(attr["SAI_SWITCH_ATTR_CPU_PORT"], 0)
else:
print("get cpu_port failed: {}".format(self.status()))
attr = sai_thrift_get_switch_attribute(self.client,
max_virtual_routers=True)
if attr:
print(attr)
self.assertNotEqual(attr["max_virtual_routers"], 0)
self.assertNotEqual(attr["SAI_SWITCH_ATTR_MAX_VIRTUAL_ROUTERS"], 0)
else:
print("get max_virtual_routers failed: {}".format(self.status()))
attr = sai_thrift_get_switch_attribute(self.client,
fdb_table_size=True)
if attr:
print(attr)
self.assertNotEqual(attr["fdb_table_size"], 0)
self.assertNotEqual(attr["SAI_SWITCH_ATTR_FDB_TABLE_SIZE"], 0)
else:
print("get fdb_table_size failed: {}".format(self.status()))
attr = sai_thrift_get_switch_attribute(self.client,
l3_neighbor_table_size=True)
if attr:
print(attr)
self.assertNotEqual(attr["l3_neighbor_table_size"], 0)
self.assertNotEqual(attr["SAI_SWITCH_ATTR_L3_NEIGHBOR_TABLE_SIZE"], 0)
else:
# Broadcom return -196608
print("get l3_neighbor_table_size failed: {}".format(self.status()))
attr = sai_thrift_get_switch_attribute(self.client,
l3_route_table_size=True)
if attr:
print(attr)
self.assertNotEqual(attr["l3_route_table_size"], 0)
self.assertNotEqual(attr["SAI_SWITCH_ATTR_L3_ROUTE_TABLE_SIZE"], 0)
else:
# Broadcom return -196608
print("get l3_route_table_size failed: {}".format(self.status()))
attr = sai_thrift_get_switch_attribute(self.client, lag_members=True)
if attr:
print(attr)
self.assertNotEqual(attr["lag_members"], 0)
self.assertNotEqual(attr["SAI_SWITCH_ATTR_LAG_MEMBERS"], 0)
else:
print("get lag_members failed: {}".format(self.status()))
attr = sai_thrift_get_switch_attribute(self.client,
number_of_lags=True)
if attr:
print(attr)
self.assertNotEqual(attr["number_of_lags"], 0)
self.assertNotEqual(attr["SAI_SWITCH_ATTR_NUMBER_OF_LAGS"], 0)
else:
print("get number_of_lags failed: {}".format(self.status()))
attr = sai_thrift_get_switch_attribute(self.client, ecmp_members=True)
if attr:
print(attr)
self.assertNotEqual(attr["ecmp_members"], 0)
self.assertNotEqual(attr["SAI_SWITCH_ATTR_ECMP_MEMBERS"], 0)
else:
print("get ecmp_members failed: {}".format(self.status()))
attr = sai_thrift_get_switch_attribute(self.client,
number_of_ecmp_groups=True)
if attr:
print(attr)
self.assertNotEqual(attr["number_of_ecmp_groups"], 0)
self.assertNotEqual(attr["SAI_SWITCH_ATTR_NUMBER_OF_ECMP_GROUPS"], 0)
else:
print("get number_of_ecmp_groups failed: {}".format(self.status()))
attr = sai_thrift_get_switch_attribute(self.client,
number_of_unicast_queues=True)
if attr:
print(attr)
self.assertNotEqual(attr["number_of_unicast_queues"], 0)
self.assertNotEqual(
attr["SAI_SWITCH_ATTR_NUMBER_OF_UNICAST_QUEUES"], 0)
else:
print("get number_of_unicast_queues failed: {}".format(self.status()))
attr = sai_thrift_get_switch_attribute(self.client,
number_of_multicast_queues=True)
if attr:
print(attr)
self.assertNotEqual(attr["number_of_multicast_queues"], 0)
self.assertNotEqual(
attr["SAI_SWITCH_ATTR_NUMBER_OF_MULTICAST_QUEUES"], 0)
else:
print("get number_of_multicast_queues failed: {}".format(self.status()))
attr = sai_thrift_get_switch_attribute(self.client,
number_of_queues=True)
if attr:
print(attr)
self.assertNotEqual(attr["number_of_queues"], 0)
self.assertNotEqual(attr["SAI_SWITCH_ATTR_NUMBER_OF_QUEUES"], 0)
else:
print("get number_of_queues failed: {}".format(self.status()))
attr = sai_thrift_get_switch_attribute(self.client,
number_of_cpu_queues=True)
if attr:
print(attr)
self.assertNotEqual(attr["number_of_cpu_queues"], 0)
self.assertNotEqual(attr["SAI_SWITCH_ATTR_NUMBER_OF_CPU_QUEUES"], 0)
else:
print("get number_of_cpu_queues failed: {}".format(self.status()))
attr = sai_thrift_get_switch_attribute(self.client,
acl_table_minimum_priority=True)
if attr:
print(attr)
self.assertEqual(attr["acl_table_minimum_priority"], 0)
self.assertEqual(
attr["SAI_SWITCH_ATTR_ACL_TABLE_MINIMUM_PRIORITY"], 0)
else:
print("get acl_table_minimum_priority failed: {}".format(self.status()))
attr = sai_thrift_get_switch_attribute(
self.client, acl_table_maximum_priority=True)
if attr:
print(attr)
self.assertNotEqual(attr["acl_table_maximum_priority"], 0)
self.assertNotEqual(
attr["SAI_SWITCH_ATTR_ACL_TABLE_MAXIMUM_PRIORITY"], 0)
else:
print("get acl_table_maximum_priority failed: {}".format(self.status()))
attr = sai_thrift_get_switch_attribute(self.client,
acl_entry_minimum_priority=True)
if attr:
print(attr)
self.assertEqual(attr["acl_entry_minimum_priority"], 0)
self.assertEqual(attr["SAI_SWITCH_ATTR_ACL_ENTRY_MINIMUM_PRIORITY"], 0)
else:
print("get acl_entry_minimum_priority failed: {}".format(self.status()))
attr = sai_thrift_get_switch_attribute(self.client,
acl_entry_maximum_priority=True)
if attr:
print(attr)
self.assertNotEqual(attr["acl_entry_maximum_priority"], 0)
self.assertNotEqual(
attr["SAI_SWITCH_ATTR_ACL_ENTRY_MAXIMUM_PRIORITY"], 0)
else:
print("get acl_entry_maximum_priority failed: {}".format(self.status()))
attr = sai_thrift_get_switch_attribute(self.client,
default_vlan_id=True)
if attr:
print(attr)
self.assertNotEqual(attr["default_vlan_id"], 0)
self.assertNotEqual(attr["SAI_SWITCH_ATTR_DEFAULT_VLAN_ID"], 0)
else:
print("get default_vlan_id failed: {}".format(self.status()))
attr = sai_thrift_get_switch_attribute(self.client,
default_stp_inst_id=True)
if attr:
print(attr)
# Broadcom got attr["default_stp_inst_id"]=68719476737
self.assertEqual(attr["default_stp_inst_id"], SAI_NULL_OBJECT_ID)
self.assertEqual(
attr["SAI_SWITCH_ATTR_DEFAULT_STP_INST_ID"], SAI_NULL_OBJECT_ID)
else:
print("get default_stp_inst_id failed: {}".format(self.status()))
attr = sai_thrift_get_switch_attribute(self.client,
max_stp_instance=True)
if attr:
print(attr)
self.assertNotEqual(attr["max_stp_instance"], 0)
self.assertNotEqual(attr["SAI_SWITCH_ATTR_MAX_STP_INSTANCE"], 0)
else:
print("get max_stp_instance failed: {}".format(self.status()))
attr = sai_thrift_get_switch_attribute(self.client,
default_virtual_router_id=True)
if attr:
print(attr)
self.assertNotEqual(attr["default_virtual_router_id"], 0)
self.assertNotEqual(
attr["SAI_SWITCH_ATTR_DEFAULT_VIRTUAL_ROUTER_ID"], 0)
else:
print("get default_virtual_router_id failed: {}".format(self.status()))
attr = sai_thrift_get_switch_attribute(self.client,
default_1q_bridge_id=True)
if attr:
print(attr)
self.assertNotEqual(attr["default_1q_bridge_id"], 0)
self.assertNotEqual(attr["SAI_SWITCH_ATTR_DEFAULT_1Q_BRIDGE_ID"], 0)
else:
print("get default_1q_bridge_id failed: {}".format(self.status()))
attr = sai_thrift_get_switch_attribute(
self.client, qos_max_number_of_traffic_classes=True)
if attr:
print(attr)
self.assertNotEqual(attr["qos_max_number_of_traffic_classes"], 0)
self.assertNotEqual(
attr["SAI_SWITCH_ATTR_QOS_MAX_NUMBER_OF_TRAFFIC_CLASSES"], 0)
else:
print("get qos_max_number_of_traffic_classes failed: {}".format(self.status()))
attr_name = "SAI_SWITCH_ATTR_QOS_MAX_NUMBER_OF_SCHEDULER_GROUP_" + \
"HIERARCHY_LEVELS"
attr = sai_thrift_get_switch_attribute(
self.client,
qos_max_number_of_scheduler_group_hierarchy_levels=True)
if attr:
print(attr)
self.assertNotEqual(
attr["qos_max_number_of_scheduler_group_hierarchy_levels"], 0)
self.assertNotEqual(attr[attr_name], 0)
else:
print("get qos_max_number_of_scheduler_group_hierarchy_levels failed: {}".format(self.status()))
scheduler_group_levels = attr[
"qos_max_number_of_scheduler_group_hierarchy_levels"]
value = sai_thrift_u32_list_t(count=scheduler_group_levels,
uint32list=[])
attr_name = "SAI_SWITCH_ATTR_QOS_MAX_NUMBER_OF_SCHEDULER_GROUPS_" + \
"PER_HIERARCHY_LEVEL"
attr = sai_thrift_get_switch_attribute(
self.client,
qos_max_number_of_scheduler_groups_per_hierarchy_level=value)
if attr:
print(attr)
self.assertNotEqual(
attr["qos_max_number_of_scheduler_groups_per_hierarchy_level"], 0)
self.assertNotEqual(attr[attr_name], 0)
else:
print("get qos_max_number_of_scheduler_groups_per_hierarchy_level failed: {}".format(self.status()))
attr_name = "SAI_SWITCH_ATTR_QOS_MAX_NUMBER_OF_CHILDS_" + \
"PER_SCHEDULER_GROUP"
attr = sai_thrift_get_switch_attribute(
self.client, qos_max_number_of_childs_per_scheduler_group=True)
if attr:
print(attr)
self.assertNotEqual(
attr["qos_max_number_of_childs_per_scheduler_group"], 0)
self.assertNotEqual(attr[attr_name], 0)
else:
print("get qos_max_number_of_childs_per_scheduler_group failed: {}".format(self.status()))
attr = sai_thrift_get_switch_attribute(self.client,
total_buffer_size=True)
if attr:
print(attr)
self.assertNotEqual(attr["total_buffer_size"], 0)
self.assertNotEqual(attr["SAI_SWITCH_ATTR_TOTAL_BUFFER_SIZE"], 0)
else:
print("get total_buffer_size failed: {}".format(self.status()))
attr = sai_thrift_get_switch_attribute(self.client,
ingress_buffer_pool_num=True)
if attr:
print(attr)
self.assertNotEqual(attr["ingress_buffer_pool_num"], 0)
self.assertNotEqual(
attr["SAI_SWITCH_ATTR_INGRESS_BUFFER_POOL_NUM"], 0)
else:
print("get ingress_buffer_pool_num failed: {}".format(self.status()))
attr = sai_thrift_get_switch_attribute(self.client,
egress_buffer_pool_num=True)
if attr:
print(attr)
self.assertNotEqual(attr["egress_buffer_pool_num"], 0)
self.assertNotEqual(attr["SAI_SWITCH_ATTR_EGRESS_BUFFER_POOL_NUM"], 0)
else:
print("get egress_buffer_pool_num failed: {}".format(self.status()))
attr = sai_thrift_get_switch_attribute(self.client, ecmp_hash=True)
if attr:
print(attr)
self.assertNotEqual(attr["ecmp_hash"], 0)
self.assertNotEqual(attr["SAI_SWITCH_ATTR_ECMP_HASH"], 0)
else:
print("get ecmp_hash failed: {}".format(self.status()))
attr = sai_thrift_get_switch_attribute(self.client, lag_hash=True)
if attr:
print(attr)
self.assertNotEqual(attr["lag_hash"], 0)
self.assertNotEqual(attr["SAI_SWITCH_ATTR_LAG_HASH"], 0)
else:
print("get lag_hash failed: {}".format(self.status()))
attr = sai_thrift_get_switch_attribute(self.client,
max_acl_action_count=True)
if attr:
print(attr)
self.assertNotEqual(attr["max_acl_action_count"], 0)
self.assertNotEqual(attr["SAI_SWITCH_ATTR_MAX_ACL_ACTION_COUNT"], 0)
max_acl_action_count = attr["max_acl_action_count"]
else:
print("get max_acl_action_count failed: {}".format(self.status()))
attr = sai_thrift_get_switch_attribute(self.client,
max_acl_range_count=True)
if attr:
print(attr)
self.assertNotEqual(attr["max_acl_range_count"], 0)
self.assertNotEqual(attr["SAI_SWITCH_ATTR_MAX_ACL_RANGE_COUNT"], 0)
else:
# Broadcom return -196608
print("get max_acl_range_count failed: {}".format(self.status()))
s32 = sai_thrift_s32_list_t(int32list=[], count=max_acl_action_count)
cap = sai_thrift_acl_capability_t(action_list=s32)
attr = sai_thrift_get_switch_attribute(self.client,
acl_capability=cap)
if attr:
print(attr)
self.assertNotEqual(attr["acl_capability"], 0)
self.assertNotEqual(attr["SAI_SWITCH_ATTR_ACL_CAPABILITY"], 0)
else:
print("get acl_capability failed: {}".format(self.status()))
attr = sai_thrift_get_switch_attribute(self.client,
max_mirror_session=True)
if attr:
print(attr)
self.assertNotEqual(attr["max_mirror_session"], 0)
self.assertNotEqual(attr["SAI_SWITCH_ATTR_MAX_MIRROR_SESSION"], 0)
else:
# Broadcom return -196608
print("get max_mirror_session failed: {}".format(self.status()))
attr = sai_thrift_get_switch_attribute(self.client,
default_trap_group=True)
if attr:
print(attr)
self.assertNotEqual(attr["default_trap_group"], 0)
self.assertNotEqual(attr["SAI_SWITCH_ATTR_DEFAULT_TRAP_GROUP"], 0)
else:
print("get default_trap_group failed: {}".format(self.status()))
attr = sai_thrift_get_switch_attribute(self.client,
acl_stage_ingress=cap)
if attr:
print(attr)
self.assertNotEqual(attr["acl_stage_ingress"], 0)
self.assertNotEqual(attr["SAI_SWITCH_ATTR_ACL_STAGE_INGRESS"], 0)
else:
print("get acl_stage_ingress failed: {}".format(self.status()))
attr = sai_thrift_get_switch_attribute(self.client,
acl_stage_egress=cap)
if attr:
print(attr)
self.assertNotEqual(attr["acl_stage_egress"], 0)
self.assertNotEqual(attr["SAI_SWITCH_ATTR_ACL_STAGE_EGRESS"], 0)
else:
print("get acl_stage_egress failed: {}".format(self.status()))
def runTest(self):
self.readOnlyAttributesTest()
class RefreshIntervalTest(PlatformSaiHelper):
"""
Verifies SAI_SWITCH_ATTR_COUNTER_REFRESH_INTERVAL switch attribute
applied to VLAN and RIF stats
"""
def setUp(self):
set_accepted_exception()
super(RefreshIntervalTest, self).setUp()
def refreshIntervalTest(self):
print("\nrefreshIntervalTest()")
attr = sai_thrift_get_switch_attribute(
self.client, counter_refresh_interval=True)
# Can not get this attribute on Broadcom, status return -196608
self.assertEqual(self.status(), SAI_STATUS_SUCCESS)
init_interval = attr["counter_refresh_interval"]
print("Counters refresh interval initially set to %d sec"
% init_interval)
test_vlan = self.vlan10
test_vlan_port = self.dev_port0
test_rif = self.port10_rif
test_rif_port = self.dev_port10
test_interval = 10
vlan_stats = query_counter(
self, sai_thrift_get_vlan_stats, test_vlan)
init_vlan_counter = vlan_stats['SAI_VLAN_STAT_IN_PACKETS']
rif_stats = query_counter(
self, sai_thrift_get_router_interface_stats, test_rif)
init_rif_counter = rif_stats['SAI_ROUTER_INTERFACE_STAT_IN_PACKETS']
pkt = simple_udp_packet()
try:
print("\nTesting VLAN stats counters refresh time")
# compensate refresh time shift
send_packet(self, test_vlan_port, pkt)
while query_counter(
self, sai_thrift_get_vlan_stats, test_vlan)[
'SAI_VLAN_STAT_IN_PACKETS'] == init_vlan_counter:
time.sleep(0.1)
init_vlan_counter += 1
send_packet(self, test_vlan_port, pkt)
vlan_stats = query_counter(
self, sai_thrift_get_vlan_stats, test_vlan)
counter = vlan_stats['SAI_VLAN_STAT_IN_PACKETS']
self.assertEqual(counter, init_vlan_counter)
# determine refresh time
timer_start = time.time()
timer_end = time.time()
while counter == init_vlan_counter:
vlan_stats = query_counter(
self, sai_thrift_get_vlan_stats, test_vlan)
counter = vlan_stats['SAI_VLAN_STAT_IN_PACKETS']
timer_end = time.time()
init_vlan_counter += 1
interval = int(round(timer_end - timer_start))
print("VLAN stats refreshed after %d sec" % interval)
self.assertEqual(init_interval, int(round(interval)))
vlan_stats = query_counter(
self, sai_thrift_get_vlan_stats, test_vlan)
counter = vlan_stats['SAI_VLAN_STAT_IN_PACKETS']
self.assertEqual(counter, init_vlan_counter)
print("Setting refresh interval to %d sec" % test_interval)
sai_thrift_set_switch_attribute(
self.client, counter_refresh_interval=test_interval)
attr = sai_thrift_get_switch_attribute(
self.client, counter_refresh_interval=True)
set_interval = attr["counter_refresh_interval"]
print("Refresh interval set to %d sec" % set_interval)
self.assertEqual(set_interval, test_interval)
# compensate refresh time shift
send_packet(self, test_vlan_port, pkt)
while query_counter(
self, sai_thrift_get_vlan_stats, test_vlan)[
'SAI_VLAN_STAT_IN_PACKETS'] == init_vlan_counter:
time.sleep(0.1)
init_vlan_counter += 1
send_packet(self, test_vlan_port, pkt)
vlan_stats = query_counter(
self, sai_thrift_get_vlan_stats, test_vlan)
counter = vlan_stats['SAI_VLAN_STAT_IN_PACKETS']
self.assertEqual(counter, init_vlan_counter)
# determine refresh time
timer_start = time.time()
timer_end = time.time()
while counter == init_vlan_counter:
vlan_stats = query_counter(
self, sai_thrift_get_vlan_stats, test_vlan)
counter = vlan_stats['SAI_VLAN_STAT_IN_PACKETS']
timer_end = time.time()
init_vlan_counter += 1
interval = int(round(timer_end - timer_start))
print("VLAN stats refreshed after %d sec" % interval)
self.assertEqual(test_interval, interval)
vlan_stats = query_counter(
self, sai_thrift_get_vlan_stats, test_vlan)
counter = vlan_stats['SAI_VLAN_STAT_IN_PACKETS']
self.assertEqual(counter, init_vlan_counter)
finally:
sai_thrift_set_switch_attribute(
self.client, counter_refresh_interval=init_interval)
try:
print("\nTesting RIF stats counters refresh time")
# compensate refresh time shift
send_packet(self, test_rif_port, pkt)
while query_counter(
self, sai_thrift_get_router_interface_stats, test_rif)[
'SAI_ROUTER_INTERFACE_STAT_IN_PACKETS'] == \
init_rif_counter:
time.sleep(0.1)
init_rif_counter += 1
send_packet(self, test_rif_port, pkt)
rif_stats = query_counter(
self, sai_thrift_get_router_interface_stats, test_rif)
counter = rif_stats['SAI_ROUTER_INTERFACE_STAT_IN_PACKETS']
self.assertEqual(counter, init_rif_counter)
# determine refresh time
timer_start = time.time()
timer_end = time.time()
while counter == init_rif_counter:
rif_stats = query_counter(
self, sai_thrift_get_router_interface_stats, test_rif)
counter = rif_stats['SAI_ROUTER_INTERFACE_STAT_IN_PACKETS']
timer_end = time.time()
init_rif_counter += 1
interval = int(round(timer_end - timer_start))
print("RIF stats refreshed after %d sec" % interval)
self.assertEqual(init_interval, int(round(interval)))
rif_stats = query_counter(
self, sai_thrift_get_router_interface_stats, test_rif)
counter = rif_stats['SAI_ROUTER_INTERFACE_STAT_IN_PACKETS']
self.assertEqual(counter, init_rif_counter)
print("Setting refresh interval to %d sec" % test_interval)
sai_thrift_set_switch_attribute(
self.client, counter_refresh_interval=test_interval)
attr = sai_thrift_get_switch_attribute(
self.client, counter_refresh_interval=True)
set_interval = attr["counter_refresh_interval"]
print("Refresh interval set to %d sec" % set_interval)
self.assertEqual(set_interval, test_interval)
# compensate refresh time shift
send_packet(self, test_rif_port, pkt)
while query_counter(
self, sai_thrift_get_router_interface_stats, test_rif)[
'SAI_ROUTER_INTERFACE_STAT_IN_PACKETS'] == \
init_rif_counter:
time.sleep(0.1)
init_rif_counter += 1
send_packet(self, test_rif_port, pkt)
rif_stats = query_counter(
self, sai_thrift_get_router_interface_stats, test_rif)
counter = rif_stats['SAI_ROUTER_INTERFACE_STAT_IN_PACKETS']
self.assertEqual(counter, init_rif_counter)
# determine refresh time
timer_start = time.time()
timer_end = time.time()
while counter == init_rif_counter:
rif_stats = query_counter(
self, sai_thrift_get_router_interface_stats, test_rif)
counter = rif_stats['SAI_ROUTER_INTERFACE_STAT_IN_PACKETS']
timer_end = time.time()
init_rif_counter += 1
interval = int(round(timer_end - timer_start))
print("RIF stats refreshed after %d sec" % interval)
self.assertEqual(test_interval, interval)
rif_stats = query_counter(
self, sai_thrift_get_router_interface_stats, test_rif)
counter = rif_stats['SAI_VLAN_STAT_IN_PACKETS']
self.assertEqual(counter, init_vlan_counter)
finally:
sai_thrift_set_switch_attribute(
self.client, counter_refresh_interval=init_interval)
def runTest(self):
self.refreshIntervalTest()
class AvailableSnatEntryTest(PlatformSaiHelper):
"""
Verifies creation of maximum number of snat entries.
"""
def availableSnatEntryTest(self):
print("\navailableSnatEntryTest()")
attr = sai_thrift_get_switch_attribute(
self.client, available_snat_entry=True)
max_snat_entry = attr["available_snat_entry"]
print("Available SNAT entries: %d" % max_snat_entry)
snat_list = []
addr = generate_ip_addr(max_snat_entry + 1)
try:
for snat_number in range(1, max_snat_entry + 1):
nat_data = sai_thrift_nat_entry_data_t(
key=sai_thrift_nat_entry_key_t(
src_ip=next(addr), proto=6),
mask=sai_thrift_nat_entry_mask_t(
src_ip='255.255.255.255', proto=63))
snat = sai_thrift_nat_entry_t(
vr_id=self.default_vrf,
data=nat_data,
nat_type=SAI_NAT_TYPE_SOURCE_NAT)
status = sai_thrift_create_nat_entry(
self.client, snat, nat_type=SAI_NAT_TYPE_SOURCE_NAT)
# Broadcom return -23 SAI_STATUS_NOT_EXECUTED
self.assertEqual(status, SAI_STATUS_SUCCESS)
snat_list.append(snat)
attr = sai_thrift_get_switch_attribute(
self.client, available_snat_entry=True)
self.assertEqual(attr["available_snat_entry"],
max_snat_entry - snat_number)
# checking if no more SNAT entry may be created
nat_data = sai_thrift_nat_entry_data_t(
key=sai_thrift_nat_entry_key_t(
src_ip=next(addr),
proto=6),
mask=sai_thrift_nat_entry_mask_t(
src_ip='255.255.255.255',
proto=63))
try:
snat = sai_thrift_nat_entry_t(
vr_id=self.default_vrf,
data=nat_data,
nat_type=SAI_NAT_TYPE_SOURCE_NAT)
stat = sai_thrift_create_nat_entry(
self.client, snat,
nat_type=SAI_NAT_TYPE_SOURCE_NAT)
self.assertNotEqual(stat, SAI_STATUS_SUCCESS)
finally:
if not stat:
sai_thrift_remove_nat_entry(self.client, snat)
finally:
for snat in snat_list:
sai_thrift_remove_nat_entry(self.client, snat)
def runTest(self):
self.availableSnatEntryTest()
class AvailableDnatEntryTest(PlatformSaiHelper):
"""
Verifies creation of maximum number of dnat entries.
"""
def availableDnatEntryTest(self):
print("\navailableDnatEntryTest()")
attr = sai_thrift_get_switch_attribute(
self.client, available_dnat_entry=True)
max_dnat_entry = attr["available_dnat_entry"]
print("Available DNAT entries: %d" % max_dnat_entry)
dnat_list = []
addr = generate_ip_addr(max_dnat_entry + 1)
try:
for dnat_number in range(1, max_dnat_entry + 1):
nat_data = sai_thrift_nat_entry_data_t(
key=sai_thrift_nat_entry_key_t(
dst_ip=next(addr),
proto=6,
l4_src_port=100),
mask=sai_thrift_nat_entry_mask_t(
dst_ip='255.255.255.255', proto=63, l4_src_port=255))
dnat = sai_thrift_nat_entry_t(
vr_id=self.default_vrf,
data=nat_data,
nat_type=SAI_NAT_TYPE_DESTINATION_NAT)
status = sai_thrift_create_nat_entry(
self.client, dnat, nat_type=SAI_NAT_TYPE_DESTINATION_NAT)
# Broadcom return -23 SAI_STATUS_NOT_EXECUTED
self.assertEqual(status, SAI_STATUS_SUCCESS)
dnat_list.append(dnat)
attr = sai_thrift_get_switch_attribute(
self.client, available_dnat_entry=True)
self.assertEqual(attr["available_dnat_entry"],
max_dnat_entry - dnat_number)
# checking if no more DNAT entry may be created
nat_data = sai_thrift_nat_entry_data_t(
key=sai_thrift_nat_entry_key_t(
dst_ip=next(addr),
proto=6),
mask=sai_thrift_nat_entry_mask_t(
dst_ip='255.255.255.255',
proto=63))
try:
dnat = sai_thrift_nat_entry_t(
vr_id=self.default_vrf,
data=nat_data,
nat_type=SAI_NAT_TYPE_DESTINATION_NAT)
stat = sai_thrift_create_nat_entry(
self.client, dnat,
nat_type=SAI_NAT_TYPE_DESTINATION_NAT)
self.assertNotEqual(stat, SAI_STATUS_SUCCESS)
finally:
if not stat:
sai_thrift_remove_nat_entry(self.client, dnat)
finally:
for dnat in dnat_list:
sai_thrift_remove_nat_entry(self.client, dnat)
def runTest(self):
self.availableDnatEntryTest()
class AvailableNexthopGroupEntryTest(PlatformSaiHelper):
"""
Verifies creation of maximum number of nexthop group entries.
"""
def availableNexthopGroupEntryTest(self):
print("\navailableNexthopGroupEntryTest()")
attr = sai_thrift_get_switch_attribute(
self.client, available_next_hop_group_entry=True)
max_nhg_entry = attr["available_next_hop_group_entry"]
print("Available nexthop group entries: %d" % max_nhg_entry)
nhg = []
try:
for nhg_number in range(1, max_nhg_entry + 1):
nexthop_group = sai_thrift_create_next_hop_group(
self.client, type=SAI_NEXT_HOP_GROUP_TYPE_ECMP)
self.assertNotEqual(nexthop_group, SAI_NULL_OBJECT_ID)
nhg.append(nexthop_group)
attr = sai_thrift_get_switch_attribute(
self.client, available_next_hop_group_entry=True)
self.assertEqual(attr["available_next_hop_group_entry"],
max_nhg_entry - nhg_number)
self.assertEqual(attr["available_next_hop_group_entry"], 0)
# try to create one more nexthop group - should not be possible:
try:
nexthop_group = sai_thrift_create_next_hop_group(
self.client, type=SAI_NEXT_HOP_GROUP_TYPE_ECMP)
self.assertEqual(nexthop_group, SAI_NULL_OBJECT_ID)
print("No more nexthop group may be created")
except AssertionError:
sai_thrift_remove_next_hop_group(self.client, nexthop_group)
self.fail("Number of available nexthop groups "
"may be exceeded")
finally:
for nhg_id in nhg:
sai_thrift_remove_next_hop_group(self.client, nhg_id)
def runTest(self):
self.availableNexthopGroupEntryTest()
class AvailableNexthopGroupMemberEntryTest(PlatformSaiHelper):
"""
Verifies creation of maximum number of nexthop group member entries.
"""
def availableNexthopGroupMemberEntryTest(self):
print("\navailableNexthopGroupMemberEntryTest()")
attr = sai_thrift_get_switch_attribute(
self.client, available_next_hop_group_entry=True)
max_nhg_entry = attr["available_next_hop_group_entry"]
print("Available nexthop group entries: %d" % max_nhg_entry)
attr = sai_thrift_get_switch_attribute(
self.client, available_next_hop_group_member_entry=True)
max_member_entry = attr["available_next_hop_group_member_entry"]
print("Available nexthop group member entries: %d" % max_member_entry)
max_member_per_group = 64
nhg = []
nhop = dict()
members = []
member_number = 0
group_number = 0
ip_add = generate_ip_addr(max_member_entry + 100)
try:
while member_number < max_member_entry and \
group_number < max_nhg_entry:
nexthop_group = sai_thrift_create_next_hop_group(
self.client, type=SAI_NEXT_HOP_GROUP_TYPE_ECMP)
self.assertNotEqual(nexthop_group, SAI_NULL_OBJECT_ID)
nhg.append(nexthop_group)
group_number += 1
nhop_per_group_number = 0
while nhop_per_group_number < max_member_per_group:
ip_p = sai_ipaddress(next(ip_add))
if str(ip_p) in nhop:
continue
nexthop = sai_thrift_create_next_hop(
self.client,
ip=ip_p,
router_interface_id=self.port10_rif,
type=SAI_NEXT_HOP_TYPE_IP)
self.assertNotEqual(nexthop, SAI_NULL_OBJECT_ID)
nhop.update({str(ip_p): nexthop})
nhop_per_group_number += 1
nhop_member = sai_thrift_create_next_hop_group_member(
self.client,
next_hop_group_id=nexthop_group,
next_hop_id=nexthop)
self.assertNotEqual(nhop_member, SAI_NULL_OBJECT_ID)
members.append(nhop_member)
member_number += 1
attr = sai_thrift_get_switch_attribute(
self.client,
available_next_hop_group_member_entry=True)
self.assertEqual(
attr["available_next_hop_group_member_entry"],
max_member_entry - member_number)
attr = sai_thrift_get_switch_attribute(
self.client, available_next_hop_group_entry=True)
self.assertEqual(attr["available_next_hop_group_entry"], 0)
finally:
for member_id in members:
sai_thrift_remove_next_hop_group_member(self.client, member_id)
for ip_p in nhop:
sai_thrift_remove_next_hop(self.client, nhop.get(ip_p))
for nhg_id in nhg:
sai_thrift_remove_next_hop_group(self.client, nhg_id)
def runTest(self):
self.availableNexthopGroupMemberEntryTest()
class AvailableAclTableTest(PlatformSaiHelper):
"""
Verifies creation of maximum number of acl tables.
"""
def runTest(self):
print("\navailableAclTableTest()")
acl_resource = sai_thrift_acl_resource_t(
stage=SAI_SWITCH_ATTR_ACL_STAGE_INGRESS)
attr = sai_thrift_get_switch_attribute(
self.client, available_acl_table=acl_resource)
available_acl_tables = attr["available_acl_table"]
resource_list = available_acl_tables.resourcelist
try:
acl_table_list_list = []
for resource in resource_list:
stage = resource.stage
bind_point = resource.bind_point
avail_num = resource.avail_num
print("Available tables on stage %d, bind_point %d: %d"
% (stage, bind_point, avail_num))
acl_table_list = []
for counter in range(1, avail_num + 1):
acl_table = sai_thrift_create_acl_table(
self.client,
acl_stage=stage,
acl_bind_point_type_list=sai_thrift_s32_list_t(
count=1, int32list=[bind_point]))
self.assertEqual(self.status(), SAI_STATUS_SUCCESS)
acl_table_list.append(acl_table)
# check remained entries
attr = sai_thrift_get_switch_attribute(
self.client, available_acl_table=acl_resource)
for res in attr["available_acl_table"].resourcelist:
if res.stage == stage and res.bind_point == bind_point:
# Acl table counter wasn't right on Broadcom
self.assertEqual(res.avail_num, avail_num - counter)
break
# try to create one more table - should not be possible
try:
acl_table = sai_thrift_create_acl_table(
self.client,
acl_stage=stage,
acl_bind_point_type_list=sai_thrift_s32_list_t(
count=1, int32list=[bind_point]))
self.assertEqual(acl_table, SAI_NULL_OBJECT_ID)
except AssertionError:
sai_thrift_remove_acl_table(self.client, acl_table)
self.fail("Number of available ACL table entries "
"may be exceeded")
print("Required number of ACL tables created")
acl_table_list_list.append(acl_table_list)
finally:
for acl_table_list in acl_table_list_list:
for acl_table in acl_table_list:
sai_thrift_remove_acl_table(self.client, acl_table)
class AvailableAclTableGroupTest(PlatformSaiHelper):
"""
Verifies creation of maximum number of acl table groups.
"""
def runTest(self):
print("\AvailableAclTableGroupTest()")
print("Get max_number_of_temp_sensors")
temp_list=sai_thrift_s32_list_t(
count=100, int32list=[])
attr = sai_thrift_get_switch_attribute(
self.client, max_number_of_temp_sensors=True,
temp_list=temp_list)
print("Get max_number_of_temp_sensors: {}, temp_list:{}".format(
attr['max_number_of_temp_sensors'], attr['temp_list'].int32list))
acl_resource = sai_thrift_acl_resource_t(
stage=SAI_SWITCH_ATTR_ACL_STAGE_INGRESS)
attr = sai_thrift_get_switch_attribute(
self.client, available_acl_table_group=acl_resource)
available_acl_table_groups = attr["available_acl_table_group"]
resource_list = available_acl_table_groups.resourcelist
try:
acl_table_group_list_list = []
for resource in resource_list:
stage = resource.stage
bind_point = resource.bind_point
avail_num = resource.avail_num
print("Available tables on stage %d, bind_point %d: %d"
% (stage, bind_point, avail_num))
acl_table_group_list = []
for counter in range(1, avail_num + 1):
acl_table_group = sai_thrift_create_acl_table_group(
self.client,
acl_stage=stage,
acl_bind_point_type_list=sai_thrift_s32_list_t(
count=1, int32list=[bind_point]))
self.assertEqual(self.status(), SAI_STATUS_SUCCESS)
acl_table_group_list.append(acl_table_group)
# check remained entries
attr = sai_thrift_get_switch_attribute(
self.client, available_acl_table_group=acl_resource)
for res in attr["available_acl_table_group"].resourcelist:
if res.stage == stage and res.bind_point == bind_point:
# Acl table counter wasn't right on Broadcom
self.assertEqual(res.avail_num, avail_num - counter)
break
# try to create one more table group- should not be possible
try:
acl_table_group = sai_thrift_create_acl_table_group(
self.client,
acl_stage=stage,
acl_bind_point_type_list=sai_thrift_s32_list_t(
count=1, int32list=[bind_point]))
self.assertEqual(acl_table_group, SAI_NULL_OBJECT_ID)
except AssertionError:
sai_thrift_remove_acl_table_group(self.client, acl_table_group)
self.fail("Number of available ACL table group entries "
"may be exceeded")
print("Required number of ACL tables created")
acl_table_group_list_list.append(acl_table_group_list)
finally:
for acl_table_group_list in acl_table_group_list_list:
for acl_table_group in acl_table_group_list:
sai_thrift_remove_acl_table_group(self.client, acl_table_group)
class AvailableIPv4NexthopEntryTest(PlatformSaiHelper):
"""
Verifies creation of maximum number of IPv4 nexthop entries.
"""
def runTest(self):
print("\navailableIPv4NexthopEntryTest()")
attr = sai_thrift_get_switch_attribute(
self.client, available_ipv4_nexthop_entry=True)
max_nhop_entry = attr["available_ipv4_nexthop_entry"]
print("Available IPv4 nexthop entries: %d" % max_nhop_entry)
nbrs = dict()
nhop = dict()
ip_add = generate_ip_addr(max_nhop_entry + 100)
try:
nhop_number = 0
while nhop_number < max_nhop_entry:
ip_p = sai_ipaddress(next(ip_add))
if str(ip_p) in nhop:
continue
nbr_entry = sai_thrift_neighbor_entry_t(
self.switch_id,
self.port10_rif,
ip_p)
status = sai_thrift_create_neighbor_entry(
self.client,
nbr_entry,
dst_mac_address='00:11:22:33:44:55')
self.assertEqual(status, SAI_STATUS_SUCCESS)
nexthop = sai_thrift_create_next_hop(
self.client,
ip=ip_p,
router_interface_id=self.port10_rif,
type=SAI_NEXT_HOP_TYPE_IP)
self.assertNotEqual(nexthop, SAI_NULL_OBJECT_ID)
nbrs.update({str(ip_p): nbr_entry})
nhop.update({str(ip_p): nexthop})
nhop_number += 1
attr = sai_thrift_get_switch_attribute(
self.client, available_ipv4_nexthop_entry=True)
self.assertEqual(attr["available_ipv4_nexthop_entry"],
max_nhop_entry - nhop_number)
self.assertEqual(attr["available_ipv4_nexthop_entry"], 0)
finally:
for ip_p in nhop:
sai_thrift_remove_next_hop(self.client, nhop.get(ip_p))
sai_thrift_remove_neighbor_entry(self.client, nbrs.get(ip_p))
class AvailableIPv6NexthopEntryTest(PlatformSaiHelper):
"""
Verifies creation of maximum number of IPv6 nexthop entries.
"""
def runTest(self):
print("\navailableIPv6NexthopEntryTest()")
attr = sai_thrift_get_switch_attribute(
self.client, available_ipv6_nexthop_entry=True)
max_nhop_entry = attr["available_ipv6_nexthop_entry"]
print("Available IPv6 nexthop entries: %d" % max_nhop_entry)
nbrs = dict()
nhop = dict()
ip_add = generate_ip_addr(max_nhop_entry + 100, ipv6=True)
try:
nhop_number = 0
while nhop_number < max_nhop_entry:
ip_p = sai_ipaddress(next(ip_add))
if str(ip_p) in nhop:
continue
nbr_entry = sai_thrift_neighbor_entry_t(
self.switch_id, self.port10_rif, ip_p)
status = sai_thrift_create_neighbor_entry(
self.client, nbr_entry, dst_mac_address='00:11:22:33:44:55')
nexthop = sai_thrift_create_next_hop(
self.client,
ip=ip_p,
router_interface_id=self.port10_rif,
type=SAI_NEXT_HOP_TYPE_IP)
self.assertNotEqual(nexthop, SAI_NULL_OBJECT_ID)
nbrs.update({str(ip_p): nbr_entry})
nhop.update({str(ip_p): nexthop})
nhop_number += 1
attr = sai_thrift_get_switch_attribute(
self.client, available_ipv6_nexthop_entry=True)
self.assertEqual(attr["available_ipv6_nexthop_entry"],
max_nhop_entry - nhop_number)
self.assertEqual(attr["available_ipv6_nexthop_entry"], 0)
finally:
for ip_p in nhop:
sai_thrift_remove_next_hop(self.client, nhop.get(ip_p))
sai_thrift_remove_neighbor_entry(self.client, nbrs.get(ip_p))
@group("draft")
class SwitchAttrTest(SaiHelper):
"""
Switch attributes tests
"""
def runTest(self):
self.availableNexthopGroupEntryTest()
self.availableNexthopGroupMemberEntryTest()
self.availableFdbEntryTest()
self.readOnlyAttributesTest()
self.refreshIntervalTest()
self.availableSnatEntryTest()
self.availableDnatEntryTest()
def availableNexthopGroupEntryTest(self):
'''
Verifies creation of maximum number of nexthop group entries.
'''
print("\navailableNexthopGroupEntryTest()")
attr = sai_thrift_get_switch_attribute(
self.client, available_next_hop_group_entry=True)
max_nhg_entry = attr["available_next_hop_group_entry"]
print("Available nexthop group entries: %d" % max_nhg_entry)
nhg = []
try:
for nhg_number in range(1, max_nhg_entry + 1):
nexthop_group = sai_thrift_create_next_hop_group(
self.client, type=SAI_NEXT_HOP_GROUP_TYPE_ECMP)
self.assertNotEqual(nexthop_group, SAI_NULL_OBJECT_ID)
nhg.append(nexthop_group)
attr = sai_thrift_get_switch_attribute(
self.client, available_next_hop_group_entry=True)
self.assertEqual(attr["available_next_hop_group_entry"],
max_nhg_entry - nhg_number)
self.assertEqual(attr["available_next_hop_group_entry"], 0)
# try to create one more nexthop group - should not be possible:
try:
nexthop_group = sai_thrift_create_next_hop_group(
self.client, type=SAI_NEXT_HOP_GROUP_TYPE_ECMP)
self.assertEqual(nexthop_group, SAI_NULL_OBJECT_ID)
print("No more nexthop group may be created")
except AssertionError:
sai_thrift_remove_next_hop(self.client, nexthop_group)
self.fail("Number of available nexthop groups "
"may be exceeded")
finally:
for nhg_id in nhg:
sai_thrift_remove_next_hop_group(self.client, nhg_id)
def availableNexthopGroupMemberEntryTest(self):
'''
Verifies creation of maximum number of nexthop group member entries.
'''
print("\navailableNexthopGroupMemberEntryTest()")
attr = sai_thrift_get_switch_attribute(
self.client, available_next_hop_group_entry=True)
max_nhg_entry = attr["available_next_hop_group_entry"]
print("Available nexthop group entries: %d" % max_nhg_entry)
attr = sai_thrift_get_switch_attribute(
self.client, available_next_hop_group_member_entry=True)
max_member_entry = attr["available_next_hop_group_member_entry"]
print("Available nexthop group member entries: %d" % max_member_entry)
max_member_per_group = 64
nhg = []
nhop = dict()
members = []
member_number = 0
group_number = 0
ip_add = generate_ip_addr(max_member_entry + 100)
try:
while member_number < max_member_entry and \
group_number < max_nhg_entry:
nexthop_group = sai_thrift_create_next_hop_group(
self.client, type=SAI_NEXT_HOP_GROUP_TYPE_ECMP)
self.assertNotEqual(nexthop_group, SAI_NULL_OBJECT_ID)
nhg.append(nexthop_group)
group_number += 1
nhop_per_group_number = 0
while nhop_per_group_number < max_member_per_group:
ip_p = sai_ipaddress(next(ip_add))
if str(ip_p) in nhop:
continue
nexthop = sai_thrift_create_next_hop(
self.client,
ip=ip_p,
router_interface_id=self.port10_rif,
type=SAI_NEXT_HOP_TYPE_IP)
self.assertNotEqual(nexthop, SAI_NULL_OBJECT_ID)
nhop.update({str(ip_p): nexthop})
nhop_per_group_number += 1
nhop_member = sai_thrift_create_next_hop_group_member(
self.client,
next_hop_group_id=nexthop_group,
next_hop_id=nexthop)
self.assertNotEqual(nhop_member, SAI_NULL_OBJECT_ID)
members.append(nhop_member)
member_number += 1
attr = sai_thrift_get_switch_attribute(
self.client,
available_next_hop_group_member_entry=True)
self.assertEqual(
attr["available_next_hop_group_member_entry"],
max_member_entry - member_number)
attr = sai_thrift_get_switch_attribute(
self.client, available_next_hop_group_entry=True)
self.assertEqual(attr["available_next_hop_group_entry"], 0)
finally:
for member_id in members:
sai_thrift_remove_next_hop_group_member(self.client, member_id)
for ip_p in nhop:
sai_thrift_remove_next_hop(self.client, nhop.get(ip_p))
for nhg_id in nhg:
sai_thrift_remove_next_hop_group(self.client, nhg_id)
def availableFdbEntryTest(self):
'''
Verifies creation of maximum number of FDB entries.
'''
print("\navailableFdbEntryTest()")
attr = sai_thrift_get_switch_attribute(
self.client, available_fdb_entry=True)
max_fdb_entry = attr["available_fdb_entry"]
print("Available FDB entries: %d" % max_fdb_entry)
# Verifying only up to 90% of FDB table capacity
available_fdb_entry = int(max_fdb_entry * 0.9)
mac_list = generate_mac_list(max_fdb_entry)
fdb = []
try:
for fdb_number in range(1, available_fdb_entry + 1):
fdb_entry = sai_thrift_fdb_entry_t(
switch_id=self.switch_id,
mac_address=mac_list[fdb_number - 1],
bv_id=self.vlan10)
status = sai_thrift_create_fdb_entry(
self.client,
fdb_entry,
type=SAI_FDB_ENTRY_TYPE_STATIC,
bridge_port_id=self.port0_bp,
packet_action=SAI_PACKET_ACTION_FORWARD)
self.assertEqual(status, SAI_STATUS_SUCCESS)
fdb.append(fdb_entry)
attr = sai_thrift_get_switch_attribute(
self.client, available_fdb_entry=True)
self.assertEqual(attr["available_fdb_entry"],
max_fdb_entry - fdb_number)
finally:
for fdb_id in fdb:
sai_thrift_remove_fdb_entry(self.client, fdb_id)
def readOnlyAttributesTest(self):
'''
Verifies get on read only attributes.
'''
print("\nreadOnlyAttributesTest()")
attr = sai_thrift_get_switch_attribute(self.client,
number_of_active_ports=True)
print(attr)
self.assertNotEqual(attr["number_of_active_ports"], 0)
self.assertNotEqual(attr["SAI_SWITCH_ATTR_NUMBER_OF_ACTIVE_PORTS"], 0)
active_ports = attr["number_of_active_ports"]
attr = sai_thrift_get_switch_attribute(
self.client, max_number_of_supported_ports=True)
print(attr)
self.assertNotEqual(attr["max_number_of_supported_ports"], 0)
self.assertNotEqual(
attr["SAI_SWITCH_ATTR_MAX_NUMBER_OF_SUPPORTED_PORTS"], 0)
attr = sai_thrift_get_switch_attribute(
self.client,
port_list=sai_thrift_object_list_t(idlist=[], count=active_ports))
print(attr)
self.assertNotEqual(attr["port_list"].count, 0)
self.assertNotEqual(attr["SAI_SWITCH_ATTR_PORT_LIST"].count, 0)
attr = sai_thrift_get_switch_attribute(self.client, port_max_mtu=True)
print(attr)
self.assertNotEqual(attr["port_max_mtu"], 0)
self.assertNotEqual(attr["SAI_SWITCH_ATTR_PORT_MAX_MTU"], 0)
attr = sai_thrift_get_switch_attribute(self.client, cpu_port=True)
print(attr)
self.assertNotEqual(attr["cpu_port"], 0)
self.assertNotEqual(attr["SAI_SWITCH_ATTR_CPU_PORT"], 0)
attr = sai_thrift_get_switch_attribute(self.client,
max_virtual_routers=True)
print(attr)
self.assertNotEqual(attr["max_virtual_routers"], 0)
self.assertNotEqual(attr["SAI_SWITCH_ATTR_MAX_VIRTUAL_ROUTERS"], 0)
attr = sai_thrift_get_switch_attribute(self.client,
fdb_table_size=True)
print(attr)
self.assertNotEqual(attr["fdb_table_size"], 0)
self.assertNotEqual(attr["SAI_SWITCH_ATTR_FDB_TABLE_SIZE"], 0)
attr = sai_thrift_get_switch_attribute(self.client,
l3_neighbor_table_size=True)
print(attr)
self.assertNotEqual(attr["l3_neighbor_table_size"], 0)
self.assertNotEqual(attr["SAI_SWITCH_ATTR_L3_NEIGHBOR_TABLE_SIZE"], 0)
attr = sai_thrift_get_switch_attribute(self.client,
l3_route_table_size=True)
print(attr)
self.assertNotEqual(attr["l3_route_table_size"], 0)
self.assertNotEqual(attr["SAI_SWITCH_ATTR_L3_ROUTE_TABLE_SIZE"], 0)
attr = sai_thrift_get_switch_attribute(self.client, lag_members=True)
print(attr)
self.assertNotEqual(attr["lag_members"], 0)
self.assertNotEqual(attr["SAI_SWITCH_ATTR_LAG_MEMBERS"], 0)
attr = sai_thrift_get_switch_attribute(self.client,
number_of_lags=True)
print(attr)
self.assertNotEqual(attr["number_of_lags"], 0)
self.assertNotEqual(attr["SAI_SWITCH_ATTR_NUMBER_OF_LAGS"], 0)
attr = sai_thrift_get_switch_attribute(self.client, ecmp_members=True)
print(attr)
self.assertNotEqual(attr["ecmp_members"], 0)
self.assertNotEqual(attr["SAI_SWITCH_ATTR_ECMP_MEMBERS"], 0)
attr = sai_thrift_get_switch_attribute(self.client,
number_of_ecmp_groups=True)
print(attr)
self.assertNotEqual(attr["number_of_ecmp_groups"], 0)
self.assertNotEqual(attr["SAI_SWITCH_ATTR_NUMBER_OF_ECMP_GROUPS"], 0)
attr = sai_thrift_get_switch_attribute(self.client,
number_of_unicast_queues=True)
print(attr)
self.assertNotEqual(attr["number_of_unicast_queues"], 0)
self.assertNotEqual(
attr["SAI_SWITCH_ATTR_NUMBER_OF_UNICAST_QUEUES"], 0)
attr = sai_thrift_get_switch_attribute(self.client,
number_of_multicast_queues=True)
print(attr)
self.assertNotEqual(attr["number_of_multicast_queues"], 0)
self.assertNotEqual(
attr["SAI_SWITCH_ATTR_NUMBER_OF_MULTICAST_QUEUES"], 0)
attr = sai_thrift_get_switch_attribute(self.client,
number_of_queues=True)
print(attr)
self.assertNotEqual(attr["number_of_queues"], 0)
self.assertNotEqual(attr["SAI_SWITCH_ATTR_NUMBER_OF_QUEUES"], 0)
attr = sai_thrift_get_switch_attribute(self.client,
number_of_cpu_queues=True)
print(attr)
self.assertNotEqual(attr["number_of_cpu_queues"], 0)
self.assertNotEqual(attr["SAI_SWITCH_ATTR_NUMBER_OF_CPU_QUEUES"], 0)
attr = sai_thrift_get_switch_attribute(self.client,
acl_table_minimum_priority=True)
print(attr)
self.assertEqual(attr["acl_table_minimum_priority"], 0)
self.assertEqual(
attr["SAI_SWITCH_ATTR_ACL_TABLE_MINIMUM_PRIORITY"], 0)
attr = sai_thrift_get_switch_attribute(
self.client, acl_table_maximum_priority=True)
print(attr)
self.assertNotEqual(attr["acl_table_maximum_priority"], 0)
self.assertNotEqual(
attr["SAI_SWITCH_ATTR_ACL_TABLE_MAXIMUM_PRIORITY"], 0)
attr = sai_thrift_get_switch_attribute(self.client,
acl_entry_minimum_priority=True)
print(attr)
self.assertEqual(attr["acl_entry_minimum_priority"], 0)
self.assertEqual(attr["SAI_SWITCH_ATTR_ACL_ENTRY_MINIMUM_PRIORITY"], 0)
attr = sai_thrift_get_switch_attribute(self.client,
acl_entry_maximum_priority=True)
print(attr)
self.assertNotEqual(attr["acl_entry_maximum_priority"], 0)
self.assertNotEqual(
attr["SAI_SWITCH_ATTR_ACL_ENTRY_MAXIMUM_PRIORITY"], 0)
attr = sai_thrift_get_switch_attribute(self.client,
default_vlan_id=True)
print(attr)
self.assertNotEqual(attr["default_vlan_id"], 0)
self.assertNotEqual(attr["SAI_SWITCH_ATTR_DEFAULT_VLAN_ID"], 0)
attr = sai_thrift_get_switch_attribute(self.client,
default_stp_inst_id=True)
print(attr)
self.assertEqual(attr["default_stp_inst_id"], SAI_NULL_OBJECT_ID)
self.assertEqual(
attr["SAI_SWITCH_ATTR_DEFAULT_STP_INST_ID"], SAI_NULL_OBJECT_ID)
attr = sai_thrift_get_switch_attribute(self.client,
max_stp_instance=True)
print(attr)
self.assertNotEqual(attr["max_stp_instance"], 0)
self.assertNotEqual(attr["SAI_SWITCH_ATTR_MAX_STP_INSTANCE"], 0)
attr = sai_thrift_get_switch_attribute(self.client,
default_virtual_router_id=True)
print(attr)
self.assertNotEqual(attr["default_virtual_router_id"], 0)
self.assertNotEqual(
attr["SAI_SWITCH_ATTR_DEFAULT_VIRTUAL_ROUTER_ID"], 0)
attr = sai_thrift_get_switch_attribute(self.client,
default_1q_bridge_id=True)
print(attr)
self.assertNotEqual(attr["default_1q_bridge_id"], 0)
self.assertNotEqual(attr["SAI_SWITCH_ATTR_DEFAULT_1Q_BRIDGE_ID"], 0)
attr = sai_thrift_get_switch_attribute(
self.client, qos_max_number_of_traffic_classes=True)
print(attr)
self.assertNotEqual(attr["qos_max_number_of_traffic_classes"], 0)
self.assertNotEqual(
attr["SAI_SWITCH_ATTR_QOS_MAX_NUMBER_OF_TRAFFIC_CLASSES"], 0)
attr_name = "SAI_SWITCH_ATTR_QOS_MAX_NUMBER_OF_SCHEDULER_GROUP_" + \
"HIERARCHY_LEVELS"
attr = sai_thrift_get_switch_attribute(
self.client,
qos_max_number_of_scheduler_group_hierarchy_levels=True)
print(attr)
self.assertNotEqual(
attr["qos_max_number_of_scheduler_group_hierarchy_levels"], 0)
self.assertNotEqual(attr[attr_name], 0)
scheduler_group_levels = attr[
"qos_max_number_of_scheduler_group_hierarchy_levels"]
value = sai_thrift_u32_list_t(count=scheduler_group_levels,
uint32list=[])
attr_name = "SAI_SWITCH_ATTR_QOS_MAX_NUMBER_OF_SCHEDULER_GROUPS_" + \
"PER_HIERARCHY_LEVEL"
attr = sai_thrift_get_switch_attribute(
self.client,
qos_max_number_of_scheduler_groups_per_hierarchy_level=value)
print(attr)
self.assertNotEqual(
attr["qos_max_number_of_scheduler_groups_per_hierarchy_level"], 0)
self.assertNotEqual(attr[attr_name], 0)
attr_name = "SAI_SWITCH_ATTR_QOS_MAX_NUMBER_OF_CHILDS_" + \
"PER_SCHEDULER_GROUP"
attr = sai_thrift_get_switch_attribute(
self.client, qos_max_number_of_childs_per_scheduler_group=True)
print(attr)
self.assertNotEqual(
attr["qos_max_number_of_childs_per_scheduler_group"], 0)
self.assertNotEqual(attr[attr_name], 0)
attr = sai_thrift_get_switch_attribute(self.client,
total_buffer_size=True)
print(attr)
self.assertNotEqual(attr["total_buffer_size"], 0)
self.assertNotEqual(attr["SAI_SWITCH_ATTR_TOTAL_BUFFER_SIZE"], 0)
attr = sai_thrift_get_switch_attribute(self.client,
ingress_buffer_pool_num=True)
print(attr)
self.assertNotEqual(attr["ingress_buffer_pool_num"], 0)
self.assertNotEqual(
attr["SAI_SWITCH_ATTR_INGRESS_BUFFER_POOL_NUM"], 0)
attr = sai_thrift_get_switch_attribute(self.client,
egress_buffer_pool_num=True)
print(attr)
self.assertNotEqual(attr["egress_buffer_pool_num"], 0)
self.assertNotEqual(attr["SAI_SWITCH_ATTR_EGRESS_BUFFER_POOL_NUM"], 0)
not supported
attr = sai_thrift_get_switch_attribute(self.client, ecmp_hash=True)
print(attr)
self.assertNotEqual(attr["ecmp_hash"], 0)
self.assertNotEqual(attr["SAI_SWITCH_ATTR_ECMP_HASH"], 0)
attr = sai_thrift_get_switch_attribute(self.client, lag_hash=True)
print(attr)
self.assertNotEqual(attr["lag_hash"], 0)
self.assertNotEqual(attr["SAI_SWITCH_ATTR_LAG_HASH"], 0)
attr = sai_thrift_get_switch_attribute(self.client,
max_acl_action_count=True)
print(attr)
self.assertNotEqual(attr["max_acl_action_count"], 0)
self.assertNotEqual(attr["SAI_SWITCH_ATTR_MAX_ACL_ACTION_COUNT"], 0)
max_acl_action_count = attr["max_acl_action_count"]
attr = sai_thrift_get_switch_attribute(self.client,
max_acl_range_count=True)
print(attr)
self.assertNotEqual(attr["max_acl_range_count"], 0)
self.assertNotEqual(attr["SAI_SWITCH_ATTR_MAX_ACL_RANGE_COUNT"], 0)
s32 = sai_thrift_s32_list_t(int32list=[], count=max_acl_action_count)
cap = sai_thrift_acl_capability_t(action_list=s32)
attr = sai_thrift_get_switch_attribute(self.client,
acl_capability=cap)
print(attr)
self.assertNotEqual(attr["acl_capability"], 0)
self.assertNotEqual(attr["SAI_SWITCH_ATTR_ACL_CAPABILITY"], 0)
attr = sai_thrift_get_switch_attribute(self.client,
max_mirror_session=True)
print(attr)
self.assertNotEqual(attr["max_mirror_session"], 0)
self.assertNotEqual(attr["SAI_SWITCH_ATTR_MAX_MIRROR_SESSION"], 0)
attr = sai_thrift_get_switch_attribute(self.client,
default_trap_group=True)
print(attr)
self.assertNotEqual(attr["default_trap_group"], 0)
self.assertNotEqual(attr["SAI_SWITCH_ATTR_DEFAULT_TRAP_GROUP"], 0)
attr = sai_thrift_get_switch_attribute(self.client,
acl_stage_ingress=cap)
print(attr)
self.assertNotEqual(attr["acl_stage_ingress"], 0)
self.assertNotEqual(attr["SAI_SWITCH_ATTR_ACL_STAGE_INGRESS"], 0)
attr = sai_thrift_get_switch_attribute(self.client,
acl_stage_egress=cap)
print(attr)
self.assertNotEqual(attr["acl_stage_egress"], 0)
self.assertNotEqual(attr["SAI_SWITCH_ATTR_ACL_STAGE_EGRESS"], 0)
def refreshIntervalTest(self):
'''
Verifies SAI_SWITCH_ATTR_COUNTER_REFRESH_INTERVAL switch attribute
applied to VLAN and RIF stats
'''
print("\nrefreshIntervalTest()")
attr = sai_thrift_get_switch_attribute(
self.client, counter_refresh_interval=True)
init_interval = attr["counter_refresh_interval"]
print("Counters refresh interval initially set to %d sec"
% init_interval)
test_vlan = self.vlan10
test_vlan_port = self.dev_port0
test_rif = self.port10_rif
test_rif_port = self.dev_port10
test_interval = 10
vlan_stats = query_counter(
self, sai_thrift_get_vlan_stats, test_vlan)
init_vlan_counter = vlan_stats['SAI_VLAN_STAT_IN_PACKETS']
rif_stats = query_counter(
self, sai_thrift_get_router_interface_stats, test_rif)
init_rif_counter = rif_stats['SAI_ROUTER_INTERFACE_STAT_IN_PACKETS']
pkt = simple_udp_packet()
try:
print("\nTesting VLAN stats counters refresh time")
# compensate refresh time shift
send_packet(self, test_vlan_port, pkt)
while query_counter(
self, sai_thrift_get_vlan_stats, test_vlan)[
'SAI_VLAN_STAT_IN_PACKETS'] == init_vlan_counter:
time.sleep(0.1)
init_vlan_counter += 1
send_packet(self, test_vlan_port, pkt)
vlan_stats = query_counter(
self, sai_thrift_get_vlan_stats, test_vlan)
counter = vlan_stats['SAI_VLAN_STAT_IN_PACKETS']
self.assertEqual(counter, init_vlan_counter)
# determine refresh time
timer_start = time.time()
timer_end = time.time()
while counter == init_vlan_counter:
vlan_stats = query_counter(
self, sai_thrift_get_vlan_stats, test_vlan)
counter = vlan_stats['SAI_VLAN_STAT_IN_PACKETS']
timer_end = time.time()
init_vlan_counter += 1
interval = int(round(timer_end - timer_start))
print("VLAN stats refreshed after %d sec" % interval)
self.assertEqual(init_interval, int(round(interval)))
vlan_stats = query_counter(
self, sai_thrift_get_vlan_stats, test_vlan)
counter = vlan_stats['SAI_VLAN_STAT_IN_PACKETS']
self.assertEqual(counter, init_vlan_counter)
print("Setting refresh interval to %d sec" % test_interval)
sai_thrift_set_switch_attribute(
self.client, counter_refresh_interval=test_interval)
attr = sai_thrift_get_switch_attribute(
self.client, counter_refresh_interval=True)
set_interval = attr["counter_refresh_interval"]
print("Refresh interval set to %d sec" % set_interval)
self.assertEqual(set_interval, test_interval)
# compensate refresh time shift
send_packet(self, test_vlan_port, pkt)
while query_counter(
self, sai_thrift_get_vlan_stats, test_vlan)[
'SAI_VLAN_STAT_IN_PACKETS'] == init_vlan_counter:
time.sleep(0.1)
init_vlan_counter += 1
send_packet(self, test_vlan_port, pkt)
vlan_stats = query_counter(
self, sai_thrift_get_vlan_stats, test_vlan)
counter = vlan_stats['SAI_VLAN_STAT_IN_PACKETS']
self.assertEqual(counter, init_vlan_counter)
# determine refresh time
timer_start = time.time()
timer_end = time.time()
while counter == init_vlan_counter:
vlan_stats = query_counter(
self, sai_thrift_get_vlan_stats, test_vlan)
counter = vlan_stats['SAI_VLAN_STAT_IN_PACKETS']
timer_end = time.time()
init_vlan_counter += 1
interval = int(round(timer_end - timer_start))
print("VLAN stats refreshed after %d sec" % interval)
self.assertEqual(test_interval, interval)
vlan_stats = query_counter(
self, sai_thrift_get_vlan_stats, test_vlan)
counter = vlan_stats['SAI_VLAN_STAT_IN_PACKETS']
self.assertEqual(counter, init_vlan_counter)
finally:
sai_thrift_set_switch_attribute(
self.client, counter_refresh_interval=init_interval)
try:
print("\nTesting RIF stats counters refresh time")
# compensate refresh time shift
send_packet(self, test_rif_port, pkt)
while query_counter(
self, sai_thrift_get_router_interface_stats, test_rif)[
'SAI_ROUTER_INTERFACE_STAT_IN_PACKETS'] == \
init_rif_counter:
time.sleep(0.1)
init_rif_counter += 1
send_packet(self, test_rif_port, pkt)
rif_stats = query_counter(
self, sai_thrift_get_router_interface_stats, test_rif)
counter = rif_stats['SAI_ROUTER_INTERFACE_STAT_IN_PACKETS']
self.assertEqual(counter, init_rif_counter)
# determine refresh time
timer_start = time.time()
timer_end = time.time()
while counter == init_rif_counter:
rif_stats = query_counter(
self, sai_thrift_get_router_interface_stats, test_rif)
counter = rif_stats['SAI_ROUTER_INTERFACE_STAT_IN_PACKETS']
timer_end = time.time()
init_rif_counter += 1
interval = int(round(timer_end - timer_start))
print("RIF stats refreshed after %d sec" % interval)
self.assertEqual(init_interval, int(round(interval)))
rif_stats = query_counter(
self, sai_thrift_get_router_interface_stats, test_rif)
counter = rif_stats['SAI_ROUTER_INTERFACE_STAT_IN_PACKETS']
self.assertEqual(counter, init_rif_counter)
print("Setting refresh interval to %d sec" % test_interval)
sai_thrift_set_switch_attribute(
self.client, counter_refresh_interval=test_interval)
attr = sai_thrift_get_switch_attribute(
self.client, counter_refresh_interval=True)
set_interval = attr["counter_refresh_interval"]
print("Refresh interval set to %d sec" % set_interval)
self.assertEqual(set_interval, test_interval)
# compensate refresh time shift
send_packet(self, test_rif_port, pkt)
while query_counter(
self, sai_thrift_get_router_interface_stats, test_rif)[
'SAI_ROUTER_INTERFACE_STAT_IN_PACKETS'] == \
init_rif_counter:
time.sleep(0.1)
init_rif_counter += 1
send_packet(self, test_rif_port, pkt)
rif_stats = query_counter(
self, sai_thrift_get_router_interface_stats, test_rif)
counter = rif_stats['SAI_ROUTER_INTERFACE_STAT_IN_PACKETS']
self.assertEqual(counter, init_rif_counter)
# determine refresh time
timer_start = time.time()
timer_end = time.time()
while counter == init_rif_counter:
rif_stats = query_counter(
self, sai_thrift_get_router_interface_stats, test_rif)
counter = rif_stats['SAI_ROUTER_INTERFACE_STAT_IN_PACKETS']
timer_end = time.time()
init_rif_counter += 1
interval = int(round(timer_end - timer_start))
print("RIF stats refreshed after %d sec" % interval)
self.assertEqual(test_interval, interval)
rif_stats = query_counter(
self, sai_thrift_get_router_interface_stats, test_rif)
counter = rif_stats['SAI_VLAN_STAT_IN_PACKETS']
self.assertEqual(counter, init_vlan_counter)
finally:
sai_thrift_set_switch_attribute(
self.client, counter_refresh_interval=init_interval)
def availableSnatEntryTest(self):
'''
Verifies creation of maximum number of snat entries.
'''
print("\navailableSnatEntryTest()")
attr = sai_thrift_get_switch_attribute(
self.client, available_snat_entry=True)
max_snat_entry = attr["available_snat_entry"]
print("Available SNAT entries: %d" % max_snat_entry)
snat_list = []
addr = generate_ip_addr(max_snat_entry + 1)
try:
for snat_number in range(1, max_snat_entry + 1):
nat_data = sai_thrift_nat_entry_data_t(
key=sai_thrift_nat_entry_key_t(
src_ip=next(addr), proto=6),
mask=sai_thrift_nat_entry_mask_t(
src_ip='255.255.255.255', proto=63))
snat = sai_thrift_nat_entry_t(
vr_id=self.default_vrf,
data=nat_data,
nat_type=SAI_NAT_TYPE_SOURCE_NAT)
status = sai_thrift_create_nat_entry(
self.client, snat, nat_type=SAI_NAT_TYPE_SOURCE_NAT)
self.assertEqual(status, SAI_STATUS_SUCCESS)
snat_list.append(snat)
attr = sai_thrift_get_switch_attribute(
self.client, available_snat_entry=True)
self.assertEqual(attr["available_snat_entry"],
max_snat_entry - snat_number)
# checking if no more SNAT entry may be created
nat_data = sai_thrift_nat_entry_data_t(
key=sai_thrift_nat_entry_key_t(
src_ip=next(addr),
proto=6),
mask=sai_thrift_nat_entry_mask_t(
src_ip='255.255.255.255',
proto=63))
try:
snat = sai_thrift_nat_entry_t(
vr_id=self.default_vrf,
data=nat_data,
nat_type=SAI_NAT_TYPE_SOURCE_NAT)
stat = sai_thrift_create_nat_entry(
self.client, snat,
nat_type=SAI_NAT_TYPE_SOURCE_NAT)
self.assertNotEqual(stat, SAI_STATUS_SUCCESS)
finally:
if not stat:
sai_thrift_remove_nat_entry(self.client, snat)
finally:
for snat in snat_list:
sai_thrift_remove_nat_entry(self.client, snat)
def availableDnatEntryTest(self):
'''
Verifies creation of maximum number of dnat entries.
'''
print("\navailableDnatEntryTest()")
attr = sai_thrift_get_switch_attribute(
self.client, available_dnat_entry=True)
max_dnat_entry = attr["available_dnat_entry"]
print("Available DNAT entries: %d" % max_dnat_entry)
dnat_list = []
addr = generate_ip_addr(max_dnat_entry + 1)
try:
for dnat_number in range(1, max_dnat_entry + 1):
nat_data = sai_thrift_nat_entry_data_t(
key=sai_thrift_nat_entry_key_t(
dst_ip=next(addr),
proto=6,
l4_src_port=100),
mask=sai_thrift_nat_entry_mask_t(
dst_ip='255.255.255.255', proto=63, l4_src_port=255))
dnat = sai_thrift_nat_entry_t(
vr_id=self.default_vrf,
data=nat_data,
nat_type=SAI_NAT_TYPE_DESTINATION_NAT)
status = sai_thrift_create_nat_entry(
self.client, dnat, nat_type=SAI_NAT_TYPE_DESTINATION_NAT)
self.assertEqual(status, SAI_STATUS_SUCCESS)
dnat_list.append(dnat)
attr = sai_thrift_get_switch_attribute(
self.client, available_dnat_entry=True)
self.assertEqual(attr["available_dnat_entry"],
max_dnat_entry - dnat_number)
# checking if no more DNAT entry may be created
nat_data = sai_thrift_nat_entry_data_t(
key=sai_thrift_nat_entry_key_t(
dst_ip=next(addr),
proto=6),
mask=sai_thrift_nat_entry_mask_t(
dst_ip='255.255.255.255',
proto=63))
try:
dnat = sai_thrift_nat_entry_t(
vr_id=self.default_vrf,
data=nat_data,
nat_type=SAI_NAT_TYPE_DESTINATION_NAT)
stat = sai_thrift_create_nat_entry(
self.client, dnat,
nat_type=SAI_NAT_TYPE_DESTINATION_NAT)
self.assertNotEqual(stat, SAI_STATUS_SUCCESS)
finally:
if not stat:
sai_thrift_remove_nat_entry(self.client, dnat)
finally:
for dnat in dnat_list:
sai_thrift_remove_nat_entry(self.client, dnat)
@group("draft")
class SwitchAttrSimplifiedTest(SaiHelperSimplified):
"""
Switch attributes tests adapted for DASH
Configuration
+----------+-----------+
| port0 | port0_rif |
+----------+-----------+
"""
def setUp(self):
super(SwitchAttrSimplifiedTest, self).setUp()
self.create_routing_interfaces(ports=[0])
# values required by neighbor entries tests set by route entries test
# Note: routes entries tests should be run as first
self.available_v4_host_routes = None
self.available_v6_host_routes = None
def tearDown(self):
self.destroy_routing_interfaces()
super(SwitchAttrSimplifiedTest, self).tearDown()
def runTest(self):
self.availableIPv4RouteEntryTest()
self.availableIPv6RouteEntryTest()
self.availableIPv4NexthopEntryTest()
self.availableIPv6NexthopEntryTest()
self.availableIPv4NeighborEntryTest()
self.availableIPv6NeighborEntryTest()
def availableIPv4RouteEntryTest(self):
'''
Verifies creation of maximum number of IPv4 route entries.
'''
print("\navailableIPv4RouteEntryTest()")
attr = sai_thrift_get_switch_attribute(
self.client, available_ipv4_route_entry=True)
max_route_entry = attr["available_ipv4_route_entry"]
print("Available IPv4 route entries: %d" % max_route_entry)
routes = dict()
mask = '/32'
ip_add = generate_ip_addr(max_route_entry + 100)
try:
nhop = sai_thrift_create_next_hop(
self.client,
ip=sai_ipaddress('10.10.10.1'),
router_interface_id=self.port0_rif,
type=SAI_NEXT_HOP_TYPE_IP)
self.assertNotEqual(nhop, SAI_NULL_OBJECT_ID)
route_number = 0
max_host_route = 0
while route_number < max_route_entry:
ip_p_m = sai_ipprefix(next(ip_add) + mask)
# check if ip repeat, then get next ip
if str(ip_p_m) in routes:
continue
route_entry = sai_thrift_route_entry_t(
vr_id=self.default_vrf,
destination=ip_p_m)
status = sai_thrift_create_route_entry(
self.client, route_entry, next_hop_id=nhop)
if status == SAI_STATUS_SUCCESS:
routes.update({str(ip_p_m): route_entry})
route_number += 1
elif status == SAI_STATUS_ITEM_ALREADY_EXISTS:
continue
elif mask == '/32': # when host table is full change to LPM
print("%s host routes have been created" % route_number)
max_host_route = route_number
self.available_v4_host_routes = max_host_route
mask = '/30'
continue
else:
self.fail("Route creation failed after creating %d "
"entries, status %u" % (route_number, status))
attr = sai_thrift_get_switch_attribute(
self.client, available_ipv4_route_entry=True)
self.assertEqual(attr["available_ipv4_route_entry"],
max_route_entry - route_number)
print("%s LPM routes have been created"
% (max_route_entry - max_host_route))
self.assertEqual(attr["available_ipv4_route_entry"], 0)
ip_add.close()
finally:
for ip_p_m in routes:
sai_thrift_remove_route_entry(self.client, routes.get(ip_p_m))
sai_thrift_remove_next_hop(self.client, nhop)
def availableIPv4NexthopEntryTest(self):
'''
Verifies creation of maximum number of IPv4 nexthop entries.
'''
print("\navailableIPv4NexthopEntryTest()")
attr = sai_thrift_get_switch_attribute(
self.client, available_ipv4_nexthop_entry=True)
max_nhop_entry = attr["available_ipv4_nexthop_entry"]
print("Available IPv4 nexthop entries: %d" % max_nhop_entry)
nhop = dict()
ip_add = generate_ip_addr(max_nhop_entry + 100)
try:
nhop_number = 0
while nhop_number < max_nhop_entry:
ip_p = sai_ipaddress(next(ip_add))
if str(ip_p) in nhop:
continue
nexthop = sai_thrift_create_next_hop(
self.client,
ip=ip_p,
router_interface_id=self.port0_rif,
type=SAI_NEXT_HOP_TYPE_IP)
self.assertNotEqual(nexthop, SAI_NULL_OBJECT_ID)
nhop.update({str(ip_p): nexthop})
nhop_number += 1
attr = sai_thrift_get_switch_attribute(
self.client, available_ipv4_nexthop_entry=True)
self.assertEqual(attr["available_ipv4_nexthop_entry"],
max_nhop_entry - nhop_number)
self.assertEqual(attr["available_ipv4_nexthop_entry"], 0)
finally:
for ip_p in nhop:
sai_thrift_remove_next_hop(self.client, nhop.get(ip_p))
def availableIPv4NeighborEntryTest(self):
'''
Verifies creation of maximum number of IPv4 neighbor entries.
'''
print("\navailableIPv4NeighborEntryTest()")
if self.available_v4_host_routes is None:
print("availableIPv4RouteEntryTest must be run first")
return
attr = sai_thrift_get_switch_attribute(
self.client, available_ipv4_neighbor_entry=True)
available_nbr_entry = attr["available_ipv4_neighbor_entry"]
print("Available IPv4 neighbor entries: %d" % available_nbr_entry)
if available_nbr_entry > self.available_v4_host_routes:
print("Cannot create more neighbor entries than available host "
"routes which is %d" % self.available_v4_host_routes)
max_nbr_entry = self.available_v4_host_routes
else:
max_nbr_entry = available_nbr_entry
nbrs = dict()
ip_add = generate_ip_addr(max_nbr_entry + 100)
try:
nbr_number = 0
while nbr_number < max_nbr_entry:
ip_p = sai_ipaddress(next(ip_add))
# check if ip repeat, then get next ip
if str(ip_p) in nbrs:
continue
nbr_entry = sai_thrift_neighbor_entry_t(
rif_id=self.port0_rif,
ip_address=ip_p)
status = sai_thrift_create_neighbor_entry(
self.client,
nbr_entry,
dst_mac_address='00:00:00:00:00:01',
no_host_route=False)
self.assertEqual(status, SAI_STATUS_SUCCESS)
nbrs.update({str(ip_p): nbr_entry})
nbr_number += 1
attr = sai_thrift_get_switch_attribute(
self.client, available_ipv4_neighbor_entry=True)
self.assertEqual(attr["available_ipv4_neighbor_entry"],
available_nbr_entry - nbr_number,
"Failed after %d entries" % nbr_number)
finally:
for ip_p in nbrs:
sai_thrift_remove_neighbor_entry(self.client, nbrs.get(ip_p))
def availableIPv6RouteEntryTest(self):
'''
Verifies creation of maximum number of IPv6 route entries.
'''
print("\navailableIPv6RouteEntryTest()")
attr = sai_thrift_get_switch_attribute(
self.client, available_ipv6_route_entry=True)
max_route_entry = attr["available_ipv6_route_entry"]
print("Available IPv6 route entries: %d" % max_route_entry)
routes = dict()
mask = '/128'
ip_add = generate_ip_addr(max_route_entry + 100, ipv6=True)
try:
nhop = sai_thrift_create_next_hop(
self.client,
ip=sai_ipaddress('2001:0db8:1::1'),
router_interface_id=self.port0_rif,
type=SAI_NEXT_HOP_TYPE_IP)
self.assertNotEqual(nhop, SAI_NULL_OBJECT_ID)
route_number = 0
max_host_route = 0
while route_number < max_route_entry:
ip_p_m = sai_ipprefix(next(ip_add) + mask)
# check if ip repeat, then get next ip
if str(ip_p_m) in routes:
continue
route_entry = sai_thrift_route_entry_t(
vr_id=self.default_vrf,
destination=ip_p_m)
status = sai_thrift_create_route_entry(
self.client, route_entry, next_hop_id=nhop)
if status == SAI_STATUS_SUCCESS:
routes.update({str(ip_p_m): route_entry})
route_number += 1
elif status == SAI_STATUS_ITEM_ALREADY_EXISTS:
continue
elif mask == '/128': # when host table is full change to LPM
print("%s host routes have been created" % route_number)
max_host_route = route_number
self.available_v6_host_routes = max_host_route
mask = '/120'
continue
elif mask == '/120': # when LPM table is full change to LPM64
print("%s host + LPM routes have been created so far"
% route_number)
mask = '/64'
continue
else:
self.fail("Route creation failed after creating %d "
"entries, status %u" % (route_number, status))
attr = sai_thrift_get_switch_attribute(
self.client, available_ipv6_route_entry=True)
self.assertEqual(attr["available_ipv6_route_entry"],
max_route_entry - route_number)
print("%s LPM routes have been created"
% (max_route_entry - max_host_route))
self.assertEqual(attr["available_ipv6_route_entry"], 0)
ip_add.close()
finally:
for ip_p_m in routes:
sai_thrift_remove_route_entry(self.client, routes.get(ip_p_m))
sai_thrift_remove_next_hop(self.client, nhop)
def availableIPv6NexthopEntryTest(self):
'''
Verifies creation of maximum number of IPv6 nexthop entries.
'''
print("\navailableIPv6NexthopEntryTest()")
attr = sai_thrift_get_switch_attribute(
self.client, available_ipv6_nexthop_entry=True)
max_nhop_entry = attr["available_ipv6_nexthop_entry"]
print("Available IPv6 nexthop entries: %d" % max_nhop_entry)
nhop = dict()
ip_add = generate_ip_addr(max_nhop_entry + 100, ipv6=True)
try:
nhop_number = 0
while nhop_number < max_nhop_entry:
ip_p = sai_ipaddress(next(ip_add))
if str(ip_p) in nhop:
continue
nexthop = sai_thrift_create_next_hop(
self.client,
ip=ip_p,
router_interface_id=self.port0_rif,
type=SAI_NEXT_HOP_TYPE_IP)
nhop.update({str(ip_p): nexthop})
nhop_number += 1
self.assertNotEqual(nexthop, SAI_NULL_OBJECT_ID)
attr = sai_thrift_get_switch_attribute(
self.client, available_ipv6_nexthop_entry=True)
self.assertEqual(attr["available_ipv6_nexthop_entry"],
max_nhop_entry - nhop_number)
self.assertEqual(attr["available_ipv6_nexthop_entry"], 0)
finally:
for ip_p in nhop:
sai_thrift_remove_next_hop(self.client, nhop.get(ip_p))
def availableIPv6NeighborEntryTest(self):
'''
Verifies creation of maximum number of IPv6 neighbor entries.
'''
print("\navailableIPv6NeighborEntryTest()")
if self.available_v6_host_routes is None:
print("availableIPv6RouteEntryTest must be run first")
return
attr = sai_thrift_get_switch_attribute(
self.client, available_ipv6_neighbor_entry=True)
available_nbr_entry = attr["available_ipv6_neighbor_entry"]
print("Available IPv6 neighbor entries: %d" % available_nbr_entry)
if available_nbr_entry > self.available_v6_host_routes:
print("Cannot create more neighbor entries than available host "
"routes which is %d" % self.available_v6_host_routes)
max_nbr_entry = self.available_v6_host_routes
else:
max_nbr_entry = available_nbr_entry
nbrs = dict()
ip_add = generate_ip_addr(max_nbr_entry + 100, ipv6=True)
try:
nbr_number = 0
while nbr_number < max_nbr_entry:
ip_p = sai_ipaddress(next(ip_add))
# check if ip repeat, then get next ip
if str(ip_p) in nbrs:
continue
nbr_entry = sai_thrift_neighbor_entry_t(
rif_id=self.port0_rif,
ip_address=ip_p)
status = sai_thrift_create_neighbor_entry(
self.client,
nbr_entry,
dst_mac_address='00:00:00:00:00:01',
no_host_route=False)
nbrs.update({str(ip_p): nbr_entry})
nbr_number += 1
self.assertEqual(status, SAI_STATUS_SUCCESS)
attr = sai_thrift_get_switch_attribute(
self.client, available_ipv6_neighbor_entry=True)
self.assertEqual(attr["available_ipv6_neighbor_entry"],
available_nbr_entry - nbr_number,
"Failed after %d entries" % nbr_number)
finally:
for ip_p in nbrs:
sai_thrift_remove_neighbor_entry(self.client, nbrs.get(ip_p))
@group("draft")
class SwitchAttrAclTest(SaiHelperSimplified):
"""
Switch ACL attributes tests
No additional configuration needed
"""
def runTest(self):
# TODO: test requires additional verification
self.availableAclTableTest()
def availableAclTableTest(self):
'''
Verifies creation of maximum number of acl tables.
'''
print("\navailableAclTableTest()")
acl_resource = sai_thrift_acl_resource_t(
stage=SAI_SWITCH_ATTR_ACL_STAGE_INGRESS)
attr = sai_thrift_get_switch_attribute(
self.client, available_acl_table=acl_resource)
available_acl_tables = attr["available_acl_table"]
resource_list = available_acl_tables.resourcelist
try:
acl_table_list_list = []
for resource in resource_list:
stage = resource.stage
bind_point = resource.bind_point
avail_num = resource.avail_num
print("Available tables on stage %d: %d"
% (stage, avail_num))
acl_table_list = []
for counter in range(1, avail_num + 1):
acl_table = sai_thrift_create_acl_table(
self.client,
acl_stage=stage,
acl_bind_point_type_list=sai_thrift_s32_list_t(
count=1, int32list=[bind_point]))
acl_table_list.append(acl_table)
# check remained entries
attr = sai_thrift_get_switch_attribute(
self.client, available_acl_table=acl_resource)
for res in attr["available_acl_table"].resourcelist:
print(res)
if res.stage == stage and res.bind_point == bind_point:
self.assertEqual(res.avail_num, avail_num - counter)
break
# try to create one more table - should not be possible
try:
acl_table = sai_thrift_create_acl_table(
self.client,
acl_stage=stage,
acl_bind_point_type_list=sai_thrift_s32_list_t(
count=1, int32list=[bind_point]))
self.assertEqual(acl_table, SAI_NULL_OBJECT_ID)
except AssertionError:
sai_thrift_remove_acl_table(self.client, acl_table)
self.fail("Number of available ACL table entries "
"may be exceeded")
print("Required number of ACL tables created")
acl_table_list_list.append(acl_table_list)
finally:
for acl_table_list in acl_table_list_list:
for acl_table in acl_table_list:
sai_thrift_remove_acl_table(self.client, acl_table)
@group("draft")
class SwitchVxlanTest(SaiHelper):
'''
Switch VXLAN attributes tests
'''
def setUp(self):
super(SwitchVxlanTest, self).setUp()
# underlay config
self.uvrf = sai_thrift_create_virtual_router(self.client)
self.vm_ip = '100.100.1.1'
self.customer_ip = '100.100.3.1'
self.vni = 2000
self.lpb_ip = '10.10.10.10'
self.tunnel_ip = '10.10.10.1'
self.inner_dmac = '00:33:33:33:33:33'
self.unbor_mac = '00:11:11:11:11:11'
# overlay config
self.ovrf = sai_thrift_create_virtual_router(self.client)
tunnel_type = SAI_TUNNEL_TYPE_VXLAN
ttl_mode = SAI_TUNNEL_TTL_MODE_PIPE_MODEL
# create underlay loopback rif for tunnel
self.urif_lb = sai_thrift_create_router_interface(
self.client,
type=SAI_ROUTER_INTERFACE_TYPE_LOOPBACK,
virtual_router_id=self.uvrf)
self.orif = sai_thrift_create_router_interface(
self.client,
type=SAI_ROUTER_INTERFACE_TYPE_PORT,
virtual_router_id=self.ovrf,
port_id=self.port24)
# Encap configuration follows
# create Encap/Decap mappers
self.encap_tunnel_map = sai_thrift_create_tunnel_map(
self.client, type=SAI_TUNNEL_MAP_TYPE_VIRTUAL_ROUTER_ID_TO_VNI)
self.decap_tunnel_map = sai_thrift_create_tunnel_map(
self.client, type=SAI_TUNNEL_MAP_TYPE_VNI_TO_VIRTUAL_ROUTER_ID)
# create Encap/Decap mapper entries for self.ovrf
self.encap_tunnel_map_entry = sai_thrift_create_tunnel_map_entry(
self.client,
tunnel_map_type=SAI_TUNNEL_MAP_TYPE_VIRTUAL_ROUTER_ID_TO_VNI,
tunnel_map=self.encap_tunnel_map,
virtual_router_id_key=self.ovrf,
virtual_router_id_value=self.ovrf,
vni_id_key=self.vni,
vni_id_value=self.vni)
self.decap_tunnel_map_entry = sai_thrift_create_tunnel_map_entry(
self.client,
tunnel_map_type=SAI_TUNNEL_MAP_TYPE_VNI_TO_VIRTUAL_ROUTER_ID,
tunnel_map=self.decap_tunnel_map,
virtual_router_id_key=self.ovrf,
virtual_router_id_value=self.ovrf,
vni_id_key=self.vni,
vni_id_value=self.vni)
encap_mappers_objlist = sai_thrift_object_list_t(
count=1, idlist=[self.encap_tunnel_map])
decap_mappers_objlist = sai_thrift_object_list_t(
count=1, idlist=[self.decap_tunnel_map])
self.tunnel = sai_thrift_create_tunnel(
self.client,
type=tunnel_type,
encap_src_ip=sai_ipaddress(self.lpb_ip),
decap_mappers=decap_mappers_objlist,
encap_mappers=encap_mappers_objlist,
encap_ttl_mode=ttl_mode,
decap_ttl_mode=ttl_mode,
underlay_interface=self.urif_lb)
self.urif = sai_thrift_create_router_interface(
self.client,
type=SAI_ROUTER_INTERFACE_TYPE_PORT,
port_id=self.port25,
virtual_router_id=self.uvrf)
self.unbor = sai_thrift_neighbor_entry_t(
rif_id=self.urif, ip_address=sai_ipaddress(self.tunnel_ip))
sai_thrift_create_neighbor_entry(
self.client, self.unbor, dst_mac_address=self.unbor_mac)
self.unhop = sai_thrift_create_next_hop(
self.client,
ip=sai_ipaddress(self.tunnel_ip),
router_interface_id=self.urif,
type=SAI_NEXT_HOP_TYPE_IP)
self.tunnel_route = sai_thrift_route_entry_t(
vr_id=self.uvrf, destination=sai_ipprefix('10.10.10.1/32'))
sai_thrift_create_route_entry(
self.client, self.tunnel_route, next_hop_id=self.unhop)
self.tunnel_nexthop = sai_thrift_create_next_hop(
self.client,
type=SAI_NEXT_HOP_TYPE_TUNNEL_ENCAP,
tunnel_id=self.tunnel,
ip=sai_ipaddress(self.tunnel_ip),
tunnel_vni=self.vni)
self.customer_route = sai_thrift_route_entry_t(
vr_id=self.ovrf, destination=sai_ipprefix(self.vm_ip + '/32'))
sai_thrift_create_route_entry(
self.client, self.customer_route, next_hop_id=self.tunnel_nexthop)
print("Setting default VxLan router MAC to %s" % self.inner_dmac)
sai_thrift_set_switch_attribute(
self.client, vxlan_default_router_mac=self.inner_dmac)
def runTest(self):
self.defaultPortTest()
self.defaultRouterMacTest()
def tearDown(self):
sai_thrift_remove_route_entry(self.client, self.customer_route)
sai_thrift_remove_route_entry(self.client, self.tunnel_route)
sai_thrift_remove_next_hop(self.client, self.tunnel_nexthop)
sai_thrift_remove_next_hop(self.client, self.unhop)
sai_thrift_remove_neighbor_entry(self.client, self.unbor)
sai_thrift_remove_router_interface(self.client, self.urif)
sai_thrift_remove_tunnel(self.client, self.tunnel)
sai_thrift_remove_tunnel_map_entry(self.client,
self.decap_tunnel_map_entry)
sai_thrift_remove_tunnel_map_entry(self.client,
self.encap_tunnel_map_entry)
sai_thrift_remove_tunnel_map(self.client, self.decap_tunnel_map)
sai_thrift_remove_tunnel_map(self.client, self.encap_tunnel_map)
sai_thrift_remove_router_interface(self.client, self.orif)
sai_thrift_remove_router_interface(self.client, self.urif_lb)
sai_thrift_remove_virtual_router(self.client, self.ovrf)
sai_thrift_remove_virtual_router(self.client, self.uvrf)
super(SwitchVxlanTest, self).tearDown()
def defaultPortTest(self):
'''
Verifies SAI_SWITCH_ATTR_VXLAN_DEFAULT_PORT attribute setting
'''
print("\ndefaultPortTest()")
vxlan_port = 4000
try:
pkt = simple_tcp_packet(eth_dst=ROUTER_MAC,
eth_src='00:22:22:22:22:22',
ip_dst=self.vm_ip,
ip_src=self.customer_ip,
ip_id=108,
ip_ttl=64)
inner_pkt = simple_tcp_packet(eth_dst=self.inner_dmac,
eth_src=ROUTER_MAC,
ip_dst=self.vm_ip,
ip_src=self.customer_ip,
ip_id=108,
ip_ttl=63)
vxlan_pkt = Mask(simple_vxlan_packet(eth_src=ROUTER_MAC,
eth_dst=self.unbor_mac,
ip_id=0,
ip_src=self.lpb_ip,
ip_dst=self.tunnel_ip,
ip_ttl=64,
ip_flags=0x2,
with_udp_chksum=False,
vxlan_vni=self.vni,
inner_frame=inner_pkt))
vxlan_pkt.set_do_not_care_scapy(UDP, 'sport')
print("Sending packet with initial VxLan port number")
send_packet(self, self.dev_port24, pkt)
verify_packet(self, vxlan_pkt, self.dev_port25)
print("\tOK")
attr = sai_thrift_get_switch_attribute(self.client,
vxlan_default_port=True)
init_vxlan_port = attr['vxlan_default_port']
print("Setting default VxLan port to %d" % vxlan_port)
sai_thrift_set_switch_attribute(self.client,
vxlan_default_port=vxlan_port)
pkt = simple_tcp_packet(eth_dst=ROUTER_MAC,
eth_src='00:22:22:22:22:22',
ip_dst=self.vm_ip,
ip_src=self.customer_ip,
ip_id=108,
ip_ttl=64)
inner_pkt = simple_tcp_packet(eth_dst=self.inner_dmac,
eth_src=ROUTER_MAC,
ip_dst=self.vm_ip,
ip_src=self.customer_ip,
ip_id=108,
ip_ttl=63)
vxlan_pkt = Mask(simple_vxlan_packet(eth_src=ROUTER_MAC,
eth_dst=self.unbor_mac,
ip_id=0,
udp_dport=vxlan_port,
ip_src=self.lpb_ip,
ip_dst=self.tunnel_ip,
ip_ttl=64,
ip_flags=0x2,
with_udp_chksum=False,
vxlan_vni=self.vni,
inner_frame=inner_pkt))
vxlan_pkt.set_do_not_care_scapy(UDP, 'sport')
print("Sending packet with new VxLan port number")
send_packet(self, self.dev_port24, pkt)
verify_packet(self, vxlan_pkt, self.dev_port25)
print("\tOK")
finally:
sai_thrift_set_switch_attribute(self.client,
vxlan_default_port=init_vxlan_port)
def defaultRouterMacTest(self):
'''
Verifies SAI_SWITCH_ATTR_VXLAN_DEFAULT_ROUTER_MAC attribute setting
'''
print("\vdefaultRouterMacTest()")
vxlan_mac = "01:23:45:67:89:90"
try:
pkt = simple_tcp_packet(eth_dst=ROUTER_MAC,
eth_src='00:22:22:22:22:22',
ip_dst=self.vm_ip,
ip_src=self.customer_ip,
ip_id=108,
ip_ttl=64)
inner_pkt = simple_tcp_packet(eth_dst=self.inner_dmac,
eth_src=ROUTER_MAC,
ip_dst=self.vm_ip,
ip_src=self.customer_ip,
ip_id=108,
ip_ttl=63)
vxlan_pkt = Mask(simple_vxlan_packet(eth_src=ROUTER_MAC,
eth_dst=self.unbor_mac,
ip_id=0,
ip_src=self.lpb_ip,
ip_dst=self.tunnel_ip,
ip_ttl=64,
ip_flags=0x2,
with_udp_chksum=False,
vxlan_vni=self.vni,
inner_frame=inner_pkt))
vxlan_pkt.set_do_not_care_scapy(UDP, 'sport')
print("Sending packet with initial VxLan router MAC")
send_packet(self, self.dev_port24, pkt)
verify_packet(self, vxlan_pkt, self.dev_port25)
print("\tOK")
attr = sai_thrift_get_switch_attribute(
self.client, vxlan_default_router_mac=True)
init_mac = attr['vxlan_default_router_mac']
print("Setting default VxLan router MAC to %s" % vxlan_mac)
sai_thrift_set_switch_attribute(
self.client, vxlan_default_router_mac=vxlan_mac)
pkt = simple_tcp_packet(eth_dst=ROUTER_MAC,
eth_src='00:22:22:22:22:22',
ip_dst=self.vm_ip,
ip_src=self.customer_ip,
ip_id=108,
ip_ttl=64)
inner_pkt = simple_tcp_packet(eth_dst=vxlan_mac,
eth_src=ROUTER_MAC,
ip_dst=self.vm_ip,
ip_src=self.customer_ip,
ip_id=108,
ip_ttl=63)
vxlan_pkt = Mask(simple_vxlan_packet(eth_src=ROUTER_MAC,
eth_dst=self.unbor_mac,
ip_id=0,
udp_dport=4000,
ip_src=self.lpb_ip,
ip_dst=self.tunnel_ip,
ip_ttl=64,
ip_flags=0x2,
with_udp_chksum=False,
vxlan_vni=self.vni,
inner_frame=inner_pkt))
vxlan_pkt.set_do_not_care_scapy(UDP, 'sport')
vxlan_pkt.set_do_not_care_scapy(UDP, 'dport')
print("Sending packet with new VxLan router MAC")
send_packet(self, self.dev_port24, pkt)
verify_packet(self, vxlan_pkt, self.dev_port25)
print("\tOK")
finally:
sai_thrift_set_switch_attribute(
self.client, vxlan_default_router_mac=init_mac)
@group("draft")
class SwitchDefaultVlanTest(SaiHelper):
"""
The class runs VLAN test cases for default vlan returned by SAI
"""
def setUp(self):
super(SwitchDefaultVlanTest, self).setUp()
self.pkt = 0
self.tagged_pkt = 0
self.arp_resp = 0
self.tagged_arp_resp = 0
self.i_pkt_count = 0
self.e_pkt_count = 0
self.mac0 = '00:11:11:11:11:11:11'
self.mac1 = '00:22:22:22:22:22:22'
self.mac2 = '00:33:33:33:33:33:33'
mac_action = SAI_PACKET_ACTION_FORWARD
# delete any pre-existing vlan members on vlan 1
vlan_member_list = sai_thrift_object_list_t(count=100)
mbr_list = sai_thrift_get_vlan_attribute(
self.client, self.default_vlan_id, member_list=vlan_member_list)
vlan_members = mbr_list['SAI_VLAN_ATTR_MEMBER_LIST'].idlist
for mbr in vlan_members:
sai_thrift_remove_vlan_member(self.client, mbr)
self.port24_bp = sai_thrift_create_bridge_port(
self.client,
bridge_id=self.default_1q_bridge,
port_id=self.port24,
type=SAI_BRIDGE_PORT_TYPE_PORT,
admin_state=True)
self.port25_bp = sai_thrift_create_bridge_port(
self.client,
bridge_id=self.default_1q_bridge,
port_id=self.port25,
type=SAI_BRIDGE_PORT_TYPE_PORT,
admin_state=True)
self.port26_bp = sai_thrift_create_bridge_port(
self.client,
bridge_id=self.default_1q_bridge,
port_id=self.port26,
type=SAI_BRIDGE_PORT_TYPE_PORT,
admin_state=True)
# for negative learning test
self.port27_bp = sai_thrift_create_bridge_port(
self.client,
bridge_id=self.default_1q_bridge,
port_id=self.port27,
type=SAI_BRIDGE_PORT_TYPE_PORT,
admin_state=True)
self.default_vlan_member1 = sai_thrift_create_vlan_member(
self.client,
vlan_id=self.default_vlan_id,
bridge_port_id=self.port24_bp,
vlan_tagging_mode=SAI_VLAN_TAGGING_MODE_UNTAGGED)
sai_thrift_set_port_attribute(self.client, self.port24, port_vlan_id=1)
self.default_vlan_member2 = sai_thrift_create_vlan_member(
self.client,
vlan_id=self.default_vlan_id,
bridge_port_id=self.port25_bp,
vlan_tagging_mode=SAI_VLAN_TAGGING_MODE_TAGGED)
self.default_vlan_member3 = sai_thrift_create_vlan_member(
self.client,
vlan_id=self.default_vlan_id,
bridge_port_id=self.port26_bp,
vlan_tagging_mode=SAI_VLAN_TAGGING_MODE_UNTAGGED)
sai_thrift_set_port_attribute(self.client, self.port26, port_vlan_id=1)
self.fdb_entry0 = sai_thrift_fdb_entry_t(
switch_id=self.switch_id,
mac_address=self.mac0,
bv_id=self.default_vlan_id)
sai_thrift_create_fdb_entry(
self.client,
self.fdb_entry0,
type=SAI_FDB_ENTRY_TYPE_STATIC,
bridge_port_id=self.port24_bp,
packet_action=mac_action)
self.fdb_entry1 = sai_thrift_fdb_entry_t(
switch_id=self.switch_id,
mac_address=self.mac1,
bv_id=self.default_vlan_id)
sai_thrift_create_fdb_entry(
self.client,
self.fdb_entry1,
type=SAI_FDB_ENTRY_TYPE_STATIC,
bridge_port_id=self.port25_bp,
packet_action=mac_action)
self.fdb_entry2 = sai_thrift_fdb_entry_t(
switch_id=self.switch_id,
mac_address=self.mac2,
bv_id=self.default_vlan_id)
sai_thrift_create_fdb_entry(
self.client,
self.fdb_entry2,
type=SAI_FDB_ENTRY_TYPE_STATIC,
bridge_port_id=self.port26_bp,
packet_action=mac_action)
def runTest(self):
try:
self.forwardingTest()
self.vlanLearnTest()
finally:
pass
def tearDown(self):
sai_thrift_set_port_attribute(self.client, self.port24, port_vlan_id=0)
sai_thrift_set_port_attribute(self.client, self.port26, port_vlan_id=0)
sai_thrift_remove_fdb_entry(self.client, self.fdb_entry2)
sai_thrift_remove_fdb_entry(self.client, self.fdb_entry1)
sai_thrift_remove_fdb_entry(self.client, self.fdb_entry0)
sai_thrift_remove_vlan_member(self.client, self.default_vlan_member3)
sai_thrift_remove_vlan_member(self.client, self.default_vlan_member2)
sai_thrift_remove_vlan_member(self.client, self.default_vlan_member1)
sai_thrift_remove_bridge_port(self.client, self.port27_bp)
sai_thrift_remove_bridge_port(self.client, self.port26_bp)
sai_thrift_remove_bridge_port(self.client, self.port25_bp)
sai_thrift_remove_bridge_port(self.client, self.port24_bp)
super(SwitchDefaultVlanTest, self).tearDown()
def forwardingTest(self):
"""
Forwarding between ports with different tagging mode
"""
print("\nforwardingTest()")
try:
print("\tAccessToAccessTest")
print("Sending L2 packet port 24 -> port 26 [access vlan=1])")
pkt = simple_tcp_packet(eth_dst='00:33:33:33:33:33',
eth_src='00:11:11:11:11:11',
ip_dst='172.16.0.1',
ip_id=101,
ip_ttl=64)
send_packet(self, self.dev_port24, pkt)
verify_packet(self, pkt, self.dev_port26)
print("\tAccessToTrunkTest")
print("Sending L2 packet port 24 -> port 25 [trunk vlan=1])")
pkt = simple_tcp_packet(eth_dst='00:22:22:22:22:22',
eth_src='00:11:11:11:11:11',
ip_dst='172.16.0.1',
ip_id=102,
ip_ttl=64)
exp_pkt = simple_tcp_packet(eth_dst='00:22:22:22:22:22',
eth_src='00:11:11:11:11:11',
ip_dst='172.16.0.1',
dl_vlan_enable=True,
vlan_vid=1,
ip_id=102,
ip_ttl=64,
pktlen=104)
send_packet(self, self.dev_port24, pkt)
verify_packet(self, exp_pkt, self.dev_port25)
finally:
pass
def vlanLearnTest(self):
"""
Verifies learning on default vlan
"""
print("\nvlanLearnTest()")
try:
pkt = simple_arp_packet(
eth_src='00:22:22:33:44:55',
arp_op=1, # ARP request
hw_snd='00:22:22:33:44:55',
pktlen=100)
tagged_pkt = simple_arp_packet(
eth_src='00:22:22:33:44:55',
arp_op=1, # ARP request
hw_snd='00:22:22:33:44:55',
vlan_vid=1,
pktlen=104)
uc_pkt = simple_tcp_packet(
eth_dst='00:22:22:33:44:55',
eth_src='00:33:33:33:33:33')
print("Sending ARP packet port 27 -> drop")
send_packet(self, self.dev_port27, pkt)
verify_no_other_packets(self, timeout=2)
print("Sending ARP packet port 24 -> flood to port 25, 26")
send_packet(self, self.dev_port24, pkt)
verify_each_packet_on_each_port(
self, [tagged_pkt, pkt], [self.dev_port25, self.dev_port26])
time.sleep(4)
print("Sending uc packet port 26 -> to learnt port 24")
send_packet(self, self.dev_port26, uc_pkt)
verify_packets(self, uc_pkt, [self.dev_port24])
finally:
time.sleep(2)
sai_thrift_flush_fdb_entries(
self.client,
bv_id=self.default_vlan_id,
entry_type=SAI_FDB_ENTRY_TYPE_DYNAMIC)