in a2d2/src/manifest_consumer.py [0:0]
def run(self):
try:
consumer = KafkaConsumer(self.response_topic,
bootstrap_servers=self.servers,
auto_offset_reset="earliest",
client_id=random_string())
self.logger.info("manifest consumer on {0} kafka topic".format(self.response_topic))
for message in consumer:
try:
json_str = message.value
json_msg = json.loads(json_str)
if is_close_msg(json_msg):
print(json_str)
break
except Exception as e:
exc_type, exc_value, exc_traceback = sys.exc_info()
traceback.print_tb(exc_traceback, limit=20, file=sys.stdout)
self.logger.error(str(exc_type))
self.logger.error(str(exc_value))
break
consumer.close()
admin = KafkaAdminClient(bootstrap_servers=self.servers)
admin.delete_topics([self.response_topic])
admin.close()
except Exception as e:
exc_type, exc_value, exc_traceback = sys.exc_info()
traceback.print_tb(exc_traceback, limit=20, file=sys.stdout)
self.logger.error(str(exc_type))
self.logger.error(str(exc_value))