in helpers/functions.py [0:0]
def update_kafka_configuration(config_file):
""" ensure the configuration has auto enable topic """
# check if config exists
try:
config_arn = [
config
for config in kafkaclient.list_configurations()["Configurations"]
if config["Name"] == constants["PROJECT_TAG"]
][0]["Arn"]
except IndexError as err:
# create the config if it does not exist
config_arn = kafkaclient.create_configuration(
Description="Elkk Configuration",
KafkaVersions=[constants["KAFKA_VERSION"]],
Name=constants["PROJECT_TAG"],
ServerProperties=Path("kafka/configuration.txt").read_text(),
)["Arn"]
try:
# check the config arn attached to the cluster
kafka_config_arn = kafkaclient.describe_cluster(ClusterArn=kafka_get_arn())[
"ClusterInfo"
]["CurrentBrokerSoftwareInfo"]["ConfigurationArn"]
except KeyError as err:
# if not found then must be using default, get cluster version
kafka_cluster_version = kafkaclient.describe_cluster(
ClusterArn=kafka_get_arn()
)["ClusterInfo"]["CurrentVersion"]
# update cluster with configuration
return kafka_get_arn()
# park this for now
try:
kafkaclient.update_cluster_configuration(
ClusterArn=kafka_get_arn(),
ConfigurationInfo={"Arn": kafka_config_arn, "Revision": 1},
CurrentVersion=kafka_cluster_version,
)
except ClientError as err:
if err.response["Error"]["Code"] == "BadRequestException":
pass
else:
print(f"Unexpectedd error: {err}")
return kafka_get_arn()