in provider/consumer.py [0:0]
def __pollForMessages(self):
messages = []
totalPayloadSize = 0
batchMessages = True
if self.__shouldRun():
while batchMessages and (self.secondsSinceLastPoll() < 2):
if self.queuedMessage != None:
logging.debug('[{}] Handling message left over from last batch.'.format(self.trigger))
message = self.queuedMessage
self.queuedMessage = None
else:
message = self.consumer.poll(1.0)
if self.secondsSinceLastPoll() < 0:
logging.info('[{}] Completed first poll'.format(self.trigger))
if (message is not None):
if not message.error():
logging.debug("Consumed message: {}".format(str(message)))
messageSize = self.__sizeMessage(message)
if totalPayloadSize + messageSize > payload_limit:
if len(messages) == 0:
logging.error('[{}] Single message at offset {} exceeds payload size limit. Skipping this message!'.format(self.trigger, message.offset()))
self.consumer.commit(message=message, async=False)
else:
logging.debug('[{}] Message at offset {} would cause payload to exceed the size limit. Queueing up for the next round...'.format(self.trigger, message.offset()))
self.queuedMessage = message
# in any case, we need to stop batching now
batchMessages = False
else:
totalPayloadSize += messageSize
messages.append(message)
elif message.error().code() != KafkaError._PARTITION_EOF:
logging.error('[{}] Error polling: {}'.format(self.trigger, message.error()))
batchMessages = False
else:
logging.debug('[{}] No more messages. Stopping batch op.'.format(self.trigger))
batchMessages = False
else:
logging.debug('[{}] message was None. Stopping batch op.'.format(self.trigger))
batchMessages = False
logging.debug('[{}] Completed poll'.format(self.trigger))
if len(messages) > 0:
logging.info("[{}] Found {} messages with a total size of {} bytes".format(self.trigger, len(messages), totalPayloadSize))
self.updateLastPoll()
return messages