in provider/consumer.py [0:0]
def __fireTrigger(self, messages):
    """Fire the trigger with a batch of messages, then commit their offsets.

    Does nothing unless ``self.__shouldRun()`` is true. Otherwise the payload
    of every message is POSTed to ``self.triggerURL`` as
    ``{'messages': [...]}`` and the outcome decides what happens next:

    * 2xx response: the offsets of ``messages`` are committed and the loop
      ends.
    * Status for which ``self.__shouldDisable`` is true: the exchange is
      dumped, the trigger is disabled, and nothing is committed.
    * Any other status, or a ``requests`` exception: the request is retried
      after ``2 ** retry_count`` seconds, up to ``self.max_retries`` times.
      Once the retries are used up the batch is skipped, i.e. its offsets
      are committed anyway so the consumer can move on.

    :param messages: non-empty sequence of consumed messages (indexing the
        last element fails on an empty one); the last element must provide
        ``offset()`` and ``partition()``.
    """
    if not self.__shouldRun():
        return

    lastMessage = messages[-1]
    mappedMessages = [self.__getMessagePayload(message) for message in messages]
    payload = {'messages': mappedMessages}

    # `async` is a reserved word from Python 3.7 on, so it cannot be written
    # as a literal keyword argument any more. Dict unpacking passes exactly
    # the same keyword to commit().
    # NOTE(review): if self.consumer is a confluent_kafka Consumer, newer
    # releases also accept `asynchronous` - confirm before switching.
    syncCommit = {'async': False}

    retry = True
    retry_count = 0
    logging.info("[{}] Firing trigger with {} messages".format(self.trigger, len(mappedMessages)))

    while retry:
        try:
            response = requests.post(self.triggerURL, json=payload, auth=self.authHandler, timeout=10.0, verify=check_ssl)
            status_code = response.status_code
            logging.info("[{}] Response status code {}".format(self.trigger, status_code))

            # Manually commit offset if the trigger was fired successfully. Retry firing the trigger
            # for a select set of status codes
            if 200 <= status_code < 300:
                activationId = None
                if status_code != 204:
                    # The trigger has already fired at this point, so a body
                    # that is missing or is not JSON must not turn a success
                    # into a crash or a retry (which would fire it twice).
                    try:
                        response_json = response.json()
                    except ValueError:
                        response_json = None
                    if isinstance(response_json, dict):
                        activationId = response_json.get('activationId')

                if activationId is not None:
                    logging.info("[{}] Fired trigger with activation {}".format(self.trigger, activationId))
                else:
                    logging.info("[{}] Successfully fired trigger".format(self.trigger))

                # the consumer may have consumed messages that did not make it into the messages array.
                # be sure to only commit to the messages that were actually fired.
                self.consumer.commit(offsets=self.__getOffsetList(messages), **syncCommit)
                retry = False
            elif self.__shouldDisable(status_code):
                retry = False
                logging.error('[{}] Error talking to OpenWhisk, status code {}'.format(self.trigger, status_code))
                self.__dumpRequestResponse(response)
                self.__disableTrigger(status_code)
        except requests.exceptions.RequestException as e:
            logging.error('[{}] Error talking to OpenWhisk: {}'.format(self.trigger, e))
        except AuthHandlerException as e:
            # .format() belongs on the message string; logging.error()
            # returns None, so formatting its result raised AttributeError.
            logging.error("[{}] Encountered an exception from auth handler, status code {}".format(self.trigger, e.response.status_code))
            self.__dumpRequestResponse(e.response)

            if self.__shouldDisable(e.response.status_code):
                retry = False
                self.__disableTrigger(e.response.status_code)

        if retry:
            retry_count += 1

            if retry_count <= self.max_retries:
                # Exponential backoff: 2, 4, 8, ... seconds.
                sleepyTime = 2 ** retry_count
                logging.info("[{}] Retrying in {} second(s)".format(self.trigger, sleepyTime))
                time.sleep(sleepyTime)
            else:
                # Out of retries: give up on this batch and commit past it
                # so the same messages are not consumed again.
                logging.warning("[{}] Skipping {} messages to offset {} of partition {}".format(self.trigger, len(messages), lastMessage.offset(), lastMessage.partition()))
                self.consumer.commit(offsets=self.__getOffsetList(messages), **syncCommit)
                retry = False