in decisionai_plugin/common/util/kafka_operator_confluent.py [0:0]
def consume_loop(process_func, topic, retry_limit=0, error_callback=None, config=None):
    """Continuously consume JSON messages from *topic* and dispatch them to *process_func*.

    Runs forever: builds a confluent-kafka ``Consumer`` (manual commits, 6-hour
    max poll interval), polls for messages, decodes each payload as UTF-8 JSON
    and passes the resulting dict to ``process_func``.  On processing failure
    the payload is re-published with an incremented ``__RETRY__`` counter; once
    ``retry_limit`` is exceeded (or the payload cannot be decoded at all), the
    message goes to the failed queue and ``error_callback`` is invoked if given.
    Any consumer-level error tears the consumer down and rebuilds it after a
    10-second back-off.

    Args:
        process_func: callable invoked with the decoded message dict.
        topic: Kafka topic name to subscribe to; also determines the group id.
        retry_limit: maximum number of redelivery attempts before giving up.
        error_callback: optional ``callable(message, exception)`` invoked when
            the retry limit is exceeded.
        config: optional dict of consumer settings merged over the defaults
            returned by ``get_kafka_configs()``.
    """
    log.info(f"Start of consume_loop for topic {topic}...")
    while True:
        try:
            kafka_configs = get_kafka_configs()
            if config is not None:
                kafka_configs.update(config)
            consumer_configs = {
                **kafka_configs,
                'group.id': 'job-controller-v2-%s' % topic,
                # Allow up to 6 hours between polls so long-running jobs do
                # not trigger a group rebalance.
                'max.poll.interval.ms': 3600 * 6 * 1000,
                # Commit manually, only after process_func succeeds.
                'enable.auto.commit': False
            }
            consumer = Consumer(consumer_configs)

            def print_assignment(consumer, partitions):
                # FIX: log.info was called print-style with two positional
                # arguments; format the partition list into the message.
                log.info(f'Assignment: {partitions}')

            consumer.subscribe([topic], on_assign=print_assignment)
            try:
                while True:
                    message = consumer.poll(timeout=1.0)
                    if message is None:
                        continue
                    if message.error():
                        raise KafkaException(message.error())
                    # log.info("Received message: %s" % str(message))
                    log.count("read_from_kafka", 1, topic=topic)
                    log.duration("read_from_kafka", 1, topic=topic)
                    # FIX: record_value must be bound before the except
                    # handler reads it; previously a decode failure raised
                    # UnboundLocalError, tore down the consumer without a
                    # commit, and the poison message was re-polled forever.
                    record_value = None
                    try:
                        record_value = json.loads(message.value().decode('utf-8'))
                        process_func(record_value)
                        consumer.commit()
                    except Exception as e:
                        count = record_value.get('__RETRY__', 0) if record_value is not None else 0
                        # Undecodable payloads cannot be retried — send them
                        # straight to the failed queue.
                        if record_value is None or count >= retry_limit:
                            log.error("Exceed the maximum number of retries.")
                            if error_callback:
                                error_callback(message, e)
                            append_to_failed_queue(message, e)
                        else:
                            log.error("Processing message failed, will retry. Error message: " + str(e) + traceback.format_exc())
                            record_value['__RETRY__'] = count + 1
                            # FIX: Message.topic() is a method in
                            # confluent-kafka (cf. message.value()/error()
                            # above); the original passed the bound method
                            # object instead of the topic name.
                            send_message(message.topic(), record_value)
            finally:
                # Always release the consumer so the group rebalances cleanly.
                consumer.close()
        except Exception:
            log.error(f"Error in consume_loop for topic {topic}. " + traceback.format_exc())
            time.sleep(10)