in horovod/spark/driver/driver_service.py [0:0]
def _handle(self, req, client_address):
    """Dispatch a driver request to its handler and return the response.

    Handled request types:

    * ``TaskHostHashIndicesRequest`` -- returns the entry stored in
      ``self._task_host_hash_indices`` for ``req.host_hash``.
    * ``SetLocalRankToRankRequest`` -- resolves the index registered for
      ``(req.host, req.local_rank)``, records it under ``req.rank`` in
      ``self._ranks_to_indices`` and returns that index.
    * ``TaskIndexByRankRequest`` -- returns the index recorded for
      ``req.rank``.
    * ``CodeRequest`` -- returns ``self._fn`` with its ``self._args`` and
      ``self._kwargs``.
    * ``WaitForTaskShutdownRequest`` -- blocks on
      ``self._task_shutdown.wait()`` and then acknowledges.

    Any other request is forwarded to the parent class.

    :param req: the request object to handle.
    :param client_address: address of the requesting client; only passed
        through to the parent class handler.
    :return: the response object matching the request type.
    :raises KeyError: if a host, host hash or rank is not present in the
        corresponding dict.
    :raises IndexError: if ``req.local_rank`` is not a valid position in the
        indices looked up for ``req.host`` (assuming that entry is a
        sequence -- TODO confirm).
    """
    if isinstance(req, TaskHostHashIndicesRequest):
        return TaskHostHashIndicesResponse(
            self._task_host_hash_indices[req.host_hash])

    if isinstance(req, SetLocalRankToRankRequest):
        # All access to _ranks_to_indices in this method is serialized
        # through _lock.
        with self._lock:
            # get index for host and local_rank
            index = self._task_host_hash_indices[req.host][req.local_rank]

            # Drop the rank that previously claimed this index, if any.
            # Only the first match is removed; presumably there is at most
            # one, since every registration here evicts the previous holder.
            stale_ranks = [rank
                           for rank, idx in self._ranks_to_indices.items()
                           if idx == index]
            if stale_ranks:
                del self._ranks_to_indices[stale_ranks[0]]

            # memorize rank's index
            self._ranks_to_indices[req.rank] = index
        return SetLocalRankToRankResponse(index)

    if isinstance(req, TaskIndexByRankRequest):
        with self._lock:
            return TaskIndexByRankResponse(self._ranks_to_indices[req.rank])

    if isinstance(req, CodeRequest):
        return CodeResponse(self._fn, self._args, self._kwargs)

    if isinstance(req, WaitForTaskShutdownRequest):
        # Blocks this handler until _task_shutdown.wait() returns.
        self._task_shutdown.wait()
        return network.AckResponse()

    return super(SparkDriverService, self)._handle(req, client_address)