in decisionai_plugin/common/util/kafka_operator.py [0:0]
def send_message(topic, message, err_callback=None, retry=3):
global producer
# keep consistency
retry -= 1
while True:
try:
if producer is None:
kafka_configs = get_kafka_configs()
producer = KafkaProducer(**{**kafka_configs,
'value_serializer': lambda v: json.dumps(v).encode('utf-8'),
'retries': 5,
"partitioner": RoundRobinPartitioner()
})
if err_callback is not None:
producer.send(topic, message).add_errback(err_callback, message)
else:
future = producer.send(topic, message)
producer.flush()
# wait 10 seconds for kafka writing completed!
future.get(10)
log.count("write_to_kafka", 1, topic=topic, result='Success')
break
except Exception as e:
if producer is not None:
producer.close()
producer = None
if isinstance(e, KafkaTimeoutError) and retry:
retry -= 1
log.info("Kafka producer retries.")
continue
else:
log.count("write_to_kafka", 1, topic=topic, result='Failed')
log.error(f"Kafka producer send failed. Error: {str(e)}")
raise e