def __handleDocChange()

in provider/service.py [0:0]


    def __handleDocChange(self, change):
        """Apply one entry of the changes feed to the in-memory consumers.

        The change is dispatched on its content:

        * a delete stops or forgets the consumer registered for that id;
        * a document containing ``triggerURL`` creates, disables,
          re-creates or shuts down the matching consumer;
        * a document containing ``canary-timestamp`` refreshes
          ``self.lastCanaryTime``;
        * anything else is ignored (logged at debug level).

        Any exception raised while handling the change is logged with its
        traceback and the whole change is attempted again immediately, up
        to five attempts in total. After the last failed attempt a warning
        is logged and the change is dropped; no exception is propagated
        to the caller.

        Args:
            change: mapping describing the change. The keys read here are
                ``id``, ``deleted`` and ``doc``.
        """
        maxRetries = 5
        # Only used in log messages. Looked up with .get() so that the
        # except handler below cannot itself fail on a change without an id.
        changeId = change.get('id')

        for _ in range(maxRetries):
            try:
                # A change may arrive without a document body (or with a null
                # one); treat that as "not a trigger document" rather than
                # failing - and pointlessly retrying - on a KeyError/TypeError.
                document = change.get('doc') or {}

                if change.get('deleted'):
                    logging.info('[changes] Found a delete')
                    self.__handleDeletedTrigger(change['id'])
                # since we can't use a filter function for the feed (then
                # you don't get deletes) we need to manually verify this
                # is a valid trigger doc that has changed
                elif 'triggerURL' in document:
                    logging.info('[changes] Found a change in a trigger document')
                    self.__handleTriggerDocChange(change['id'], document)
                elif 'canary-timestamp' in document:
                    # found a canary - update lastCanaryTime
                    logging.info('[canary] I found a canary. The last one was {} seconds ago.'.format(secondsSince(self.lastCanaryTime)))
                    self.lastCanaryTime = datetime.now()
                else:
                    logging.debug('[changes] Found a change for a non-trigger document')
            except Exception:
                # logging.exception records the traceback along with the message
                logging.exception('[{}] Exception caught while handling change.'.format(changeId))
            else:
                return

        # every attempt raised: give up on this change
        logging.warning('[{}] Maximum number of retries exceeded for failed change.'.format(changeId))

    def __handleDeletedTrigger(self, triggerId):
        """Stop or forget the consumer, if any, registered for a deleted document."""
        consumer = self.consumers.getConsumerForTrigger(triggerId)
        if consumer is None:
            # nothing is registered for this id, so there is nothing to clean up
            return

        if consumer.desiredState() == Consumer.State.Disabled:
            # just remove it from memory
            logging.info('[{}] Removing disabled trigger'.format(consumer.trigger))
            self.consumers.removeConsumerForTrigger(consumer.trigger)
        else:
            logging.info('[{}] Shutting down running trigger'.format(consumer.trigger))
            consumer.shutdown()

    def __handleTriggerDocChange(self, triggerId, document):
        """Reconcile the consumer for ``triggerId`` with its changed trigger document.

        Args:
            triggerId: id of the changed document; also the key under which
                consumers are looked up in ``self.consumers``.
            document: the trigger document carried by the change.
        """
        triggerIsAssignedToMe = self.__isTriggerDocAssignedToMe(document)

        if not self.consumers.hasConsumerForTrigger(triggerId):
            if triggerIsAssignedToMe:
                logging.info('[{}] Found a new trigger to create'.format(triggerId))
                self.createAndRunConsumer(document)
            else:
                logging.info("[{}] Found a new trigger, but is assigned to another worker: {}".format(triggerId, document["worker"]))
            return

        existingConsumer = self.consumers.getConsumerForTrigger(triggerId)

        if existingConsumer.desiredState() == Consumer.State.Running and not self.__isTriggerDocActive(document):
            # running trigger should become disabled
            # this should be done regardless of which worker the document claims to be assigned to
            logging.info('[{}] Existing running trigger should become disabled'.format(triggerId))
            existingConsumer.disable()
        elif triggerIsAssignedToMe:
            logging.info('[{}] Found a change to an existing trigger'.format(triggerId))

            if existingConsumer.desiredState() == Consumer.State.Dead and self.__isTriggerDocActive(document):
                # if a delete occurs followed quickly by a create the consumer might get stuck in a dead state,
                # so we need to forcefully delete the process before recreating it.
                logging.info('[{}] A create event occurred for a trigger that is shutting down'.format(triggerId))

                if existingConsumer.process.is_alive():
                    logging.info('[{}] Joining dead process.'.format(existingConsumer.trigger))
                    # wait at most one second so the feed is not blocked
                    existingConsumer.process.join(1)
                else:
                    logging.info('[{}] Process is already dead.'.format(existingConsumer.trigger))

                self.consumers.removeConsumerForTrigger(existingConsumer.trigger)
                self.createAndRunConsumer(document)
            elif existingConsumer.desiredState() == Consumer.State.Disabled and self.__isTriggerDocActive(document):
                # disabled trigger has become active
                logging.info('[{}] Existing disabled trigger should become active'.format(triggerId))
                self.createAndRunConsumer(document)
        else:
            # trigger has become reassigned to a different worker
            logging.info("[{}] Shutting down trigger as it has been re-assigned to {}".format(triggerId, document["worker"]))
            existingConsumer.shutdown()