in horovod/runner/common/service/task_service.py [0:0]
def _handle(self, req, client_address):
    """Handle one request received by the task service.

    Request types handled here (all command state is guarded by
    ``self._wait_cond``):

    * ``RunCommandRequest`` -- if no command thread exists yet, merge the
      request env onto a copy of ``self._command_env`` (when that is set),
      create a fresh abort event and launch ``self._run_command`` via
      ``in_thread``. If a command thread already exists the request is only
      acknowledged, so at most one command is ever launched.
    * ``AbortCommandRequest`` -- set the abort event, if a command was started.
    * ``NotifyInitialRegistrationCompleteRequest`` -- set
      ``self._initial_registration_complete`` and wake condition waiters.
    * ``CommandExitCodeRequest`` -- non-blocking: report whether the command
      thread has terminated and, if so, ``self._command_exit_code``.
    * ``WaitForCommandExitCodeRequest`` -- block until a command thread exists
      and is no longer alive, then report ``self._command_exit_code``.
    * ``RegisterCodeResultRequest`` -- store ``req.result`` in
      ``self._fn_result``.

    Any other request is delegated to the parent class's ``_handle``.

    :param req: the request object received by the service.
    :param client_address: address of the requesting client; only forwarded
        to the parent handler here.
    :return: the response for ``req``; requests without a payload are
        answered with ``network.AckResponse()``.
    """
    if isinstance(req, RunCommandRequest):
        with self._wait_cond:
            try:
                if self._command_thread is None:
                    if self._command_env:
                        # Expose the service-level env to the command, with the
                        # request's env layered on top of a copy of it.
                        merged_env = self._command_env.copy()
                        self._add_envs(merged_env, req.env)
                        req.env = merged_env

                    if self._verbose >= 2:
                        print("Task service executes command: {}".format(req.command))
                    if self._verbose >= 3:
                        for name, val in req.env.items():
                            # Hide the values of secret-bearing variables in logs.
                            shown = '*' * len(val) if 'SECRET' in name else val
                            print("Task service env: {} = {}".format(name, shown))

                    # Guarded by the None check above: only the first run
                    # request launches a command, later ones are no-ops.
                    self._command_abort = threading.Event()
                    self._command_thread = in_thread(
                        target=self._run_command,
                        args=(req.command, req.env, self._command_abort)
                    )
            finally:
                # Wake threads blocked on the condition, e.g. the
                # WaitForCommandExitCodeRequest branch below.
                self._wait_cond.notify_all()
        return network.AckResponse()

    if isinstance(req, AbortCommandRequest):
        with self._wait_cond:
            if self._command_thread is not None:
                self._command_abort.set()
        return network.AckResponse()

    if isinstance(req, NotifyInitialRegistrationCompleteRequest):
        with self._wait_cond:
            try:
                self._initial_registration_complete = True
            finally:
                self._wait_cond.notify_all()
        return network.AckResponse()

    if isinstance(req, CommandExitCodeRequest):
        with self._wait_cond:
            finished = (self._command_thread is not None
                        and not self._command_thread.is_alive())
            exit_code = self._command_exit_code if finished else None
            return CommandExitCodeResponse(finished, exit_code)

    if isinstance(req, WaitForCommandExitCodeRequest):
        with self._wait_cond:
            # Re-check periodically (wait() times out) in addition to being
            # woken by notify_all(), until the command thread has finished.
            while self._command_thread is None or self._command_thread.is_alive():
                self._wait_cond.wait(max(req.delay, WAIT_FOR_COMMAND_MIN_DELAY))
            return WaitForCommandExitCodeResponse(self._command_exit_code)

    if isinstance(req, RegisterCodeResultRequest):
        self._fn_result = req.result
        return network.AckResponse()

    return super(BasicTaskService, self)._handle(req, client_address)