decisionai_plugin/common/util/kafka_operator_confluent.py (132 lines of code) (raw):
import functools
import os
from confluent_kafka import Consumer, Producer, KafkaException
from telemetry import log
import time
import traceback
from .configuration import Configuration, get_config_as_str
from .constant import IS_INTERNAL, IS_MT, EVENTHUB_USE_MI, AZURE_ENVIRONMENT
from .managedidentityauthhelper import ManagedIdentityAuthHelper
import json
# Shared Producer instance. send_message() creates it lazily on first use and
# sets it back to None after a failed send so the next call starts fresh.
producer = None

# Kafka topics
# Dead-letter topic name derived from a base topic, e.g. "jobs" -> "jobs-dl".
DeadLetterTopicFormat = "{base_topic}-dl"
def _get_endpoint_with_pattern(name):
    """Resolve the endpoint string for service *name* from the config directory.

    Reads ``endpoints.ini`` under ``$KENSHO2_CONFIG_DIR`` and looks up section
    *name*, which must contain one of:

    * ``endpoint`` -- returned verbatim;
    * ``endpoint-pattern`` -- a ``%``-style pattern expanded once for each
      replica index ``0..N-1``, where N is read from
      ``<config_dir>/system/<name>/replicas``. The expansions are joined with
      commas, printed, and returned.

    Raises:
        KeyError: if ``KENSHO2_CONFIG_DIR`` is not set.
        Exception: if the section has neither ``endpoint`` nor
            ``endpoint-pattern``.
    """
    config_dir = os.environ['KENSHO2_CONFIG_DIR']
    # os.path.join yields the same path as plain concatenation when
    # config_dir ends with a separator, and still works when it does not.
    endpoints = Configuration(os.path.join(config_dir, 'endpoints.ini'))
    kvs = endpoints[name]
    if 'endpoint' in kvs:
        return kvs['endpoint']
    if 'endpoint-pattern' in kvs:
        replicas_path = os.path.join(config_dir, 'system', name, 'replicas')
        # Strip the file's contents (not the path) before parsing the count.
        num_replicas = int(get_config_as_str(replicas_path).strip())
        pattern = kvs['endpoint-pattern']
        val = ','.join(pattern % replica for replica in range(num_replicas))
        print("kafka-endpoint: " + val)
        return val
    raise Exception('missing endpoint for %s' % (name))
# Comma-separated bootstrap server list. Internal deployments resolve it from
# the config directory ($KENSHO2_CONFIG_DIR); all others read $KAFKA_ENDPOINT.
if IS_INTERNAL:
    KAFKA_BOOTSTRAP_SERVERS = _get_endpoint_with_pattern('kafka')
else:
    KAFKA_BOOTSTRAP_SERVERS = os.environ['KAFKA_ENDPOINT']
def oauth_cb(auth_helper, config):
    """OAUTHBEARER token callback for the confluent_kafka client.

    *auth_helper* is bound in advance with ``functools.partial``. The client
    then calls this with its ``sasl.oauthbearer.config`` string (*config*),
    which is ignored here. The token comes entirely from *auth_helper*.

    NOTE(review): confluent_kafka expects this callback to return a
    ``(token_str, expiry_epoch_seconds)`` tuple. Confirm that
    ManagedIdentityAuthHelper.token() returns that shape.
    """
    token = auth_helper.token()
    return token
def get_kafka_configs():
    """Return the confluent_kafka client configuration dict for this deployment.

    * ``IS_INTERNAL`` and not ``IS_MT``: only ``bootstrap.servers`` is set;
      no security settings are added.
    * Otherwise ``SASL_SSL`` is used, authenticated either:

      - with an OAUTHBEARER token from a ManagedIdentityAuthHelper, when
        ``EVENTHUB_USE_MI`` is set; or
      - with SASL PLAIN, username ``$ConnectionString`` and password taken
        from ``$KAFKA_CONN_STRING`` (presumably the Azure Event Hubs Kafka
        convention).

    A new dict is built on every call. In the managed-identity case a new
    auth helper is built as well.

    Raises:
        KeyError: in the PLAIN case, if ``KAFKA_CONN_STRING`` is not set.
    """
    if IS_INTERNAL and not IS_MT:
        return {"bootstrap.servers": KAFKA_BOOTSTRAP_SERVERS}

    secured = {
        "bootstrap.servers": KAFKA_BOOTSTRAP_SERVERS,
        "security.protocol": "SASL_SSL",
    }
    if EVENTHUB_USE_MI:
        helper = ManagedIdentityAuthHelper(AZURE_ENVIRONMENT, KAFKA_BOOTSTRAP_SERVERS.split(","))
        return {
            **secured,
            "sasl.mechanism": "OAUTHBEARER",
            "oauth_cb": functools.partial(oauth_cb, helper),
        }

    connection_string = os.environ['KAFKA_CONN_STRING']
    return {
        **secured,
        "sasl.mechanism": "PLAIN",
        "sasl.username": "$ConnectionString",
        "sasl.password": connection_string,
    }
def send_message(topic, message):
    """Serialize *message* as JSON, publish it to *topic*, and wait for delivery.

    A single module-wide ``producer`` is created lazily on first use and then
    reused. Failures are counted and logged rather than raised. After any
    failure the cached producer is discarded so the next call builds a fresh
    one.

    Args:
        topic: destination topic name.
        message: a JSON-serializable object.

    Returns:
        True if the message was delivered, False otherwise. Callers that
        ignore the return value are unaffected.
    """
    global producer
    if producer is None:
        # Build the config only when a producer is actually needed. With
        # managed identity, get_kafka_configs() creates a new auth helper on
        # every call.
        producer = Producer(**{**get_kafka_configs(), 'retries': 5})

    delivery_errors = []

    def _on_delivery(err, msg):
        # produce() only enqueues the message. Broker-side delivery failures
        # are reported through this callback (served during flush()), not
        # raised as exceptions.
        if err is not None:
            delivery_errors.append(err)

    error = None
    try:
        producer.produce(topic, json.dumps(message).encode('utf-8'), on_delivery=_on_delivery)
        undelivered = producer.flush()
    except Exception as e:
        error = e
    else:
        if delivery_errors:
            error = delivery_errors[0]
        elif undelivered:
            error = "%d message(s) still queued after flush" % undelivered

    if error is None:
        log.count("write_to_kafka", 1, topic=topic, result='Success')
        return True

    producer = None
    log.count("write_to_kafka", 1, topic=topic, result='Failed')
    log.error("Produce message failed. Error message: " + str(error))
    return False
def append_to_failed_queue(message, err):
    """Publish a failed record to the dead-letter topic of its source topic.

    The JSON payload of *message* is decoded again and ``str(err)`` is
    appended to its ``__ERROR__`` list, which is created if absent. The
    record is then sent to the topic named by ``DeadLetterTopicFormat`` for
    the message's topic.

    Args:
        message: the consumed confluent_kafka Message whose processing failed.
        err: the exception (or other object) describing the failure.

    Returns:
        The result of send_message().
    """
    record_value = json.loads(message.value().decode('utf-8'))
    record_value.setdefault('__ERROR__', []).append(str(err))
    # Message.topic is a method on confluent_kafka messages and must be
    # called. Passing the bound method would format its repr into the name.
    dead_letter_topic = DeadLetterTopicFormat.format(base_topic=message.topic())
    return send_message(dead_letter_topic, record_value)
def _handle_failed_record(message, record_value, err, retry_limit, error_callback):
    """Requeue a record for another attempt, or dead-letter it once retries are used up."""
    count = record_value.get('__RETRY__', 0)
    if count >= retry_limit:
        log.error("Exceed the maximum number of retries.")
        if error_callback:
            error_callback(message, err)
        append_to_failed_queue(message, err)
    else:
        log.error("Processing message failed, will retry. Error message: " + str(err) + traceback.format_exc())
        record_value['__RETRY__'] = count + 1
        # Message.topic is a method on confluent_kafka messages and must be
        # called. Passing the method object would make every requeue fail.
        send_message(message.topic(), record_value)


def _handle_message(message, process_func, retry_limit, error_callback):
    """Decode one consumed message, run *process_func* on it, and route any failure."""
    try:
        record_value = json.loads(message.value().decode('utf-8'))
    except (ValueError, AttributeError) as e:
        # Bad UTF-8/JSON or a null payload. Retrying cannot help, and there is
        # no parsed record to requeue or dead-letter, so log it and skip it.
        # The failure path below must never run without a record_value from
        # *this* message.
        log.error("Failed to decode message, skipping it. Error message: " + str(e) + traceback.format_exc())
        return
    try:
        process_func(record_value)
    except Exception as e:
        _handle_failed_record(message, record_value, e, retry_limit, error_callback)


def consume_loop(process_func, topic, retry_limit=0, error_callback=None, config=None):
    """Consume *topic* forever, passing each decoded JSON record to *process_func*.

    Auto-commit is disabled. Offsets are committed manually after each
    message has been handled. When *process_func* raises, the record is
    re-published to its topic with ``__RETRY__`` incremented, until
    ``retry_limit`` retries have been used. After that, *error_callback* (if
    given) is called with ``(message, err)`` and the record is sent to the
    dead-letter topic via append_to_failed_queue(). Messages whose payload
    cannot be decoded as UTF-8 JSON are logged and skipped.

    Any other error, including a Kafka error returned by poll(), closes the
    consumer and is logged. The loop then reconnects after a 10 second
    pause. This function never returns.

    Args:
        process_func: callable that receives the decoded record.
        topic: topic to subscribe to. It also determines the consumer group id.
        retry_limit: number of re-publish attempts before dead-lettering.
        error_callback: optional ``callable(message, err)`` invoked when
            retries are exhausted.
        config: optional extra client settings merged over
            get_kafka_configs(). ``group.id``, ``max.poll.interval.ms`` and
            ``enable.auto.commit`` always take the values set here.
    """
    log.info(f"Start of consume_loop for topic {topic}...")
    while True:
        try:
            kafka_configs = get_kafka_configs()
            if config is not None:
                kafka_configs.update(config)
            consumer_configs = {
                **kafka_configs,
                'group.id': 'job-controller-v2-%s' % topic,
                'max.poll.interval.ms': 3600 * 6 * 1000,
                'enable.auto.commit': False
            }
            consumer = Consumer(consumer_configs)

            def print_assignment(consumer, partitions):
                # Single formatted string, matching every other log call here.
                log.info(f"Assignment: {partitions}")

            consumer.subscribe([topic], on_assign=print_assignment)
            try:
                while True:
                    message = consumer.poll(timeout=1.0)
                    if message is None:
                        continue
                    if message.error():
                        raise KafkaException(message.error())
                    log.count("read_from_kafka", 1, topic=topic)
                    log.duration("read_from_kafka", 1, topic=topic)
                    _handle_message(message, process_func, retry_limit, error_callback)
                    # Commit after failures as well as successes. A failed
                    # record has already been requeued or dead-lettered, so
                    # leaving its offset uncommitted would only redeliver a
                    # duplicate after a restart.
                    consumer.commit()
            finally:
                consumer.close()
        except Exception:
            log.error(f"Error in consume_loop for topic {topic}. " + traceback.format_exc())
            time.sleep(10)
if __name__ == "__main__":
    # Manual smoke test: publish 100 small JSON messages to a scratch topic.
    # (The unused sample job payload that used to be defined here was dead code.)
    smoke_topic = 'chuwan-test-topic'
    print("sending message...")
    for i in range(100):
        print(i)
        send_message(smoke_topic, {"dataObjectID": i})