in provider/service.py [0:0]
def __handleDocChange(self, change):
    """Apply one change record to the consumers tracked by this object.

    ``change`` is accessed like a mapping. Its ``"id"`` entry is used as
    the trigger identifier and, unless the record is flagged as deleted,
    its ``"doc"`` entry is inspected to decide what to do:

    * ``"deleted"`` equal to ``True``: a consumer whose desired state is
      Disabled is removed from ``self.consumers``; any other known
      consumer is shut down. Unknown triggers are ignored.
    * ``"triggerURL"`` in the doc: the consumer for the trigger is
      created, disabled, recreated, re-enabled or shut down depending on
      its desired state, on whether the doc is active and on whether the
      doc is assigned to this worker.
    * ``"canary-timestamp"`` in the doc: ``self.lastCanaryTime`` is set
      to the current time.
    * anything else is only logged at debug level.

    An exception raised while handling the change is logged (with its
    traceback) and the change is retried immediately. After
    ``maxRetries`` failed attempts a warning is logged and the change is
    dropped.
    """
    maxRetries = 5
    retryCount = 0
    retry = True

    while retry:
        try:
            # Equality (not identity/truthiness) is kept on purpose so that
            # exactly the values the original check accepted count as a delete.
            if "deleted" in change and change["deleted"] == True:  # noqa: E712
                logging.info('[changes] Found a delete')
                consumer = self.consumers.getConsumerForTrigger(change['id'])
                if consumer is not None:
                    if consumer.desiredState() == Consumer.State.Disabled:
                        # just remove it from memory
                        logging.info('[{}] Removing disabled trigger'.format(consumer.trigger))
                        self.consumers.removeConsumerForTrigger(consumer.trigger)
                    else:
                        logging.info('[{}] Shutting down running trigger'.format(consumer.trigger))
                        consumer.shutdown()
            # since we can't use a filter function for the feed (then
            # you don't get deletes) we need to manually verify this
            # is a valid trigger doc that has changed
            elif 'triggerURL' in change['doc']:
                logging.info('[changes] Found a change in a trigger document')
                document = change['doc']
                triggerIsAssignedToMe = self.__isTriggerDocAssignedToMe(document)

                if not self.consumers.hasConsumerForTrigger(change["id"]):
                    if triggerIsAssignedToMe:
                        logging.info('[{}] Found a new trigger to create'.format(change["id"]))
                        self.createAndRunConsumer(document)
                    else:
                        logging.info("[{}] Found a new trigger, but is assigned to another worker: {}".format(change["id"], document["worker"]))
                else:
                    existingConsumer = self.consumers.getConsumerForTrigger(change["id"])

                    if existingConsumer.desiredState() == Consumer.State.Running and not self.__isTriggerDocActive(document):
                        # running trigger should become disabled
                        # this should be done regardless of which worker the document claims to be assigned to
                        logging.info('[{}] Existing running trigger should become disabled'.format(change["id"]))
                        existingConsumer.disable()
                    elif triggerIsAssignedToMe:
                        logging.info('[{}] Found a change to an existing trigger'.format(change["id"]))

                        if existingConsumer.desiredState() == Consumer.State.Dead and self.__isTriggerDocActive(document):
                            # if a delete occurs followed quickly by a create the consumer might get stuck in a dead state,
                            # so we need to forcefully delete the process before recreating it.
                            logging.info('[{}] A create event occurred for a trigger that is shutting down'.format(change["id"]))

                            if existingConsumer.process.is_alive():
                                logging.info('[{}] Joining dead process.'.format(existingConsumer.trigger))
                                # bounded wait (1 second) so a stuck process cannot block this handler
                                existingConsumer.process.join(1)
                            else:
                                logging.info('[{}] Process is already dead.'.format(existingConsumer.trigger))

                            self.consumers.removeConsumerForTrigger(existingConsumer.trigger)
                            self.createAndRunConsumer(document)
                        elif existingConsumer.desiredState() == Consumer.State.Disabled and self.__isTriggerDocActive(document):
                            # disabled trigger has become active
                            # NOTE(review): the disabled consumer is not removed first; presumably
                            # createAndRunConsumer replaces the existing entry - confirm.
                            logging.info('[{}] Existing disabled trigger should become active'.format(change["id"]))
                            self.createAndRunConsumer(document)
                    else:
                        # trigger has become reassigned to a different worker
                        logging.info("[{}] Shutting down trigger as it has been re-assigned to {}".format(change["id"], document["worker"]))
                        existingConsumer.shutdown()
            elif 'canary-timestamp' in change['doc']:
                # found a canary - update lastCanaryTime
                logging.info('[canary] I found a canary. The last one was {} seconds ago.'.format(secondsSince(self.lastCanaryTime)))
                self.lastCanaryTime = datetime.now()
            else:
                logging.debug('[changes] Found a change for a non-trigger document')

            # reached only when the change was handled without an exception
            retry = False
        except Exception as e:
            logging.error('[{}] Exception caught while handling change.'.format(change["id"]))
            # logging.exception keeps the traceback that logging.error(e) would drop
            logging.exception(e)

            # retry is necessarily still True here: it is only cleared by the
            # final statement of the try body, which cannot raise.
            retryCount += 1

            if retryCount >= maxRetries:
                # logging.warn is a deprecated alias; logging.warning does not
                # emit a DeprecationWarning from inside this handler.
                logging.warning('[{}] Maximum number of retries exceeded for failed change.'.format(change["id"]))
                retry = False