in decisionai_plugin/common/util/kafka_operator_confluent.py [0:0]
def get_kafka_configs():
    """Build the configuration dict for a Kafka client.

    The shape of the result is driven by module-level flags:

    * ``IS_MT`` falsy and ``IS_INTERNAL`` truthy: only
      ``bootstrap.servers`` is set (no security settings).
    * otherwise, with ``EVENTHUB_USE_MI`` truthy: SASL_SSL with the
      OAUTHBEARER mechanism; ``oauth_cb`` is bound to a
      ``ManagedIdentityAuthHelper`` created for the configured brokers.
    * otherwise: SASL_SSL with the PLAIN mechanism, using the literal
      username ``$ConnectionString`` and the value of the
      ``KAFKA_CONN_STRING`` environment variable as the password.

    Returns:
        dict: configuration keyed by client property name.

    Raises:
        KeyError: if the PLAIN branch is taken and ``KAFKA_CONN_STRING``
            is not present in the environment.
    """
    # Guard clause: equivalent to the negation of
    # ``IS_MT or not IS_INTERNAL``.
    if not IS_MT and IS_INTERNAL:
        return {"bootstrap.servers": KAFKA_BOOTSTRAP_SERVERS}

    if EVENTHUB_USE_MI:
        # The helper receives the broker list split on commas.
        token_helper = ManagedIdentityAuthHelper(
            AZURE_ENVIRONMENT, KAFKA_BOOTSTRAP_SERVERS.split(",")
        )
        mechanism_settings = {
            "sasl.mechanism": "OAUTHBEARER",
            "oauth_cb": functools.partial(oauth_cb, token_helper),
        }
    else:
        # Indexing (not .get) so a missing secret fails immediately.
        connection_string = os.environ['KAFKA_CONN_STRING']
        mechanism_settings = {
            "sasl.mechanism": "PLAIN",
            "sasl.username": "$ConnectionString",
            "sasl.password": connection_string,
        }

    # Both secured variants share the broker list and the SASL_SSL protocol;
    # the mechanism-specific keys follow in the same order as before.
    return {
        "bootstrap.servers": KAFKA_BOOTSTRAP_SERVERS,
        "security.protocol": "SASL_SSL",
        **mechanism_settings,
    }