def __pollForMessages()

in provider/consumer.py [0:0]


    def __pollForMessages(self):
        messages = []
        totalPayloadSize = 0
        batchMessages = True

        if self.__shouldRun():
            while batchMessages and (self.secondsSinceLastPoll() < 2):
                if self.queuedMessage != None:
                    logging.debug('[{}] Handling message left over from last batch.'.format(self.trigger))
                    message = self.queuedMessage
                    self.queuedMessage = None
                else:
                    message = self.consumer.poll(1.0)

                if self.secondsSinceLastPoll() < 0:
                    logging.info('[{}] Completed first poll'.format(self.trigger))

                if (message is not None):
                    if not message.error():
                        logging.debug("Consumed message: {}".format(str(message)))
                        messageSize = self.__sizeMessage(message)
                        if totalPayloadSize + messageSize > payload_limit:
                            if len(messages) == 0:
                                logging.error('[{}] Single message at offset {} exceeds payload size limit. Skipping this message!'.format(self.trigger, message.offset()))
                                self.consumer.commit(message=message, async=False)
                            else:
                                logging.debug('[{}] Message at offset {} would cause payload to exceed the size limit. Queueing up for the next round...'.format(self.trigger, message.offset()))
                                self.queuedMessage = message

                            # in any case, we need to stop batching now
                            batchMessages = False
                        else:
                            totalPayloadSize += messageSize
                            messages.append(message)
                    elif message.error().code() != KafkaError._PARTITION_EOF:
                        logging.error('[{}] Error polling: {}'.format(self.trigger, message.error()))
                        batchMessages = False
                    else:
                        logging.debug('[{}] No more messages. Stopping batch op.'.format(self.trigger))
                        batchMessages = False
                else:
                    logging.debug('[{}] message was None. Stopping batch op.'.format(self.trigger))
                    batchMessages = False

        logging.debug('[{}] Completed poll'.format(self.trigger))

        if len(messages) > 0:
            logging.info("[{}] Found {} messages with a total size of {} bytes".format(self.trigger, len(messages), totalPayloadSize))

        self.updateLastPoll()
        return messages