benchmarks/api/run_dist_bench.py (68 lines of code) (raw):
# Copyright 2022 Alibaba Group Holding Limited. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ==============================================================================
import yaml
import argparse
import paramiko
import click
if __name__ == "__main__":
parser = argparse.ArgumentParser('Run DistRandomSampler benchmarks.')
parser.add_argument('--config', type=str, default='bench_dist_config.yml',
help='paths to configuration file for benchmarks')
parser.add_argument('--epochs', type=int, default=1,
help='repeat epochs for sampling')
parser.add_argument('--batch_size', type=int, default=2048,
help='batch size for sampling')
parser.add_argument('--shuffle', action="store_true",
help='whether to shuffle input seeds at each epoch')
parser.add_argument('--with_edge', action="store_true",
help='whether to sample with edge ids')
parser.add_argument('--collect_features', action='store_true',
help='whether to collect features for sampled results')
parser.add_argument('--worker_concurrency', type=int, default=4,
help='concurrency for each sampling worker')
parser.add_argument('--channel_size', type=str, default='4GB',
help='memory used for shared-memory channel')
parser.add_argument('--master_addr', type=str, default='0.0.0.0',
help='master ip address for synchronization across all training nodes')
parser.add_argument('--master_port', type=str, default='12345',
help='port for synchronization across all training nodes')
args = parser.parse_args()
config = open(args.config, 'r')
config = yaml.safe_load(config)
dataset = config['dataset']
ip_list, port_list, username_list = config['nodes'], config['ports'], config['usernames']
dst_path_list = config['dst_paths']
node_ranks = config['node_ranks']
num_nodes = len(node_ranks)
visible_devices = config['visible_devices']
python_bins = config['python_bins']
num_cores = len(visible_devices[0].split(','))
dataset_path = "../../data/"
passwd_dict = {}
for username, ip in zip(username_list, ip_list):
passwd_dict[ip+username] = click.prompt('passwd for '+username+'@'+ip,
hide_input=True)
for username, ip, port, dst, noderk, device, pythonbin in zip(
username_list,
ip_list,
port_list,
dst_path_list,
node_ranks,
visible_devices,
python_bins,
):
trans = paramiko.Transport((ip, port))
trans.connect(username=username, password=passwd_dict[ip+username])
ssh = paramiko.SSHClient()
ssh._transport = trans
to_bench_dir = 'cd '+dst+'/benchmarks/api/ '
exec_bench = "tmux new -d 'CUDA_VISIBLE_DEVICES="+device+" "+pythonbin+" bench_dist_neighbor_loader.py --dataset="+dataset+" --node_rank="+str(noderk)+" --num_nodes="+str(num_nodes)+" --sample_nprocs="+str(num_cores)+" --master_addr="+args.master_addr+" --master_port="+args.master_port+ " --batch_size="+str(args.batch_size)+" --channel_size="+args.channel_size+" --epochs="+str(args.epochs)
if args.collect_features:
exec_bench += " --collect_features"
if args.with_edge:
exec_bench += " --with_edge"
if args.shuffle:
exec_bench += " --shuffle"
print(to_bench_dir + ' && '+ exec_bench + " '")
stdin, stdout, stderr = ssh.exec_command(to_bench_dir+' && '+exec_bench+" '", bufsize=1)
print(stdout.read().decode())
print(stderr.read().decode())
ssh.close()