def send_message()

in decisionai_plugin/common/util/kafka_operator.py [0:0]


def send_message(topic, message, err_callback=None, retry=3):
    """Send ``message`` to the Kafka ``topic`` through the shared producer.

    The module-level ``producer`` is created lazily on first use (or after a
    failure) from ``get_kafka_configs()``, with a JSON/UTF-8 value serializer,
    ``retries=5`` and a round-robin partitioner.

    Two delivery modes are supported:

    * ``err_callback`` given: the message is handed to ``producer.send`` and
      ``err_callback`` is registered via ``add_errback`` with ``message`` as
      an extra argument. The call does not flush or wait on the returned
      future, so the success counter is recorded before delivery is
      confirmed.
    * ``err_callback`` is ``None``: the producer is flushed and the call
      waits up to 10 seconds on the send future.

    Args:
        topic: Name of the Kafka topic to write to.
        message: JSON-serializable payload.
        err_callback: Optional callable registered as the send future's
            errback.
        retry: Total number of attempts allowed when the failure is a
            ``KafkaTimeoutError``. Values below 1 are treated as a single
            attempt with no retries.

    Raises:
        Exception: Whatever the last attempt raised, once the retry budget is
            exhausted or the error is not a ``KafkaTimeoutError``.
    """
    global producer

    # ``retry`` counts total attempts, so the number of re-tries is one less.
    # Clamp at zero: a non-positive ``retry`` used to leave a negative (hence
    # truthy) counter and retry timeouts forever.
    retries_left = max(retry - 1, 0)

    while True:
        try:
            if producer is None:
                kafka_configs = get_kafka_configs()
                producer = KafkaProducer(**{
                    **kafka_configs,
                    'value_serializer': lambda v: json.dumps(v).encode('utf-8'),
                    'retries': 5,
                    "partitioner": RoundRobinPartitioner(),
                })
            if err_callback is not None:
                producer.send(topic, message).add_errback(err_callback, message)
            else:
                future = producer.send(topic, message)
                producer.flush()
                # wait 10 seconds for kafka writing completed!
                future.get(10)
            log.count("write_to_kafka", 1, topic=topic, result='Success')
            break
        except Exception as e:
            # Drop the (possibly broken) producer so the next attempt or the
            # next call rebuilds it from scratch.
            if producer is not None:
                try:
                    producer.close()
                except Exception as close_err:
                    # A failing close must not mask the original send error.
                    log.error(f"Kafka producer close failed. Error: {str(close_err)}")
                finally:
                    # Always forget the old producer, even if close() failed.
                    producer = None
            if isinstance(e, KafkaTimeoutError) and retries_left > 0:
                retries_left -= 1
                log.info("Kafka producer retries.")
                continue
            log.count("write_to_kafka", 1, topic=topic, result='Failed')
            log.error(f"Kafka producer send failed. Error: {str(e)}")
            raise