in util/vpc-tools/abbey.py [0:0]
def poll_sqs_ansible():
"""
Prints events to the console and
blocks until a final STATS ansible
event is read off of SQS.
SQS does not guarantee FIFO, for that
reason there is a buffer that will delay
messages before they are printed to the
console.
Returns length of the ansible run.
"""
oldest_msg_ts = 0
buf = []
task_report = [] # list of tasks for reporting
last_task = None
completed = 0
while True:
messages = []
while True:
# get all available messages on the queue
msgs = sqs_queue.get_messages(attributes='All')
if not msgs:
break
messages.extend(msgs)
for message in messages:
recv_ts = float(
message.attributes['ApproximateFirstReceiveTimestamp']) * .001
sent_ts = float(message.attributes['SentTimestamp']) * .001
try:
msg_info = {
'msg': json.loads(message.get_body()),
'sent_ts': sent_ts,
'recv_ts': recv_ts,
}
buf.append(msg_info)
except ValueError as e:
print "!!! ERROR !!! unable to parse queue message, " \
"expecting valid json: {} : {}".format(
message.get_body(), e)
if not oldest_msg_ts or recv_ts < oldest_msg_ts:
oldest_msg_ts = recv_ts
sqs_queue.delete_message(message)
now = int(time.time())
if buf:
try:
if (now - min([msg['recv_ts'] for msg in buf])) > args.msg_delay:
# sort by TS instead of recv_ts
# because the sqs timestamp is not as
# accurate
buf.sort(key=lambda k: k['msg']['TS'])
to_disp = buf.pop(0)
if 'START' in to_disp['msg']:
print '\n{:0>2.0f}:{:0>5.2f} {} : Starting "{}"'.format(
to_disp['msg']['TS'] / 60,
to_disp['msg']['TS'] % 60,
to_disp['msg']['PREFIX'],
to_disp['msg']['START']),
elif 'TASK' in to_disp['msg']:
print "\n{:0>2.0f}:{:0>5.2f} {} : {}".format(
to_disp['msg']['TS'] / 60,
to_disp['msg']['TS'] % 60,
to_disp['msg']['PREFIX'],
to_disp['msg']['TASK']),
last_task = to_disp['msg']['TASK']
elif 'OK' in to_disp['msg']:
if args.verbose:
print "\n"
for key, value in to_disp['msg']['OK'].iteritems():
print " {:<15}{}".format(key, value)
else:
invocation = to_disp['msg']['OK']['invocation']
module = invocation['module_name']
# 'set_fact' does not provide a changed value.
if module == 'set_fact':
changed = "OK"
elif to_disp['msg']['OK']['changed']:
changed = "*OK*"
else:
changed = "OK"
print " {}".format(changed),
task_report.append({
'TASK': last_task,
'INVOCATION': to_disp['msg']['OK']['invocation'],
'DELTA': to_disp['msg']['delta'],
})
elif 'FAILURE' in to_disp['msg']:
print " !!!! FAILURE !!!!",
for key, value in to_disp['msg']['FAILURE'].iteritems():
print " {:<15}{}".format(key, value)
raise Exception("Failed Ansible run")
elif 'STATS' in to_disp['msg']:
print "\n{:0>2.0f}:{:0>5.2f} {} : COMPLETE".format(
to_disp['msg']['TS'] / 60,
to_disp['msg']['TS'] % 60,
to_disp['msg']['PREFIX'])
# Since 3 ansible plays get run.
# We see the COMPLETE message 3 times
# wait till the last one to end listening
# for new messages.
completed += 1
if completed >= NUM_PLAYBOOKS:
return (to_disp['msg']['TS'], task_report)
except KeyError:
print "Failed to print status from message: {}".format(to_disp)
if not messages:
# wait 1 second between sqs polls
time.sleep(1)