in perfkitbenchmarker/scripts/wait_for_command.py [0:0]
def main():
p = optparse.OptionParser()
p.add_option(
'-o',
'--stdout',
dest='stdout',
help="""Read stdout from FILE.""",
metavar='FILE',
)
p.add_option(
'-e',
'--stderr',
dest='stderr',
help="""Read stderr from FILE.""",
metavar='FILE',
)
p.add_option(
'-s',
'--status',
dest='status',
metavar='FILE',
help=(
'Get process exit status from FILE. '
'Will block until a shared lock is acquired on FILE.'
),
)
p.add_option(
'-d',
'--delete',
dest='delete',
action='store_true',
help='Delete stdout, stderr, and status files when finished.',
)
p.add_option(
'-x',
'--exclusive',
dest='exclusive',
help=(
'Will block until FILE exists to ensure that status is ready to be '
'read. Required.'
),
metavar='FILE',
)
options, args = p.parse_args()
if args:
sys.stderr.write('Unexpected arguments: {0}\n'.format(args))
return 1
missing = []
for option in ('status', 'exclusive'):
if getattr(options, option) is None:
missing.append(option)
if missing:
p.print_usage()
msg = 'Missing required flag(s): {0}\n'.format(
', '.join('--' + i for i in missing)
)
sys.stderr.write(msg)
return 1
start = time.time()
return_code_str = None
while time.time() < WAIT_TIMEOUT_IN_SEC + start:
try:
with open(options.exclusive, 'r'):
with open(options.status, 'r'):
break
except IOError as e:
print("WARNING: file doesn't exist, retrying: %s" % e, file=sys.stderr)
time.sleep(WAIT_SLEEP_IN_SEC)
# Set a signal handler to raise an InterruptedError on SIGALRM (this is no
# longer done automatically after PEP 475).
signal.signal(signal.SIGALRM, signal_handler)
# Send a SIGALRM signal after WAIT_TIMEOUT_IN_SEC seconds
signal.alarm(int(WAIT_TIMEOUT_IN_SEC))
with open(options.status, 'r') as status:
try:
# If we can acquire the lock on status, the command we're waiting on is
# done; if we can't acquire it for the next WAIT_TIMEOUT_IN_SEC seconds
# this attempt will be interrupted and we'll catch an InterruptedError.
fcntl.lockf(status, fcntl.LOCK_SH)
except InterruptedError:
print('Wait timed out. This will be retried with a subsequent wait.')
return 0
# OSError and IOError have similar interfaces, and later versions of fcntl
# will raise OSError where earlier versions raised IOError--we catch both
# here for compatibility.
except (OSError, IOError) as e:
if e.errno == errno.ECONNREFUSED:
print(
'Connection refused during wait. '
'This will be retried with a subsequent wait.'
)
return 0
elif e.errno in (errno.EAGAIN, errno.EACCES):
print(
'Status currently being modified and cannot be read right now. '
'This will be retried with a subsequent wait.'
)
return 0
raise e
signal.alarm(0)
return_code_str = status.read()
if not (options.stdout and options.stderr):
print('Command finished.')
return 0
# Some commands write out non UTF-8 control characters. Replace them with '?'
# to make Python 3.6+ happy.
with open(options.stdout, 'r', errors='backslashreplace') as stdout:
with open(options.stderr, 'r', errors='backslashreplace') as stderr:
if return_code_str:
return_code = int(return_code_str)
else:
print('WARNING: wrapper script interrupted.', file=sys.stderr)
return_code = 1
# RemoteCommand retries 255 as temporary SSH failure. In this case,
# long running command actually returned 255 and should not be retried.
if return_code == RETRYABLE_SSH_RETCODE:
print('WARNING: command returned 255.', file=sys.stderr)
return_code = 1
stderr_copier = threading.Thread(
target=shutil.copyfileobj,
args=[stderr, sys.stderr],
name='stderr-copier',
)
stderr_copier.daemon = True
stderr_copier.start()
try:
shutil.copyfileobj(stdout, sys.stdout)
finally:
stderr_copier.join()
if options.delete:
for f in [options.stdout, options.stderr, options.status]:
os.unlink(f)
return return_code