in decisionai_plugin/common/util/kafka_operator.py [0:0]
def consume_loop(process_func, topic, retry_limit=0, error_callback=None, config=None):
    """Consume ``topic`` indefinitely, passing each message value to ``process_func``.

    A ``KafkaConsumer`` is created for ``topic`` and iterated one record at a
    time with auto-commit disabled. After each message has been dealt with
    (processed successfully, re-queued for retry, or handed to the failure
    path) its offset is committed. Any error that escapes the per-message
    handling is logged, the consumer is closed, and a new consumer is created
    after a 10 second pause. The function contains no ``return`` or ``break``,
    so it does not return normally.

    Args:
        process_func: Callable invoked with ``message.value`` (the message
            payload decoded from UTF-8 JSON).
        topic: Name of the topic to subscribe to; also used to derive the
            consumer group id.
        retry_limit: Maximum number of times a failing message is re-sent to
            its topic. The attempt count is stored in the message value under
            the ``'__RETRY__'`` key.
        error_callback: Optional callable invoked as
            ``error_callback(message, exception)`` once a message has reached
            ``retry_limit``.
        config: Optional mapping of extra consumer settings layered over
            ``get_kafka_configs()``. The settings fixed by this function
            (``group_id``, ``value_deserializer``, ``max_poll_records``,
            ``max_poll_interval_ms``, ``enable_auto_commit``) always take
            precedence over it.
    """
    log.info(f"Start of consume_loop for topic {topic}...")
    while True:
        try:
            # Merge into a fresh dict so the object returned by
            # get_kafka_configs() is never mutated by per-call overrides.
            consumer_settings = dict(get_kafka_configs())
            if config is not None:
                consumer_settings.update(config)
            # Applied last so these values cannot be overridden by `config`.
            consumer_settings.update({
                'group_id': 'job-controller-v2-%s' % topic,
                'value_deserializer': lambda m: json.loads(m.decode('utf-8')),
                'max_poll_records': 1,
                'max_poll_interval_ms': 3600 * 6 * 1000,
                'enable_auto_commit': False,
            })
            consumer = KafkaConsumer(topic, **consumer_settings)
            try:
                for message in consumer:
                    try:
                        # message.timestamp is treated as milliseconds here.
                        log.duration("message_latency", time.time() - message.timestamp / 1000,
                                     topic=topic, partition=message.partition)
                        log.count("read_from_kafka", 1, topic=topic, partition=message.partition)
                        process_func(message.value)
                    except Exception as e:
                        _handle_failed_message(message, e, retry_limit, error_callback)
                    # Commit outside the processing `try`: a commit error must
                    # not be mistaken for a processing failure (which would
                    # re-queue or dead-letter an already processed message).
                    # It propagates to the outer handler instead, which
                    # reconnects. Committing here also covers messages whose
                    # failure was fully handled above, so they are not
                    # redelivered after a restart.
                    consumer.commit()
            finally:
                consumer.close()
        except Exception:
            log.error(f"Error in consume_loop for topic {topic}. " + traceback.format_exc())
            time.sleep(10)


def _handle_failed_message(message, error, retry_limit, error_callback):
    """Re-queue a failed message for retry, or give up once the limit is reached.

    Must be called while ``error`` is being handled (inside the ``except``
    block) so that ``traceback.format_exc()`` reports the right traceback.
    Exceptions raised here propagate to the caller.
    """
    attempts = message.value.get('__RETRY__', 0)
    if attempts < retry_limit:
        log.error("Processing message failed, will retry. Error message: " + str(
            error) + traceback.format_exc())
        # Record the attempt in the payload and put it back on the same topic.
        message.value['__RETRY__'] = attempts + 1
        send_message(message.topic, message.value)
        return

    log.error("Exceed the maximum number of retries.")
    if error_callback:
        error_callback(message, error)
    append_to_failed_queue(message, error)