def send_message()

in decisionai_plugin/common/util/kafka_operator_confluent.py [0:0]


def send_message(topic, message):
    global producer
    kafka_configs = get_kafka_configs()
    if producer is None:
        producer = Producer(**{**kafka_configs,
                                    'retries': 5
                                    })
    try:
        producer.produce(topic, json.dumps(message).encode('utf-8'))
        producer.flush()
        log.count("write_to_kafka", 1,  topic=topic, result='Success')
    except Exception as e:
        producer = None
        log.count("write_to_kafka", 1,  topic=topic, result='Failed')
        log.error("Produce message failed. Error message: " + str(e))