def consume_loop()

in decisionai_plugin/common/util/kafka_operator.py [0:0]


def consume_loop(process_func, topic, retry_limit=0, error_callback=None, config=None):
    log.info(f"Start of consume_loop for topic {topic}...")
    while True:
        try:
            kafka_configs = get_kafka_configs()
            if config is not None:
                kafka_configs.update(config)
                
            consumer = KafkaConsumer(topic, **{**kafka_configs,
                                               'group_id': 'job-controller-v2-%s' % topic,
                                               'value_deserializer': lambda m: json.loads(m.decode('utf-8')),
                                               'max_poll_records': 1,
                                               'max_poll_interval_ms': 3600 * 6 * 1000,
                                               'enable_auto_commit': False,
                                               })
            try:
                for message in consumer:
                    # log.info("Received message: %s" % str(message))
                    try:
                        log.duration("message_latency", time.time() - message.timestamp / 1000, topic=topic,
                                     partition=message.partition)
                        log.count("read_from_kafka", 1, topic=topic, partition=message.partition)
                        process_func(message.value)
                        consumer.commit()
                    except Exception as e:
                        count = message.value.get('__RETRY__', 0)
                        if count >= retry_limit:
                            log.error("Exceed the maximum number of retries.")
                            if error_callback:
                                error_callback(message, e)
                            append_to_failed_queue(message, e)
                        else:
                            log.error("Processing message failed, will retry. Error message: " + str(
                                e) + traceback.format_exc())
                            message.value['__RETRY__'] = count + 1
                            send_message(message.topic, message.value)
            finally:
                consumer.close()
        except Exception as e:
            log.error(f"Error in consume_loop for topic {topic}. " + traceback.format_exc())
            time.sleep(10)