def update_kafka_configuration()

in helpers/functions.py [0:0]


def update_kafka_configuration(config_file):
    """ ensure the configuration has auto enable topic """
    # check if config exists
    try:
        config_arn = [
            config
            for config in kafkaclient.list_configurations()["Configurations"]
            if config["Name"] == constants["PROJECT_TAG"]
        ][0]["Arn"]
    except IndexError as err:
        # create the config if it does not exist
        config_arn = kafkaclient.create_configuration(
            Description="Elkk Configuration",
            KafkaVersions=[constants["KAFKA_VERSION"]],
            Name=constants["PROJECT_TAG"],
            ServerProperties=Path("kafka/configuration.txt").read_text(),
        )["Arn"]
    try:
        # check the config arn attached to the cluster
        kafka_config_arn = kafkaclient.describe_cluster(ClusterArn=kafka_get_arn())[
            "ClusterInfo"
        ]["CurrentBrokerSoftwareInfo"]["ConfigurationArn"]
    except KeyError as err:
        # if not found then must be using default, get cluster version
        kafka_cluster_version = kafkaclient.describe_cluster(
            ClusterArn=kafka_get_arn()
        )["ClusterInfo"]["CurrentVersion"]
        # update cluster with configuration
        return kafka_get_arn()
        # park this for now
        try:
            kafkaclient.update_cluster_configuration(
                ClusterArn=kafka_get_arn(),
                ConfigurationInfo={"Arn": kafka_config_arn, "Revision": 1},
                CurrentVersion=kafka_cluster_version,
            )
        except ClientError as err:
            if err.response["Error"]["Code"] == "BadRequestException":
                pass
            else:
                print(f"Unexpectedd error: {err}")
    return kafka_get_arn()