perfkitbenchmarker/windows_packages/ntttcp.py (260 lines of code) (raw):

# Copyright 2015 PerfKitBenchmarker Authors. All rights reserved. # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. """Module containing NTttcp installation and cleanup functions. NTttcp is a tool made for benchmarking Windows networking. More information about NTttcp may be found here: https://gallery.technet.microsoft.com/NTttcp-Version-528-Now-f8b12769 """ import collections import ntpath import time import xml.etree.ElementTree from absl import flags from perfkitbenchmarker import background_tasks from perfkitbenchmarker import sample from perfkitbenchmarker import vm_util FLAGS = flags.FLAGS flags.DEFINE_integer( 'ntttcp_threads', 1, 'The number of client and server threads for NTttcp to run with.', ) flags.DEFINE_integer( 'ntttcp_time', 60, 'The number of seconds for NTttcp to run.' ) flags.DEFINE_bool('ntttcp_udp', False, 'Whether to run a UDP test.') flags.DEFINE_integer( 'ntttcp_cooldown_time', 60, 'Time to wait between the test runs.' ) flags.DEFINE_integer( 'ntttcp_packet_size', None, 'The size of the packet being used in the test.' ) flags.DEFINE_integer( 'ntttcp_sender_sb', -1, 'The size of the send buffer, in Kilo Bytes, on the ' 'sending VM. The default is the OS default.', ) flags.DEFINE_integer( 'ntttcp_sender_rb', -1, 'The size of the receive buffer, in Kilo Bytes, on the ' 'sending VM. The default is the OS default.', ) flags.DEFINE_integer( 'ntttcp_receiver_sb', -1, 'The size of the send buffer, in Kilo Bytes, on the ' 'receiving VM. The default is the OS default.', ) flags.DEFINE_integer( 'ntttcp_receiver_rb', -1, 'The size of the receive buffer, in Kilo Bytes, on the ' 'receiving VM. The default is the OS default.', ) flags.DEFINE_list( 'ntttcp_config_list', '', 'comma separated list of configs to run with ntttcp. The ' 'format for a single config is UDP:THREADS:RUNTIME_S:IP_TYPE:PACKET_SIZE, ' 'for example True:4:60:INTERNAL:0,False:8:60:EXTERNAL:150', ) # When adding new configs to ntttcp_config_list, increase this value _NUM_PARAMS_IN_CONFIG = 5 CONTROL_PORT = 6001 BASE_DATA_PORT = 5001 NTTTCP_RETRIES = 10 NTTTCP_VERSION = 'v5.36' NTTTCP_EXE = 'NTttcp.exe' NTTTCP_URL = ( 'https://github.com/microsoft/ntttcp/releases/download/' + NTTTCP_VERSION + '/' + NTTTCP_EXE ) TRUE_VALS = ['True', 'true', 't'] FALSE_VALS = ['False', 'false', 'f'] # named tuple used in passing configs around NtttcpConf = collections.namedtuple( 'NtttcpConf', 'udp threads time_s ip_type packet_size' ) def NtttcpConfigListValidator(value): """Returns whether or not the config list flag is valid.""" if len(value) == 1 and not value[0]: # not using the config list here return True for config in value: config_vals = config.split(':') if len(config_vals) < _NUM_PARAMS_IN_CONFIG: return False try: udp = config_vals[0] threads = int(config_vals[1]) time_s = int(config_vals[2]) ip_type = config_vals[3] packet_size = int(config_vals[4]) except ValueError: return False if udp not in TRUE_VALS + FALSE_VALS: return False if threads < 1: return False if time_s < 1: return False if packet_size < 0: return False # verify the ip type if ip_type not in [ vm_util.IpAddressSubset.EXTERNAL, vm_util.IpAddressSubset.INTERNAL, ]: return False return True flags.register_validator( 'ntttcp_config_list', NtttcpConfigListValidator, 'malformed config list' ) def ParseConfigList(): """Get the list of configs for the test from the flags.""" if not FLAGS.ntttcp_config_list: # config is the empty string. return [ NtttcpConf( udp=FLAGS.ntttcp_udp, threads=FLAGS.ntttcp_threads, time_s=FLAGS.ntttcp_time, ip_type=FLAGS.ip_addresses, packet_size=FLAGS.ntttcp_packet_size, ) ] conf_list = [] for config in FLAGS.ntttcp_config_list: confs = config.split(':') conf_list.append( NtttcpConf( udp=(confs[0] in TRUE_VALS), threads=int(confs[1]), time_s=int(confs[2]), ip_type=confs[3], packet_size=int(confs[4]), ) ) return conf_list def Install(vm): """Installs the NTttcp package on the VM.""" exe_path = ntpath.join(vm.temp_dir, NTTTCP_EXE) vm.DownloadFile(NTTTCP_URL, exe_path) @vm_util.Retry(poll_interval=60, fuzz=1, max_retries=NTTTCP_RETRIES) def _TaskKillNtttcp(vm): kill_command = 'taskkill /IM ntttcp /F' vm.RemoteCommand(kill_command, ignore_failure=True) def _RunNtttcp(vm, options): timeout_duration = 3 * FLAGS.ntttcp_time ntttcp_exe_dir = vm.temp_dir command = 'cd {ntttcp_exe_dir}; .\\{ntttcp_exe} {ntttcp_options}'.format( ntttcp_exe=NTTTCP_EXE, ntttcp_exe_dir=ntttcp_exe_dir, ntttcp_options=options, ) vm.RobustRemoteCommand(command, timeout=timeout_duration) def _RemoveXml(vm): ntttcp_exe_dir = vm.temp_dir rm_command = 'cd {ntttcp_exe_dir}; rm xml.txt'.format( ntttcp_exe_dir=ntttcp_exe_dir ) vm.RemoteCommand(rm_command, ignore_failure=True) @vm_util.Retry(max_retries=NTTTCP_RETRIES) def _CatXml(vm): ntttcp_exe_dir = vm.temp_dir cat_command = 'cd {ntttcp_exe_dir}; cat xml.txt'.format( ntttcp_exe_dir=ntttcp_exe_dir ) ntttcp_xml, _ = vm.RemoteCommand(cat_command, timeout=10) return ntttcp_xml def _GetSockBufferSize(sock_buff_size): return '%dK' % sock_buff_size if sock_buff_size != -1 else sock_buff_size @vm_util.Retry(max_retries=NTTTCP_RETRIES) def RunNtttcp( sending_vm, receiving_vm, receiving_ip_address, ip_type, udp, threads, time_s, packet_size, cooldown, ): """Run NTttcp and return the samples collected from the run.""" if cooldown: time.sleep(FLAGS.ntttcp_cooldown_time) # Clean up any stray ntttcp processes in case this is retry. _TaskKillNtttcp(sending_vm) _TaskKillNtttcp(receiving_vm) packet_size_string = '' if packet_size: packet_size_string = ' -l %d ' % packet_size shared_options = '-xml -t {time} -p {port} {packet_size}'.format( time=time_s, port=BASE_DATA_PORT, packet_size=packet_size_string ) udp_string = '-u' if udp else '' sending_options = shared_options + ( "-s {udp} -m '{threads},*,{ip}' -rb {rb} -sb {sb}" ).format( udp=udp_string, threads=threads, ip=receiving_ip_address, rb=_GetSockBufferSize(FLAGS.ntttcp_sender_rb), sb=_GetSockBufferSize(FLAGS.ntttcp_sender_sb), ) receiving_options = shared_options + ( "-r {udp} -m '{threads},*,0.0.0.0' -rb {rb} -sb {sb}" ).format( udp=udp_string, threads=threads, rb=_GetSockBufferSize(FLAGS.ntttcp_receiver_rb), sb=_GetSockBufferSize(FLAGS.ntttcp_receiver_sb), ) # NTttcp will append to the xml file when it runs, which causes parsing # to fail if there was a preexisting xml file. To be safe, try deleting # the xml file. _RemoveXml(sending_vm) _RemoveXml(receiving_vm) # Start receiver/server first. process_args = [ (_RunNtttcp, (receiving_vm, receiving_options), {}), (_RunNtttcp, (sending_vm, sending_options), {}), ] background_tasks.RunParallelProcesses(process_args, 200, 1) sender_xml = _CatXml(sending_vm) receiver_xml = _CatXml(receiving_vm) metadata = {'ip_type': ip_type} for vm_specifier, vm in ('receiving', receiving_vm), ('sending', sending_vm): for k, v in vm.GetResourceMetadata().items(): metadata['{}_{}'.format(vm_specifier, k)] = v return ParseNtttcpResults(sender_xml, receiver_xml, metadata) def ParseNtttcpResults(sender_xml_results, receiver_xml_results, metadata): """Parses the xml output from NTttcp and returns a list of samples. The list of samples contains total throughput and per thread throughput metrics (if there is more than a single thread). Args: sender_xml_results: ntttcp test output from the sender. receiver_xml_results: ntttcp test output from the receiver. metadata: metadata to be included as part of the samples. Returns: list of samples from the results of the ntttcp tests. """ sender_xml_root = xml.etree.ElementTree.fromstring(sender_xml_results) receiver_xml_root = xml.etree.ElementTree.fromstring(receiver_xml_results) samples = [] metadata = metadata.copy() # Get the parameters from the sender XML output, but skip the throughput and # thread info. Those will be used in the samples, not the metadata. for item in list(sender_xml_root): if item.tag == 'parameters': for param in list(item): metadata[param.tag] = param.text elif item.tag == 'throughput' or item.tag == 'thread': continue else: metadata['sender %s' % item.tag] = item.text # We do not want the parameters from the receiver (we already got those # from the sender), but we do want everything else and have it marked as # coming from the receiver. for item in list(receiver_xml_root): if item.tag == 'parameters' or item.tag == 'thread': continue elif item.tag == 'throughput': if item.attrib['metric'] == 'mbps': metadata['receiver throughput'] = item.text else: metadata['receiver %s' % item.tag] = item.text metadata['sender rb'] = FLAGS.ntttcp_sender_rb metadata['sender sb'] = FLAGS.ntttcp_sender_rb metadata['receiver rb'] = FLAGS.ntttcp_receiver_rb metadata['receiver sb'] = FLAGS.ntttcp_receiver_sb throughput_element = sender_xml_root.find('./throughput[@metric="mbps"]') samples.append( sample.Sample( 'Total Throughput', float(throughput_element.text), 'Mbps', metadata ) ) thread_elements = sender_xml_root.findall('./thread') if len(thread_elements) > 1: for element in thread_elements: throughput_element = element.find('./throughput[@metric="mbps"]') metadata = metadata.copy() metadata['thread_index'] = element.attrib['index'] samples.append( sample.Sample( 'Thread Throughput', float(throughput_element.text), 'Mbps', metadata, ) ) return samples