def run()

in a2d2/src/data_service.py [0:0]


    def run(self):
        """Consume data requests from Kafka and dispatch them to response tasks.

        Subscribes to ``self.config["kafka_topic"]`` on ``self.config["servers"]``
        and, for every message received, decodes the JSON payload, validates
        its ``"request"`` entry and starts a ``DataResponse`` task for it.
        At most ``self.config["max_response_tasks"]`` tasks are tracked at a
        time; once that limit is reached, the oldest tracked task is joined
        before the new one is started.

        A failure while handling a single message is printed/logged and the
        loop moves on to the next message. Any other ``Exception`` (bad
        configuration, consumer errors) is printed/logged and ends the method.
        The consumer is closed on every exit path. Nothing is returned.
        """
        consumer = None
        try:
            topic = self.config["kafka_topic"]

            # Validate the limit before opening a connection: a value below 1
            # would otherwise make every message fail with an opaque
            # "pop from empty list" error further down.
            max_tasks = int(self.config["max_response_tasks"])
            if max_tasks < 1:
                raise ValueError(
                    "max_response_tasks must be >= 1, got {0}".format(max_tasks))

            client_id = random_string()
            consumer = KafkaConsumer(topic,
                        bootstrap_servers=self.config["servers"],
                        client_id=client_id,
                        group_id="a2d2-data-service")

            self.logger.info("running data service: {0}:{1}".format(client_id, topic))

            # Started tasks, oldest first; never holds more than max_tasks.
            tasks = []

            for message in consumer:
                try:
                    json_msg = json.loads(message.value)
                    request = json_msg["request"]
                    self.logger.info("recvd request: {0}".format(request))
                    validate_data_request(request)
                    t = DataResponse(dbconfig=self.config['database'],
                        servers=self.config["servers"],
                        request=request,
                        data_store=self.config['data_store'],
                        calibration=self.config['calibration'])

                    # At capacity: wait for the oldest task to finish before
                    # launching another one.
                    if len(tasks) >= max_tasks:
                        oldest = tasks.pop(0)
                        oldest.join()

                    # Start first, track second: a task whose start() fails
                    # must not stay queued, since joining a never-started
                    # task later would fail (presumably DataResponse is
                    # Thread-like -- confirm against its definition).
                    t.start()
                    tasks.append(t)

                except Exception as e:
                    # Per-message failures are reported and skipped so one bad
                    # request cannot stop the service.
                    exc_type, exc_value, exc_traceback = sys.exc_info()
                    traceback.print_tb(exc_traceback, limit=20, file=sys.stdout)
                    self.logger.error(str(e))

        except Exception as e:
            exc_type, exc_value, exc_traceback = sys.exc_info()
            traceback.print_tb(exc_traceback, limit=20, file=sys.stdout)
            self.logger.error(str(exc_type))
            self.logger.error(str(exc_value))

        finally:
            # Release the Kafka connection however the loop ended.
            if consumer is not None:
                try:
                    consumer.close()
                except Exception as e:
                    self.logger.error(str(e))