in a2d2/src/data_service.py [0:0]
def run(self):
try:
topic = self.config["kafka_topic"]
client_id = random_string()
consumer = KafkaConsumer(topic,
bootstrap_servers=self.config["servers"],
client_id=client_id,
group_id="a2d2-data-service")
self.logger.info("running data service: {0}:{1}".format(client_id, topic))
tasks = []
max_tasks = int(self.config["max_response_tasks"])
for message in consumer:
try:
json_msg = json.loads(message.value)
request = json_msg["request"]
self.logger.info("recvd request: {0}".format(request))
validate_data_request(request)
t = DataResponse(dbconfig=self.config['database'],
servers=self.config["servers"],
request=request,
data_store=self.config['data_store'],
calibration=self.config['calibration'])
if len(tasks) < max_tasks:
tasks.append(t)
else:
oldest=tasks.pop(0)
oldest.join()
tasks.append(t)
assert(len(tasks) <= max_tasks)
t.start()
except Exception as e:
exc_type, exc_value, exc_traceback = sys.exc_info()
traceback.print_tb(exc_traceback, limit=20, file=sys.stdout)
self.logger.error(str(e))
except Exception as e:
exc_type, exc_value, exc_traceback = sys.exc_info()
traceback.print_tb(exc_traceback, limit=20, file=sys.stdout)
self.logger.error(str(exc_type))
self.logger.error(str(exc_value))