def consume_loop()

in decisionai_plugin/common/util/kafka_operator_confluent.py [0:0]


def consume_loop(process_func, topic, retry_limit=0, error_callback=None, config=None):
    """Consume ``topic`` forever, handing each decoded message to ``process_func``.

    A consumer with manual offset commits is created and polled in a loop.
    Each message value is decoded as UTF-8 JSON and passed to
    ``process_func``; the offset is committed once it returns. When handling
    a message raises:

    * if the payload is a JSON object whose ``__RETRY__`` counter is below
      ``retry_limit``, the payload is re-sent to the message's topic with the
      counter incremented;
    * otherwise ``error_callback`` (if given) is invoked and the message is
      appended to the failed queue.

    Any other error (including an error returned by ``poll``) closes the
    consumer, is logged, and the whole loop restarts after a 10 second pause.
    This function never returns normally.

    Args:
        process_func: Callable invoked with the decoded JSON payload.
        topic: Topic to subscribe to; also used as the suffix of the
            consumer group id.
        retry_limit: A message is re-sent only while its ``__RETRY__``
            counter is below this value; the default ``0`` disables retries.
        error_callback: Optional callable ``(message, exception)`` invoked
            when a message is given up on.
        config: Optional dict of settings merged over the result of
            ``get_kafka_configs()``.
    """
    log.info(f"Start of consume_loop for topic {topic}...")

    def print_assignment(consumer, partitions):
        # Pre-format into a single string so the partitions are not passed as
        # a stray positional argument to the logger.
        log.info(f'Assignment: {partitions}')

    while True:
        try:
            kafka_configs = get_kafka_configs()
            if config is not None:
                kafka_configs.update(config)

            consumer_configs = {
                **kafka_configs,
                'group.id': 'job-controller-v2-%s' % topic,
                'max.poll.interval.ms': 3600 * 6 * 1000,
                'enable.auto.commit': False
            }

            consumer = Consumer(consumer_configs)
            try:
                # Subscribe inside the try so the consumer is closed even if
                # subscribing itself fails.
                consumer.subscribe([topic], on_assign=print_assignment)

                while True:
                    message = consumer.poll(timeout=1.0)
                    if message is None:
                        continue
                    if message.error():
                        raise KafkaException(message.error())

                    log.count("read_from_kafka", 1, topic=topic)
                    log.duration("read_from_kafka", 1, topic=topic)

                    # Reset per message: the error handler below must never
                    # see the payload of a previously consumed message.
                    record_value = None
                    try:
                        record_value = json.loads(message.value().decode('utf-8'))
                        process_func(record_value)
                        consumer.commit()
                    except Exception as e:
                        if isinstance(record_value, dict):
                            count = record_value.get('__RETRY__', 0)
                            give_up = count >= retry_limit
                            if give_up:
                                log.error("Exceed the maximum number of retries.")
                        else:
                            # Decoding failed or the payload is not a JSON
                            # object: there is nothing to attach a retry
                            # counter to, so it cannot be re-sent.
                            give_up = True
                            log.error("Message payload cannot be retried. Error message: " + str(e) + traceback.format_exc())

                        if give_up:
                            if error_callback:
                                error_callback(message, e)
                            append_to_failed_queue(message, e)
                        else:
                            log.error("Processing message failed, will retry. Error message: " + str(e) + traceback.format_exc())
                            record_value['__RETRY__'] = count + 1
                            # Message.topic is a method; call it to get the name.
                            send_message(message.topic(), record_value)

                        # The message has been handed off (requeued or
                        # dead-lettered); commit so a restart does not
                        # redeliver it alongside its retry copy.
                        consumer.commit()
            finally:
                consumer.close()
        except Exception:
            log.error(f"Error in consume_loop for topic {topic}. " + traceback.format_exc())
            time.sleep(10)