provider/thedoctor.py (36 lines of code) (raw):

"""TheDoctor class.

/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
"""

import logging
import time

from consumer import Consumer
from threading import Thread


class TheDoctor (Thread):
    """Background thread that periodically checks every consumer's health.

    On each round the Doctor takes a read copy of the consumer collection
    and, for every consumer in it:

    * restarts it when it is Dead but is desired to be Running;
    * joins its process (if still alive) and removes it from the collection
      when it is Dead and is desired to be Dead;
    * restarts it when it is desired to be Running but has not polled for
      more than ``poll_timeout_seconds``.
    """

    # maximum time to allow a consumer to not successfully poll() before restarting
    # this value must be greater than the total amount of time a consumer might retry firing a trigger
    poll_timeout_seconds = 200

    # interval between the Doctor making rounds
    sleepy_time_seconds = 2

    def __init__(self, consumerCollection):
        """Create the Doctor thread (not yet started).

        Args:
            consumerCollection: collection of consumers to watch. It must
                provide ``getCopyForRead()`` and
                ``removeConsumerForTrigger(trigger)``.
        """
        Thread.__init__(self)

        # Daemon thread: it must not keep the interpreter alive on shutdown.
        self.daemon = True
        self.consumerCollection = consumerCollection

    def run(self):
        """Make rounds forever, sleeping ``sleepy_time_seconds`` between them.

        Exceptions never terminate the loop. A failure while examining one
        consumer is logged and the remaining consumers are still examined;
        a failure of the round itself is logged and the next round is tried
        after the usual sleep.
        """
        logging.info('[Doctor] The Doctor is in!')

        while True:
            try:
                consumers = self.consumerCollection.getCopyForRead()

                for consumerId in consumers:
                    try:
                        self._examine(consumerId, consumers[consumerId])
                    except Exception as e:
                        # Isolate the failure so one misbehaving consumer
                        # cannot prevent the others from being checked.
                        logging.exception("[Doctor] Uncaught exception: {}".format(e))
            except Exception as e:
                logging.exception("[Doctor] Uncaught exception: {}".format(e))

            # Sleep outside the try block so that a persistent failure cannot
            # turn this loop into a busy-spin that floods the log.
            time.sleep(self.sleepy_time_seconds)

    def _examine(self, consumerId, consumer):
        """Check a single consumer and restart or remove it as required.

        Args:
            consumerId: key of the consumer in the collection copy; used
                only in log messages.
            consumer: the consumer to check.
        """
        # Read each state once so that the state that is logged is the same
        # state the decision below is based on.
        currentState = consumer.currentState()
        desiredState = consumer.desiredState()
        logging.debug('[Doctor] [{}] Consumer is in state: {}'.format(consumerId, currentState))

        isDead = currentState == Consumer.State.Dead
        shouldRun = desiredState == Consumer.State.Running

        if isDead and shouldRun:
            # well this is unexpected...
            logging.error('[Doctor][{}] Consumer is dead, but should be alive!'.format(consumerId))
            consumer.restart()
        elif isDead and desiredState == Consumer.State.Dead:
            # Bring out yer dead...
            if consumer.process.is_alive():
                logging.info('[{}] Joining dead process.'.format(consumer.trigger))
                # if you don't first join the process, it'll be left hanging around as a "defunct" process
                consumer.process.join(1)
            else:
                logging.info('[{}] Process is already dead.'.format(consumer.trigger))

            logging.info('[{}] Removing dead consumer from the collection.'.format(consumer.trigger))
            self.consumerCollection.removeConsumerForTrigger(consumer.trigger)
        elif consumer.secondsSinceLastPoll() > self.poll_timeout_seconds and shouldRun:
            # there seems to be an issue with the kafka-python client where it gets into an
            # error-handling loop. This causes poll() to never complete, but also does not
            # throw an exception.
            logging.error('[Doctor][{}] Consumer timed-out, but should be alive! Restarting consumer.'.format(consumerId))
            consumer.restart()