in provider/thedoctor.py [0:0]
def run(self):
    """Supervise the consumers in ``self.consumerCollection`` forever.

    On every pass a fresh ``getCopyForRead()`` snapshot is walked and, per
    consumer:

    * current state Dead, desired state Running -> ``consumer.restart()``;
    * current state Dead, desired state Dead -> join its process if it still
      reports alive, then remove it via ``removeConsumerForTrigger``;
    * desired state Running and no poll for more than
      ``self.poll_timeout_seconds`` -> ``consumer.restart()``.

    Any exception raised during a pass is logged with its traceback and the
    loop carries on after sleeping ``self.sleepy_time_seconds``. This method
    never returns normally.
    """
    logging.info('[Doctor] The Doctor is in!')
    while True:
        try:
            # presumably a snapshot (per the method name), so removing dead
            # consumers below does not disturb this iteration - verify.
            consumers = self.consumerCollection.getCopyForRead()
            for consumerId in consumers:
                consumer = consumers[consumerId]

                # Read each state once so the log line and the branch taken
                # below are based on the same values, even if the consumer
                # changes state concurrently.
                current_state = consumer.currentState()
                desired_state = consumer.desiredState()
                logging.debug('[Doctor] [%s] Consumer is in state: %s',
                              consumerId, current_state)

                if (current_state == Consumer.State.Dead
                        and desired_state == Consumer.State.Running):
                    # well this is unexpected...
                    logging.error('[Doctor][%s] Consumer is dead, but should be alive!',
                                  consumerId)
                    consumer.restart()
                elif (current_state == Consumer.State.Dead
                        and desired_state == Consumer.State.Dead):
                    # Bring out yer dead...
                    if consumer.process.is_alive():
                        logging.info('[%s] Joining dead process.', consumer.trigger)
                        # if you don't first join the process, it'll be left
                        # hanging around as a "defunct" process
                        consumer.process.join(1)
                        if consumer.process.is_alive():
                            # join(1) timed out; the consumer is still removed
                            # below, so surface that a process was left running.
                            logging.warning('[%s] Process still alive after join timeout; '
                                            'removing it from the collection anyway.',
                                            consumer.trigger)
                    else:
                        logging.info('[%s] Process is already dead.', consumer.trigger)
                    logging.info('[%s] Removing dead consumer from the collection.',
                                 consumer.trigger)
                    self.consumerCollection.removeConsumerForTrigger(consumer.trigger)
                elif (desired_state == Consumer.State.Running
                        and consumer.secondsSinceLastPoll() > self.poll_timeout_seconds):
                    # there seems to be an issue with the kafka-python client
                    # where it gets into an error-handling loop. This causes
                    # poll() to never complete, but also does not throw an
                    # exception.
                    logging.error('[Doctor][%s] Consumer timed-out, but should be alive! '
                                  'Restarting consumer.', consumerId)
                    consumer.restart()
        except Exception as e:
            # logging.exception logs at ERROR and appends the traceback, which
            # str(e) alone would lose (and which may even be empty).
            logging.exception('[Doctor] Uncaught exception: %s', e)
        # Sleep outside the try block: when the sleep lived inside it, any
        # exception skipped the sleep and turned this loop into a busy-spin
        # that pegged a CPU and flooded the log.
        time.sleep(self.sleepy_time_seconds)