def poll_sqs_ansible()

in util/vpc-tools/abbey.py [0:0]


def poll_sqs_ansible():
    """
    Prints events to the console and
    blocks until a final STATS ansible
    event is read off of SQS.

    SQS does not guarantee FIFO, for that
    reason there is a buffer that will delay
    messages before they are printed to the
    console.

    Returns length of the ansible run.
    """
    oldest_msg_ts = 0
    buf = []
    task_report = []  # list of tasks for reporting
    last_task = None
    completed = 0
    while True:
        messages = []
        while True:
            # get all available messages on the queue
            msgs = sqs_queue.get_messages(attributes='All')
            if not msgs:
                break
            messages.extend(msgs)

        for message in messages:
            recv_ts = float(
                message.attributes['ApproximateFirstReceiveTimestamp']) * .001
            sent_ts = float(message.attributes['SentTimestamp']) * .001
            try:
                msg_info = {
                    'msg': json.loads(message.get_body()),
                    'sent_ts': sent_ts,
                    'recv_ts': recv_ts,
                }
                buf.append(msg_info)
            except ValueError as e:
                print "!!! ERROR !!! unable to parse queue message, " \
                      "expecting valid json: {} : {}".format(
                          message.get_body(), e)
            if not oldest_msg_ts or recv_ts < oldest_msg_ts:
                oldest_msg_ts = recv_ts
            sqs_queue.delete_message(message)

        now = int(time.time())
        if buf:
            try:
                if (now - min([msg['recv_ts'] for msg in buf])) > args.msg_delay:
                    # sort by TS instead of recv_ts
                    # because the sqs timestamp is not as
                    # accurate
                    buf.sort(key=lambda k: k['msg']['TS'])
                    to_disp = buf.pop(0)
                    if 'START' in to_disp['msg']:
                        print '\n{:0>2.0f}:{:0>5.2f} {} : Starting "{}"'.format(
                            to_disp['msg']['TS'] / 60,
                            to_disp['msg']['TS'] % 60,
                            to_disp['msg']['PREFIX'],
                            to_disp['msg']['START']),

                    elif 'TASK' in to_disp['msg']:
                        print "\n{:0>2.0f}:{:0>5.2f} {} : {}".format(
                            to_disp['msg']['TS'] / 60,
                            to_disp['msg']['TS'] % 60,
                            to_disp['msg']['PREFIX'],
                            to_disp['msg']['TASK']),
                        last_task = to_disp['msg']['TASK']
                    elif 'OK' in to_disp['msg']:
                        if args.verbose:
                            print "\n"
                            for key, value in to_disp['msg']['OK'].iteritems():
                                print "    {:<15}{}".format(key, value)
                        else:
                            invocation = to_disp['msg']['OK']['invocation']
                            module = invocation['module_name']
                            # 'set_fact' does not provide a changed value.
                            if module == 'set_fact':
                                changed = "OK"
                            elif to_disp['msg']['OK']['changed']:
                                changed = "*OK*"
                            else:
                                changed = "OK"
                            print " {}".format(changed),
                        task_report.append({
                            'TASK': last_task,
                            'INVOCATION': to_disp['msg']['OK']['invocation'],
                            'DELTA': to_disp['msg']['delta'],
                        })
                    elif 'FAILURE' in to_disp['msg']:
                        print " !!!! FAILURE !!!!",
                        for key, value in to_disp['msg']['FAILURE'].iteritems():
                            print "    {:<15}{}".format(key, value)
                        raise Exception("Failed Ansible run")
                    elif 'STATS' in to_disp['msg']:
                        print "\n{:0>2.0f}:{:0>5.2f} {} : COMPLETE".format(
                            to_disp['msg']['TS'] / 60,
                            to_disp['msg']['TS'] % 60,
                            to_disp['msg']['PREFIX'])

                        # Since 3 ansible plays get run.
                        # We see the COMPLETE message 3 times
                        # wait till the last one to end listening
                        # for new messages.
                        completed += 1
                        if completed >= NUM_PLAYBOOKS:
                            return (to_disp['msg']['TS'], task_report)
            except KeyError:
                print "Failed to print status from message: {}".format(to_disp)

        if not messages:
            # wait 1 second between sqs polls
            time.sleep(1)