def _build_cluster_args()

in awswrangler/emr.py


def _build_cluster_args(**pars: Any) -> dict[str, Any]:  # noqa: PLR0912,PLR0915
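    """Translate create_cluster() keyword arguments into an EMR run_job_flow() request dictionary."""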
    account_id: str = sts.get_account_id(boto3_session=pars["boto3_session"])
    region: str = _utils.get_region_from_session(boto3_session=pars["boto3_session"])

    # S3 Logging path
    if pars.get("logging_s3_path") is None:
        pars["logging_s3_path"] = _get_default_logging_path(
            subnet_id=None, account_id=account_id, region=region, boto3_session=pars["boto3_session"]
        )

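    # Optional environment-variable exports, emitted below as spark-env/yarn-env/livy-env classifications.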
    spark_env: dict[str, str] | None = None
    yarn_env: dict[str, str] | None = None
    livy_env: dict[str, str] | None = None

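    # Apache Arrow integration: enable Arrow-backed transfers in Spark SQL and export
    # ARROW_PRE_0_15_IPC_FORMAT=1 so PyArrow >= 0.15 keeps producing the legacy IPC
    # format that Spark 2.x expects.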
    if pars["spark_pyarrow"] is True:
        if pars["spark_defaults"] is None:
            pars["spark_defaults"] = {"spark.sql.execution.arrow.enabled": "true"}
        else:
            pars["spark_defaults"]["spark.sql.execution.arrow.enabled"] = "true"
        spark_env = {"ARROW_PRE_0_15_IPC_FORMAT": "1"}
        yarn_env = {"ARROW_PRE_0_15_IPC_FORMAT": "1"}
        livy_env = {"ARROW_PRE_0_15_IPC_FORMAT": "1"}

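    # Run PySpark on the Python 3 interpreter shipped with the AMI.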
    if pars["python3"] is True:
        if spark_env is None:
            spark_env = {"PYSPARK_PYTHON": "/usr/bin/python3"}
        else:
            spark_env["PYSPARK_PYTHON"] = "/usr/bin/python3"

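    # Extra JARs are handed to Spark as a comma-separated spark.jars list.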
    if pars["spark_jars_path"] is not None:
        paths: str = ",".join(pars["spark_jars_path"])
        if pars["spark_defaults"] is None:
            pars["spark_defaults"] = {"spark.jars": paths}
        else:
            pars["spark_defaults"]["spark.jars"] = paths

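    # Base arguments for the EMR RunJobFlow API call; instance fleets are appended below.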
    args: dict[str, Any] = {
        "Name": pars["cluster_name"],
        "LogUri": pars["logging_s3_path"],
        "ReleaseLabel": pars["emr_release"],
        "VisibleToAllUsers": pars["visible_to_all_users"],
        "JobFlowRole": pars["emr_ec2_role"],
        "ServiceRole": pars["emr_role"],
        "Instances": {
            "KeepJobFlowAliveWhenNoSteps": pars["keep_cluster_alive_when_no_steps"],
            "TerminationProtected": pars["termination_protected"],
            "Ec2SubnetId": pars["subnet_id"],
            "InstanceFleets": [],
        },
        "StepConcurrencyLevel": pars["step_concurrency_level"],
    }

    # Auto Termination Policy
    if pars["auto_termination_policy"] is not None:
        args["AutoTerminationPolicy"] = pars["auto_termination_policy"]

    # Custom AMI
    if pars["custom_ami_id"] is not None:
        args["CustomAmiId"] = pars["custom_ami_id"]

    # EC2 Key Pair
    if pars["key_pair_name"] is not None:
        args["Instances"]["Ec2KeyName"] = pars["key_pair_name"]

    # Security groups
    if pars["security_group_master"] is not None:
        args["Instances"]["EmrManagedMasterSecurityGroup"] = pars["security_group_master"]
    if pars["security_groups_master_additional"] is not None:
        args["Instances"]["AdditionalMasterSecurityGroups"] = pars["security_groups_master_additional"]
    if pars["security_group_slave"] is not None:
        args["Instances"]["EmrManagedSlaveSecurityGroup"] = pars["security_group_slave"]
    if pars["security_groups_slave_additional"] is not None:
        args["Instances"]["AdditionalSlaveSecurityGroups"] = pars["security_groups_slave_additional"]
    if pars["security_group_service_access"] is not None:
        args["Instances"]["ServiceAccessSecurityGroup"] = pars["security_group_service_access"]

    # Configurations
    args["Configurations"] = (
        [
            {
                "Classification": _get_emr_classification_lib(pars["emr_release"]),
                "Properties": {"log4j.rootCategory": f"{pars['spark_log_level']}, console"},
            }
        ]
        if not pars["configurations"]
        else pars["configurations"]
    )
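    # Docker on YARN: register the local registry, centos, this account's ECR endpoint,
    # and any extra public registries as trusted/privileged registries.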
    if pars["docker"] is True:
        if pars.get("extra_public_registries") is None:
            extra_public_registries: list[str] = []
        else:
            extra_public_registries = pars["extra_public_registries"]
        registries: str = (
            f"local,centos,{account_id}.dkr.ecr.{region}.amazonaws.com,{','.join(extra_public_registries)}"
        )
        registries = registries[:-1] if registries.endswith(",") else registries
        args["Configurations"].append(
            {
                "Classification": "container-executor",
                "Properties": {},
                "Configurations": [
                    {
                        "Classification": "docker",
                        "Properties": {
                            "docker.privileged-containers.registries": registries,
                            "docker.trusted.registries": registries,
                        },
                        "Configurations": [],
                    }
                ],
            }
        )

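    # Emit the environment variables collected above as "export" sub-configurations.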
    if spark_env is not None:
        args["Configurations"].append(
            {
                "Classification": "spark-env",
                "Properties": {},
                "Configurations": [{"Classification": "export", "Properties": spark_env, "Configurations": []}],
            }
        )
    if yarn_env is not None:
        args["Configurations"].append(
            {
                "Classification": "yarn-env",
                "Properties": {},
                "Configurations": [{"Classification": "export", "Properties": yarn_env, "Configurations": []}],
            }
        )
    if livy_env is not None:
        args["Configurations"].append(
            {
                "Classification": "livy-env",
                "Properties": {},
                "Configurations": [{"Classification": "export", "Properties": livy_env, "Configurations": []}],
            }
        )
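    # Use the AWS Glue Data Catalog as the metastore for Spark, Hive and/or Presto.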
    if pars["spark_glue_catalog"] is True:
        args["Configurations"].append(
            {
                "Classification": "spark-hive-site",
                "Properties": {
                    "hive.metastore.client.factory.class": "com.amazonaws.glue.catalog.metastore.AWSGlueDataCatalogHiveClientFactory"
                },
                "Configurations": [],
            }
        )
    if pars["hive_glue_catalog"] is True:
        hive_conf: dict[str, Any] = {"Classification": "hive-site", "Properties": {}, "Configurations": []}
        hive_conf["Properties"]["hive.metastore.client.factory.class"] = (
            "com.amazonaws.glue.catalog.metastore.AWSGlueDataCatalogHiveClientFactory"
        )
        args["Configurations"].append(hive_conf)
    if pars["presto_glue_catalog"] is True:
        args["Configurations"].append(
            {
                "Classification": "presto-connector-hive",
                "Properties": {"hive.metastore.glue.datacatalog.enabled": "true"},
                "Configurations": [],
            }
        )
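    # EMRFS consistent view: DynamoDB-backed consistency metadata for S3 objects.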
    if pars["consistent_view"] is True:
        args["Configurations"].append(
            {
                "Classification": "emrfs-site",
                "Properties": {
                    "fs.s3.consistent.retryPeriodSeconds": str(pars.get("consistent_view_retry_seconds", "10")),
                    "fs.s3.consistent": "true",
                    "fs.s3.consistent.retryCount": str(pars.get("consistent_view_retry_count", "5")),
                    "fs.s3.consistent.metadata.tableName": pars.get("consistent_view_table_name", "EmrFSMetadata"),
                },
            }
        )
    if pars["maximize_resource_allocation"] is True:
        args["Configurations"].append({"Classification": "spark", "Properties": {"maximizeResourceAllocation": "true"}})
    if pars["spark_defaults"] is not None:
        spark_defaults: dict[str, str | dict[str, str]] = {
            "Classification": "spark-defaults",
            "Properties": pars["spark_defaults"],
        }
        args["Configurations"].append(spark_defaults)
    if pars.get("custom_classifications") is not None:
        for c in pars["custom_classifications"]:
            args["Configurations"].append(c)

    # Applications
    if pars["applications"]:
        args["Applications"] = [{"Name": x} for x in pars["applications"]]

    # Bootstraps
    if pars["bootstraps_paths"]:
        args["BootstrapActions"] = [{"Name": x, "ScriptBootstrapAction": {"Path": x}} for x in pars["bootstraps_paths"]]

    # Debugging and Steps
    if (pars["debugging"] is True) or (pars["steps"] is not None):
        args["Steps"] = []
        if pars["debugging"] is True:
            args["Steps"].append(
                {
                    "Name": "Setup Hadoop Debugging",
                    "ActionOnFailure": "TERMINATE_CLUSTER",
                    "HadoopJarStep": {"Jar": "command-runner.jar", "Args": ["state-pusher-script"]},
                }
            )
        if pars["steps"] is not None:
            args["Steps"] += pars["steps"]

    # Master Instance Fleet
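    # If Spot capacity cannot be provisioned within the timeout, either fall back to
    # On-Demand instances or terminate the cluster.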
    timeout_action_master: str = (
        "SWITCH_TO_ON_DEMAND" if pars["spot_timeout_to_on_demand_master"] else "TERMINATE_CLUSTER"
    )
    fleet_master: dict[str, Any] = {
        "Name": "MASTER",
        "InstanceFleetType": "MASTER",
        "TargetOnDemandCapacity": pars["instance_num_on_demand_master"],
        "TargetSpotCapacity": pars["instance_num_spot_master"],
        "InstanceTypeConfigs": [
            {
                "InstanceType": pars["instance_type_master"],
                "WeightedCapacity": 1,
                "BidPriceAsPercentageOfOnDemandPrice": pars["spot_bid_percentage_of_on_demand_master"],
                "EbsConfiguration": {
                    "EbsBlockDeviceConfigs": [
                        {
                            "VolumeSpecification": {"SizeInGB": pars["instance_ebs_size_master"], "VolumeType": "gp2"},
                            "VolumesPerInstance": 1,
                        }
                    ],
                    "EbsOptimized": True,
                },
            }
        ],
    }
    if pars["instance_num_spot_master"] > 0:
        fleet_master["LaunchSpecifications"] = {
            "SpotSpecification": {
                "TimeoutDurationMinutes": pars["spot_provisioning_timeout_master"],
                "TimeoutAction": timeout_action_master,
            }
        }
    args["Instances"]["InstanceFleets"].append(fleet_master)

    # Core Instance Fleet
    if (pars["instance_num_spot_core"] > 0) or (pars["instance_num_on_demand_core"] > 0):
        timeout_action_core = "SWITCH_TO_ON_DEMAND" if pars["spot_timeout_to_on_demand_core"] else "TERMINATE_CLUSTER"
        fleet_core: dict[str, Any] = {
            "Name": "CORE",
            "InstanceFleetType": "CORE",
            "TargetOnDemandCapacity": pars["instance_num_on_demand_core"],
            "TargetSpotCapacity": pars["instance_num_spot_core"],
            "InstanceTypeConfigs": [
                {
                    "InstanceType": pars["instance_type_core"],
                    "WeightedCapacity": 1,
                    "BidPriceAsPercentageOfOnDemandPrice": pars["spot_bid_percentage_of_on_demand_core"],
                    "EbsConfiguration": {
                        "EbsBlockDeviceConfigs": [
                            {
                                "VolumeSpecification": {
                                    "SizeInGB": pars["instance_ebs_size_core"],
                                    "VolumeType": "gp2",
                                },
                                "VolumesPerInstance": 1,
                            }
                        ],
                        "EbsOptimized": True,
                    },
                }
            ],
        }
        if pars["instance_num_spot_core"] > 0:
            fleet_core["LaunchSpecifications"] = {
                "SpotSpecification": {
                    "TimeoutDurationMinutes": pars["spot_provisioning_timeout_core"],
                    "TimeoutAction": timeout_action_core,
                }
            }
        args["Instances"]["InstanceFleets"].append(fleet_core)

    # Task Instance Fleet
    if (pars["instance_num_spot_task"] > 0) or (pars["instance_num_on_demand_task"] > 0):
        timeout_action_task: str = (
            "SWITCH_TO_ON_DEMAND" if pars["spot_timeout_to_on_demand_task"] else "TERMINATE_CLUSTER"
        )
        fleet_task: dict[str, Any] = {
            "Name": "TASK",
            "InstanceFleetType": "TASK",
            "TargetOnDemandCapacity": pars["instance_num_on_demand_task"],
            "TargetSpotCapacity": pars["instance_num_spot_task"],
            "InstanceTypeConfigs": [
                {
                    "InstanceType": pars["instance_type_task"],
                    "WeightedCapacity": 1,
                    "BidPriceAsPercentageOfOnDemandPrice": pars["spot_bid_percentage_of_on_demand_task"],
                    "EbsConfiguration": {
                        "EbsBlockDeviceConfigs": [
                            {
                                "VolumeSpecification": {
                                    "SizeInGB": pars["instance_ebs_size_task"],
                                    "VolumeType": "gp2",
                                },
                                "VolumesPerInstance": 1,
                            }
                        ],
                        "EbsOptimized": True,
                    },
                }
            ],
        }
        if pars["instance_num_spot_task"] > 0:
            fleet_task["LaunchSpecifications"] = {
                "SpotSpecification": {
                    "TimeoutDurationMinutes": pars["spot_provisioning_timeout_task"],
                    "TimeoutAction": timeout_action_task,
                }
            }
        args["Instances"]["InstanceFleets"].append(fleet_task)

    if pars["security_configuration"]:
        args["SecurityConfiguration"] = pars["security_configuration"]

    # Tags
    if pars["tags"] is not None:
        args["Tags"] = [{"Key": k, "Value": v} for k, v in pars["tags"].items()]

    _logger.debug("args: \n%s", pprint.pformat(args))
    return args
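
For context, this function is the internal translator behind wr.emr.create_cluster: the public
function collects its keyword arguments into pars, delegates to _build_cluster_args, and sends
the resulting dictionary to the EMR RunJobFlow API. A minimal usage sketch follows; the subnet
ID is a placeholder and the remaining values are illustrative, not requirements.

import awswrangler as wr

# The RunJobFlow request for this cluster is assembled by _build_cluster_args.
cluster_id = wr.emr.create_cluster(
    subnet_id="subnet-00000000000000000",  # placeholder: a subnet in your VPC
    cluster_name="my-emr-cluster",
    emr_release="emr-6.10.0",
    instance_type_master="m5.xlarge",
    instance_num_on_demand_master=1,
    instance_num_spot_master=0,
    spark_glue_catalog=True,
    python3=True,
)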