in horovod/runner/driver/driver_service.py [0:0]
def _driver_fn(all_host_names, local_host_names, settings):
    """
    Find the network interfaces that are common to all the hosts.

    Launches the driver service, launches the task service on each worker and
    has them register with the driver service. Each worker probes all the
    interfaces of the worker index + 1 (in a ring manner) and only keeps the
    routed interfaces. The function returns the intersection of the sets of
    routed interfaces reported for all the workers.

    The driver service is shut down before this function returns or raises,
    including when launching the task servers fails.

    :param all_host_names: list of addresses. for example,
        ['worker-0','worker-1']
        ['10.11.11.11', '10.11.11.12']
    :type all_host_names: list(string)
    :param local_host_names: host names that resolve into a local addresses.
    :type local_host_names: set
    :param settings: the object that contains the setting for running horovod
    :type settings: horovod.runner.common.util.settings.Settings
    :return: names of the interfaces shared by every host,
        example: {'eth0', 'eth1'}
    :rtype: set[string]
    :raises Exception: if the hosts have no task-to-task interface in common.
    """
    # Launch a TCP server called driver service on the host running horovod.
    num_hosts = len(all_host_names)
    driver = HorovodRunDriverService(num_hosts, settings.key, settings.nics)

    # Everything after the driver is created runs under try/finally so that
    # the driver service is shut down even if launching the task servers
    # (or any later step) raises.
    try:
        verbose = settings.verbose >= 2
        if verbose:
            print('Launched horovod server.')

        # Have all the workers register themselves with the driver service.
        _launch_task_servers(all_host_names, local_host_names,
                             driver.addresses(), settings)
        if verbose:
            print('Attempted to launch horovod task servers.')

        # Wait for all the hosts to register with the driver service.
        if verbose:
            print('Waiting for the hosts to acknowledge.')
        driver.wait_for_initial_registration(settings.start_timeout)

        # One client per host index, used to talk to that host's task service.
        tasks = [
            task_service.HorovodRunTaskClient(
                index,
                driver.task_addresses_for_driver(index),
                settings.key,
                settings.verbose)
            for index in range(num_hosts)
        ]

        # Notify all the tasks that the initial registration is complete.
        for task in tasks:
            task.notify_initial_registration_complete()
        if verbose:
            print('Notified all the hosts that the registration is complete.')

        # Each worker should probe the interfaces of the next worker in a ring
        # manner and filter only the routed ones -- it should filter out
        # interfaces that are not really connected to any external networks
        # such as lo0 with address 127.0.0.1.
        if verbose:
            print('Waiting for hosts to perform host-to-host interface checking.')
        driver.wait_for_task_to_task_address_updates(settings.start_timeout)
        if verbose:
            print('Host-to-host interface checking successful.')

        # Determine a set of common interfaces for task-to-task communication:
        # start from host 0's interfaces and intersect with every other host.
        nics = set(driver.task_addresses_for_tasks(0).keys())
        for index in range(1, num_hosts):
            nics.intersection_update(
                driver.task_addresses_for_tasks(index).keys())

        if not nics:
            # Include every host's reported addresses to help diagnose which
            # host is missing a shared interface.
            raise Exception(
                'Unable to find a set of common task-to-task communication interfaces: %s'
                % [(index, driver.task_addresses_for_tasks(index))
                   for index in range(num_hosts)])
        return nics
    finally:
        driver.shutdown()