in horovod/runner/common/service/driver_service.py [0:0]
def _handle(self, req, client_address):
    """Dispatch an incoming driver-service request and return its response.

    Handles three request types locally and forwards everything else to the
    parent class:

    * ``RegisterTaskRequest``: records the task's addresses and host hash
      under ``req.index``, then wakes every thread waiting on
      ``self._wait_cond``.
    * ``RegisterTaskToTaskAddressesRequest``: forwards to
      ``self.register_task_to_task_addresses``.
    * ``AllTaskAddressesRequest``: returns the addresses previously
      registered for ``req.index``.

    Args:
        req: The request object received from a client.
        client_address: Address of the connecting client; element ``0`` is
            used as the source IP when filtering task addresses.

    Returns:
        ``network.AckResponse`` for the two registration requests,
        ``AllTaskAddressesResponse`` for an address lookup, otherwise
        whatever the parent ``_handle`` returns.

    Raises:
        AssertionError: If ``req.index`` of a ``RegisterTaskRequest`` is
            outside ``[0, self._num_proc)``.
    """
    if isinstance(req, RegisterTaskRequest):
        with self._wait_cond:
            try:
                # Explicit check instead of `assert` so it is not stripped
                # when running with `python -O`; the exception type is kept.
                if not 0 <= req.index < self._num_proc:
                    raise AssertionError(
                        'Task index {index} is outside of [0, {num_proc})'
                        .format(index=req.index, num_proc=self._num_proc))

                self._all_task_addresses[req.index] = req.task_addresses

                # Just use source address for service for fast probing.
                self._task_addresses_for_driver[req.index] = \
                    self._filter_by_ip(req.task_addresses, client_address[0])
                if not self._task_addresses_for_driver[req.index]:
                    # No match is possible if one of the servers is behind NAT.
                    # We don't throw exception here, but will allow the following
                    # code fail with NoValidAddressesFound.
                    print('ERROR: Task {index} declared addresses {task_addresses}, '
                          'but has connected from a different address {source}. '
                          'This is not supported. Is the server behind NAT?'
                          ''.format(index=req.index, task_addresses=req.task_addresses,
                                    source=client_address[0]))

                # Remove host hash earlier registered under this index.
                if req.index in self._task_index_host_hash:
                    earlier_host_hash = self._task_index_host_hash[req.index]
                    if earlier_host_hash != req.host_hash:
                        self._task_host_hash_indices[earlier_host_hash].remove(req.index)

                # Make index -> host hash map.
                self._task_index_host_hash[req.index] = req.host_hash

                # Make host hash -> indices map.
                indices = self._task_host_hash_indices.setdefault(req.host_hash, [])
                # A task that re-registers with an unchanged host hash is
                # already listed; appending again would duplicate its index.
                if req.index not in indices:
                    indices.append(req.index)
                # TODO: this sorting is a problem in elastic horovod
                indices.sort()
            finally:
                # Wake waiters even if registration failed, as before.
                self._wait_cond.notify_all()
        return network.AckResponse()

    if isinstance(req, RegisterTaskToTaskAddressesRequest):
        self.register_task_to_task_addresses(req.index, req.task_addresses)
        return network.AckResponse()

    if isinstance(req, AllTaskAddressesRequest):
        return AllTaskAddressesResponse(self._all_task_addresses[req.index])

    return super(BasicDriverService, self)._handle(req, client_address)