def get_kafka_configs()

in decisionai_plugin/common/util/kafka_operator_confluent.py [0:0]


def get_kafka_configs():
    """Assemble the Kafka client configuration dictionary.

    The result depends on module-level flags:

    * ``IS_MT`` falsy and ``IS_INTERNAL`` truthy: only
      ``bootstrap.servers`` is set (no security settings).
    * Otherwise the connection uses ``SASL_SSL``:
        - with ``EVENTHUB_USE_MI`` truthy, the ``OAUTHBEARER`` mechanism is
          selected and ``oauth_cb`` is bound to a freshly created
          ``ManagedIdentityAuthHelper``;
        - else the ``PLAIN`` mechanism is selected, with the password read
          from the ``KAFKA_CONN_STRING`` environment variable.

    Returns:
        dict: configuration keyed by Kafka client property name.

    Raises:
        KeyError: if the ``PLAIN`` branch is taken and ``KAFKA_CONN_STRING``
            is not present in the environment.
    """
    # Guard clause: the unsecured case needs nothing beyond the broker list.
    if not IS_MT and IS_INTERNAL:
        return {"bootstrap.servers": KAFKA_BOOTSTRAP_SERVERS}

    if EVENTHUB_USE_MI:
        # The helper receives the broker list split into individual entries.
        broker_list = KAFKA_BOOTSTRAP_SERVERS.split(",")
        token_helper = ManagedIdentityAuthHelper(AZURE_ENVIRONMENT, broker_list)
        auth_settings = {
            "sasl.mechanism": "OAUTHBEARER",
            "oauth_cb": functools.partial(oauth_cb, token_helper),
        }
    else:
        # Fail fast with KeyError when the connection string is not configured.
        connection_string = os.environ['KAFKA_CONN_STRING']
        auth_settings = {
            "sasl.mechanism": "PLAIN",
            "sasl.username": "$ConnectionString",
            "sasl.password": connection_string,
        }

    # Shared SASL_SSL settings first, then the mechanism-specific entries,
    # so keys are inserted in the same order as before.
    secured_configs = {
        "bootstrap.servers": KAFKA_BOOTSTRAP_SERVERS,
        "security.protocol": "SASL_SSL",
    }
    secured_configs.update(auth_settings)
    return secured_configs