"""EMR (Elastic Map Reduce) module."""

from __future__ import annotations

import logging
import pprint
import re
from typing import Any, Literal, cast

import boto3

from awswrangler import _utils, exceptions, sts

_logger: logging.Logger = logging.getLogger(__name__)


_ActionOnFailureLiteral = Literal["TERMINATE_JOB_FLOW", "TERMINATE_CLUSTER", "CANCEL_AND_WAIT", "CONTINUE"]


def _get_ecr_credentials_refresh_content(region: str) -> str:
    return f"""
import subprocess
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("ECR Setup Job").getOrCreate()

COMMANDS = [
    "sudo -s eval $(aws ecr get-login --region {region} --no-include-email)",
    "sudo hdfs dfs -put -f /root/.docker/config.json /user/hadoop/"
]

for command in COMMANDS:
    subprocess.run(command.split(" "), timeout=6.0, check=True)

print("done!")
    """


def _get_default_logging_path(
    subnet_id: str | None = None,
    account_id: str | None = None,
    region: str | None = None,
    boto3_session: boto3.Session | None = None,
) -> str:
    """Get EMR default logging path.

    E.g. "s3://aws-logs-{account_id}-{region}/elasticmapreduce/"

    Parameters
    ----------
    subnet_id : str, optional
        Subnet ID. If not provided, you must pass `account_id` and `region` explicit.
    account_id: str, optional
        Account ID.
    region: str, optional
        Region e.g. 'us-east-1'
    boto3_session : boto3.Session(), optional
        Boto3 Session. The default boto3 session will be used if boto3_session receive None.

    Returns
    -------
    str
        Default logging path.
        E.g. "s3://aws-logs-{account_id}-{region}/elasticmapreduce/"

    Examples
    --------
    >>> import awswrangler as wr
    >>> state = wr.emr._get_default_logging_path("subnet-id")
    's3://aws-logs-{account_id}-{region}/elasticmapreduce/'

    """
    if account_id is None:
        _account_id: str = sts.get_account_id(boto3_session=boto3_session)
    else:
        _account_id = account_id
    if (region is None) and (subnet_id is not None):
        _region: str = _utils.get_region_from_session(boto3_session=boto3_session)
    elif (region is None) and (subnet_id is None):
        raise exceptions.InvalidArgumentCombination("You must pass region or subnet_id or both.")
    else:
        _region = cast(str, region)
    return f"s3://aws-logs-{_account_id}-{_region}/elasticmapreduce/"


def _get_emr_classification_lib(emr_version: str) -> str:
    """Parse emr release string.

    Parse emr release string and return its corresponding Classification
    configuration string. i.e. log4j or log4j2.

    Parameters
    ----------
        emr_version: emr release string

    Returns
    -------
        A string mentioning the appropriate classification lib based on the emr release.
    """
    matches = re.findall(r"(\d.\d.\d)", emr_version)
    number = 670
    if matches:
        number = int(matches[0].replace(".", ""))

    return "spark-log4j2" if number > 670 else "spark-log4j"


def _build_cluster_args(**pars: Any) -> dict[str, Any]:  # noqa: PLR0912,PLR0915
    account_id: str = sts.get_account_id(boto3_session=pars["boto3_session"])
    region: str = _utils.get_region_from_session(boto3_session=pars["boto3_session"])

    # S3 Logging path
    if pars.get("logging_s3_path") is None:
        pars["logging_s3_path"] = _get_default_logging_path(
            subnet_id=None, account_id=account_id, region=region, boto3_session=pars["boto3_session"]
        )

    spark_env: dict[str, str] | None = None
    yarn_env: dict[str, str] | None = None
    livy_env: dict[str, str] | None = None

    if pars["spark_pyarrow"] is True:
        if pars["spark_defaults"] is None:
            pars["spark_defaults"] = {"spark.sql.execution.arrow.enabled": "true"}
        else:
            pars["spark_defaults"]["spark.sql.execution.arrow.enabled"] = "true"
        spark_env = {"ARROW_PRE_0_15_IPC_FORMAT": "1"}
        yarn_env = {"ARROW_PRE_0_15_IPC_FORMAT": "1"}
        livy_env = {"ARROW_PRE_0_15_IPC_FORMAT": "1"}

    if pars["python3"] is True:
        if spark_env is None:
            spark_env = {"PYSPARK_PYTHON": "/usr/bin/python3"}
        else:
            spark_env["PYSPARK_PYTHON"] = "/usr/bin/python3"

    if pars["spark_jars_path"] is not None:
        paths: str = ",".join(pars["spark_jars_path"])
        if pars["spark_defaults"] is None:
            pars["spark_defaults"] = {"spark.jars": paths}
        else:
            pars["spark_defaults"]["spark.jars"] = paths

    args: dict[str, Any] = {
        "Name": pars["cluster_name"],
        "LogUri": pars["logging_s3_path"],
        "ReleaseLabel": pars["emr_release"],
        "VisibleToAllUsers": pars["visible_to_all_users"],
        "JobFlowRole": pars["emr_ec2_role"],
        "ServiceRole": pars["emr_role"],
        "Instances": {
            "KeepJobFlowAliveWhenNoSteps": pars["keep_cluster_alive_when_no_steps"],
            "TerminationProtected": pars["termination_protected"],
            "Ec2SubnetId": pars["subnet_id"],
            "InstanceFleets": [],
        },
        "StepConcurrencyLevel": pars["step_concurrency_level"],
    }

    # Auto Termination Policy
    if pars["auto_termination_policy"] is not None:
        args["AutoTerminationPolicy"] = pars["auto_termination_policy"]

    # Custom AMI
    if pars["custom_ami_id"] is not None:
        args["CustomAmiId"] = pars["custom_ami_id"]

    # EC2 Key Pair
    if pars["key_pair_name"] is not None:
        args["Instances"]["Ec2KeyName"] = pars["key_pair_name"]

    # Security groups
    if pars["security_group_master"] is not None:
        args["Instances"]["EmrManagedMasterSecurityGroup"] = pars["security_group_master"]
    if pars["security_groups_master_additional"] is not None:
        args["Instances"]["AdditionalMasterSecurityGroups"] = pars["security_groups_master_additional"]
    if pars["security_group_slave"] is not None:
        args["Instances"]["EmrManagedSlaveSecurityGroup"] = pars["security_group_slave"]
    if pars["security_groups_slave_additional"] is not None:
        args["Instances"]["AdditionalSlaveSecurityGroups"] = pars["security_groups_slave_additional"]
    if pars["security_group_service_access"] is not None:
        args["Instances"]["ServiceAccessSecurityGroup"] = pars["security_group_service_access"]

    # Configurations
    args["Configurations"] = (
        [
            {
                "Classification": _get_emr_classification_lib(pars["emr_release"]),
                "Properties": {"log4j.rootCategory": f"{pars['spark_log_level']}, console"},
            }
        ]
        if not pars["configurations"]
        else pars["configurations"]
    )
    if pars["docker"] is True:
        if pars.get("extra_public_registries") is None:
            extra_public_registries: list[str] = []
        else:
            extra_public_registries = pars["extra_public_registries"]
        registries: str = (
            f"local,centos,{account_id}.dkr.ecr.{region}.amazonaws.com,{','.join(extra_public_registries)}"
        )
        registries = registries[:-1] if registries.endswith(",") else registries
        args["Configurations"].append(
            {
                "Classification": "container-executor",
                "Properties": {},
                "Configurations": [
                    {
                        "Classification": "docker",
                        "Properties": {
                            "docker.privileged-containers.registries": registries,
                            "docker.trusted.registries": registries,
                        },
                        "Configurations": [],
                    }
                ],
            }
        )

    if spark_env is not None:
        args["Configurations"].append(
            {
                "Classification": "spark-env",
                "Properties": {},
                "Configurations": [{"Classification": "export", "Properties": spark_env, "Configurations": []}],
            }
        )
    if yarn_env is not None:
        args["Configurations"].append(
            {
                "Classification": "yarn-env",
                "Properties": {},
                "Configurations": [{"Classification": "export", "Properties": yarn_env, "Configurations": []}],
            }
        )
    if livy_env is not None:
        args["Configurations"].append(
            {
                "Classification": "livy-env",
                "Properties": {},
                "Configurations": [{"Classification": "export", "Properties": livy_env, "Configurations": []}],
            }
        )
    if pars["spark_glue_catalog"] is True:
        args["Configurations"].append(
            {
                "Classification": "spark-hive-site",
                "Properties": {
                    "hive.metastore.client.factory.class": "com.amazonaws.glue.catalog.metastore.AWSGlueDataCatalogHiveClientFactory"
                },
                "Configurations": [],
            }
        )
    if pars["hive_glue_catalog"] is True:
        hive_conf: dict[str, Any] = {"Classification": "hive-site", "Properties": {}, "Configurations": []}
        hive_conf["Properties"]["hive.metastore.client.factory.class"] = (
            "com.amazonaws.glue.catalog.metastore.AWSGlueDataCatalogHiveClientFactory"
        )
        args["Configurations"].append(hive_conf)
    if pars["presto_glue_catalog"] is True:
        args["Configurations"].append(
            {
                "Classification": "presto-connector-hive",
                "Properties": {"hive.metastore.glue.datacatalog.enabled": "true"},
                "Configurations": [],
            }
        )
    if pars["consistent_view"] is True:
        args["Configurations"].append(
            {
                "Classification": "emrfs-site",
                "Properties": {
                    "fs.s3.consistent.retryPeriodSeconds": str(pars.get("consistent_view_retry_seconds", "10")),
                    "fs.s3.consistent": "true",
                    "fs.s3.consistent.retryCount": str(pars.get("consistent_view_retry_count", "5")),
                    "fs.s3.consistent.metadata.tableName": pars.get("consistent_view_table_name", "EmrFSMetadata"),
                },
            }
        )
    if pars["maximize_resource_allocation"] is True:
        args["Configurations"].append({"Classification": "spark", "Properties": {"maximizeResourceAllocation": "true"}})
    if pars["spark_defaults"] is not None:
        spark_defaults: dict[str, str | dict[str, str]] = {
            "Classification": "spark-defaults",
            "Properties": pars["spark_defaults"],
        }
        args["Configurations"].append(spark_defaults)
    if pars.get("custom_classifications") is not None:
        for c in pars["custom_classifications"]:
            args["Configurations"].append(c)

    # Applications
    if pars["applications"]:
        args["Applications"] = [{"Name": x} for x in pars["applications"]]

    # Bootstraps
    if pars["bootstraps_paths"]:
        args["BootstrapActions"] = [{"Name": x, "ScriptBootstrapAction": {"Path": x}} for x in pars["bootstraps_paths"]]

    # Debugging and Steps
    if (pars["debugging"] is True) or (pars["steps"] is not None):
        args["Steps"] = []
        if pars["debugging"] is True:
            args["Steps"].append(
                {
                    "Name": "Setup Hadoop Debugging",
                    "ActionOnFailure": "TERMINATE_CLUSTER",
                    "HadoopJarStep": {"Jar": "command-runner.jar", "Args": ["state-pusher-script"]},
                }
            )
        if pars["steps"] is not None:
            args["Steps"] += pars["steps"]

    # Master Instance Fleet
    timeout_action_master: str = (
        "SWITCH_TO_ON_DEMAND" if pars["spot_timeout_to_on_demand_master"] else "TERMINATE_CLUSTER"
    )
    fleet_master: dict[str, Any] = {
        "Name": "MASTER",
        "InstanceFleetType": "MASTER",
        "TargetOnDemandCapacity": pars["instance_num_on_demand_master"],
        "TargetSpotCapacity": pars["instance_num_spot_master"],
        "InstanceTypeConfigs": [
            {
                "InstanceType": pars["instance_type_master"],
                "WeightedCapacity": 1,
                "BidPriceAsPercentageOfOnDemandPrice": pars["spot_bid_percentage_of_on_demand_master"],
                "EbsConfiguration": {
                    "EbsBlockDeviceConfigs": [
                        {
                            "VolumeSpecification": {"SizeInGB": pars["instance_ebs_size_master"], "VolumeType": "gp2"},
                            "VolumesPerInstance": 1,
                        }
                    ],
                    "EbsOptimized": True,
                },
            }
        ],
    }
    if pars["instance_num_spot_master"] > 0:
        fleet_master["LaunchSpecifications"] = {
            "SpotSpecification": {
                "TimeoutDurationMinutes": pars["spot_provisioning_timeout_master"],
                "TimeoutAction": timeout_action_master,
            }
        }
    args["Instances"]["InstanceFleets"].append(fleet_master)

    # Core Instance Fleet
    if (pars["instance_num_spot_core"] > 0) or pars["instance_num_on_demand_core"] > 0:
        timeout_action_core = "SWITCH_TO_ON_DEMAND" if pars["spot_timeout_to_on_demand_core"] else "TERMINATE_CLUSTER"
        fleet_core: dict[str, Any] = {
            "Name": "CORE",
            "InstanceFleetType": "CORE",
            "TargetOnDemandCapacity": pars["instance_num_on_demand_core"],
            "TargetSpotCapacity": pars["instance_num_spot_core"],
            "InstanceTypeConfigs": [
                {
                    "InstanceType": pars["instance_type_core"],
                    "WeightedCapacity": 1,
                    "BidPriceAsPercentageOfOnDemandPrice": pars["spot_bid_percentage_of_on_demand_core"],
                    "EbsConfiguration": {
                        "EbsBlockDeviceConfigs": [
                            {
                                "VolumeSpecification": {
                                    "SizeInGB": pars["instance_ebs_size_core"],
                                    "VolumeType": "gp2",
                                },
                                "VolumesPerInstance": 1,
                            }
                        ],
                        "EbsOptimized": True,
                    },
                }
            ],
        }
        if pars["instance_num_spot_core"] > 0:
            fleet_core["LaunchSpecifications"] = {
                "SpotSpecification": {
                    "TimeoutDurationMinutes": pars["spot_provisioning_timeout_core"],
                    "TimeoutAction": timeout_action_core,
                }
            }
        args["Instances"]["InstanceFleets"].append(fleet_core)

    # Task Instance Fleet
    if (pars["instance_num_spot_task"] > 0) or pars["instance_num_on_demand_task"] > 0:
        timeout_action_task: str = (
            "SWITCH_TO_ON_DEMAND" if pars["spot_timeout_to_on_demand_task"] else "TERMINATE_CLUSTER"
        )
        fleet_task: dict[str, Any] = {
            "Name": "TASK",
            "InstanceFleetType": "TASK",
            "TargetOnDemandCapacity": pars["instance_num_on_demand_task"],
            "TargetSpotCapacity": pars["instance_num_spot_task"],
            "InstanceTypeConfigs": [
                {
                    "InstanceType": pars["instance_type_task"],
                    "WeightedCapacity": 1,
                    "BidPriceAsPercentageOfOnDemandPrice": pars["spot_bid_percentage_of_on_demand_task"],
                    "EbsConfiguration": {
                        "EbsBlockDeviceConfigs": [
                            {
                                "VolumeSpecification": {
                                    "SizeInGB": pars["instance_ebs_size_task"],
                                    "VolumeType": "gp2",
                                },
                                "VolumesPerInstance": 1,
                            }
                        ],
                        "EbsOptimized": True,
                    },
                }
            ],
        }
        if pars["instance_num_spot_task"] > 0:
            fleet_task["LaunchSpecifications"] = {
                "SpotSpecification": {
                    "TimeoutDurationMinutes": pars["spot_provisioning_timeout_task"],
                    "TimeoutAction": timeout_action_task,
                }
            }
        args["Instances"]["InstanceFleets"].append(fleet_task)

    if pars["security_configuration"]:
        args["SecurityConfiguration"] = pars["security_configuration"]

    # Tags
    if pars["tags"] is not None:
        args["Tags"] = [{"Key": k, "Value": v} for k, v in pars["tags"].items()]

    _logger.debug("args: \n%s", pprint.pformat(args))
    return args


def create_cluster(  # noqa: PLR0913
    subnet_id: str,
    cluster_name: str = "my-emr-cluster",
    logging_s3_path: str | None = None,
    emr_release: str = "emr-6.7.0",
    emr_ec2_role: str = "EMR_EC2_DefaultRole",
    emr_role: str = "EMR_DefaultRole",
    instance_type_master: str = "r5.xlarge",
    instance_type_core: str = "r5.xlarge",
    instance_type_task: str = "r5.xlarge",
    instance_ebs_size_master: int = 64,
    instance_ebs_size_core: int = 64,
    instance_ebs_size_task: int = 64,
    instance_num_on_demand_master: int = 1,
    instance_num_on_demand_core: int = 0,
    instance_num_on_demand_task: int = 0,
    instance_num_spot_master: int = 0,
    instance_num_spot_core: int = 0,
    instance_num_spot_task: int = 0,
    spot_bid_percentage_of_on_demand_master: int = 100,
    spot_bid_percentage_of_on_demand_core: int = 100,
    spot_bid_percentage_of_on_demand_task: int = 100,
    spot_provisioning_timeout_master: int = 5,
    spot_provisioning_timeout_core: int = 5,
    spot_provisioning_timeout_task: int = 5,
    spot_timeout_to_on_demand_master: bool = True,
    spot_timeout_to_on_demand_core: bool = True,
    spot_timeout_to_on_demand_task: bool = True,
    python3: bool = True,
    spark_glue_catalog: bool = True,
    hive_glue_catalog: bool = True,
    presto_glue_catalog: bool = True,
    consistent_view: bool = False,
    consistent_view_retry_seconds: int = 10,
    consistent_view_retry_count: int = 5,
    consistent_view_table_name: str = "EmrFSMetadata",
    bootstraps_paths: list[str] | None = None,
    debugging: bool = True,
    applications: list[str] | None = None,
    visible_to_all_users: bool = True,
    key_pair_name: str | None = None,
    security_group_master: str | None = None,
    security_groups_master_additional: list[str] | None = None,
    security_group_slave: str | None = None,
    security_groups_slave_additional: list[str] | None = None,
    security_group_service_access: str | None = None,
    security_configuration: str | None = None,
    docker: bool = False,
    extra_public_registries: list[str] | None = None,
    spark_log_level: str = "WARN",
    spark_jars_path: list[str] | None = None,
    spark_defaults: dict[str, str] | None = None,
    spark_pyarrow: bool = False,
    custom_classifications: list[dict[str, Any]] | None = None,
    maximize_resource_allocation: bool = False,
    steps: list[dict[str, Any]] | None = None,
    custom_ami_id: str | None = None,
    step_concurrency_level: int = 1,
    keep_cluster_alive_when_no_steps: bool = True,
    termination_protected: bool = False,
    auto_termination_policy: dict[str, int] | None = None,
    tags: dict[str, str] | None = None,
    boto3_session: boto3.Session | None = None,
    configurations: list[dict[str, Any]] | None = None,
) -> str:
    """Create a EMR cluster with instance fleets configuration.

    https://docs.aws.amazon.com/emr/latest/ManagementGuide/emr-instance-fleet.html

    Parameters
    ----------
    subnet_id
        VPC subnet ID.
    cluster_name
        Cluster name.
    logging_s3_path
        Logging s3 path (e.g. s3://BUCKET_NAME/DIRECTORY_NAME/).
        If None, the default is `s3://aws-logs-{AccountId}-{RegionId}/elasticmapreduce/`
    emr_release
        EMR release (e.g. emr-5.28.0).
    emr_ec2_role
        IAM role name.
    emr_role
        IAM role name.
    instance_type_master
        EC2 instance type.
    instance_type_core
        EC2 instance type.
    instance_type_task
        EC2 instance type.
    instance_ebs_size_master
        Size of EBS in GB.
    instance_ebs_size_core
        Size of EBS in GB.
    instance_ebs_size_task
        Size of EBS in GB.
    instance_num_on_demand_master
        Number of on demand instances.
    instance_num_on_demand_core
        Number of on demand instances.
    instance_num_on_demand_task
        Number of on demand instances.
    instance_num_spot_master
        Number of spot instances.
    instance_num_spot_core
        Number of spot instances.
    instance_num_spot_task
        Number of spot instances.
    spot_bid_percentage_of_on_demand_master
        The bid price, as a percentage of On-Demand price.
    spot_bid_percentage_of_on_demand_core
        The bid price, as a percentage of On-Demand price.
    spot_bid_percentage_of_on_demand_task
        The bid price, as a percentage of On-Demand price.
    spot_provisioning_timeout_master
        The spot provisioning timeout period in minutes.
        If Spot instances are not provisioned within this time period,
        the TimeOutAction is taken. Minimum value is 5 and maximum value is 1440.
        The timeout applies only during initial provisioning,
        when the cluster is first created.
    spot_provisioning_timeout_core
        The spot provisioning timeout period in minutes.
        If Spot instances are not provisioned within this time period,
        the TimeOutAction is taken. Minimum value is 5 and maximum value is 1440.
        The timeout applies only during initial provisioning,
        when the cluster is first created.
    spot_provisioning_timeout_task
        The spot provisioning timeout period in minutes.
        If Spot instances are not provisioned within this time period,
        the TimeOutAction is taken. Minimum value is 5 and maximum value is 1440.
        The timeout applies only during initial provisioning,
        when the cluster is first created.
    spot_timeout_to_on_demand_master
        After a provisioning timeout should the cluster switch to
        on demand or shutdown?
    spot_timeout_to_on_demand_core
        After a provisioning timeout should the cluster switch to
        on demand or shutdown?
    spot_timeout_to_on_demand_task
        After a provisioning timeout should the cluster switch to
        on demand or shutdown?
    python3
        Python 3 Enabled?
    spark_glue_catalog
        Spark integration with Glue Catalog?
    hive_glue_catalog
        Hive integration with Glue Catalog?
    presto_glue_catalog
        Presto integration with Glue Catalog?
    consistent_view
        Consistent view allows EMR clusters to check for
        list and read-after-write consistency for
        Amazon S3 objects written by or synced with EMRFS.
        https://docs.aws.amazon.com/emr/latest/ManagementGuide/emr-plan-consistent-view.html
    consistent_view_retry_seconds
        Delay between the tries (seconds).
    consistent_view_retry_count
        Number of tries.
    consistent_view_table_name
        Name of the DynamoDB table to store the consistent view data.
    bootstraps_paths
        Bootstraps paths (e.g ["s3://BUCKET_NAME/script.sh"]).
    debugging
        Debugging enabled?
    applications
        List of applications (e.g ["Hadoop", "Spark", "Ganglia", "Hive"]).
        If None, ["Spark"] will be considered.
    visible_to_all_users
        True or False.
    key_pair_name
        Key pair name.
    security_group_master
        The identifier of the Amazon EC2 security group for the master node.
    security_groups_master_additional
        A list of additional Amazon EC2 security group IDs for the master node.
    security_group_slave
        The identifier of the Amazon EC2 security group for
        the core and task nodes.
    security_groups_slave_additional
        A list of additional Amazon EC2 security group IDs for
        the core and task nodes.
    security_group_service_access
        The identifier of the Amazon EC2 security group for the Amazon EMR
        service to access clusters in VPC private subnets.
    security_configuration:str, optional
        The name of a security configuration to apply to the cluster.
    docker
        Enable Docker Hub and ECR registries access.
    extra_public_registries
        Additional docker registries.
    spark_log_level
        log4j.rootCategory log level (ALL, DEBUG, INFO, WARN, ERROR, FATAL, OFF, TRACE).
    spark_jars_path
        spark.jars e.g. [s3://.../foo.jar, s3://.../boo.jar]
        https://spark.apache.org/docs/latest/configuration.html
    spark_defaults
        https://docs.aws.amazon.com/emr/latest/ReleaseGuide/emr-spark-configure.html#spark-defaults
    spark_pyarrow
        Enable PySpark to use PyArrow behind the scenes.
        P.S. You must install pyarrow by your self via bootstrap
    custom_classifications
        Extra classifications.
    maximize_resource_allocation
        Configure your executors to utilize the maximum resources possible
        https://docs.aws.amazon.com/emr/latest/ReleaseGuide/emr-spark-configure.html#emr-spark-maximizeresourceallocation
    custom_ami_id
        The custom AMI ID to use for the provisioned instance group
    steps
        Steps definitions (Obs : str Use EMR.build_step() to build it)
    keep_cluster_alive_when_no_steps
        Specifies whether the cluster should
        remain available after completing all steps
    termination_protected
        Specifies whether the Amazon EC2 instances in the cluster are
        protected from termination by API calls, user intervention,
        or in the event of a job-flow error.
    auto_termination_policy
        Specifies the auto-termination policy that is attached to an Amazon EMR cluster
        eg. auto_termination_policy = {'IdleTimeout': 123}
        IdleTimeout specifies the amount of idle time in seconds after which the cluster automatically terminates.
        You can specify a minimum of 60 seconds and a maximum of 604800 seconds (seven days).
    tags
        Key/Value collection to put on the Cluster.
        e.g. {"foo": "boo", "bar": "xoo"})
    boto3_session
        The default boto3 session will be used if **boto3_session** is ``None``.
    configurations
        The list of configurations supplied for an EMR cluster instance group.

        By default, adds log4j config as follows:
        `{"Classification": "spark-log4j", "Properties": {"log4j.rootCategory": f"{pars['spark_log_level']}, console"}}`

    Returns
    -------
        Cluster ID.

    Examples
    --------
    Minimal Example

    >>> import awswrangler as wr
    >>> cluster_id = wr.emr.create_cluster("SUBNET_ID")

    Minimal Example With Custom Classification

    >>> import awswrangler as wr
    >>> cluster_id = wr.emr.create_cluster(
    >>> subnet_id="SUBNET_ID",
    >>> custom_classifications=[
    >>>         {
    >>>             "Classification": "livy-conf",
    >>>             "Properties": {
    >>>                 "livy.spark.master": "yarn",
    >>>                 "livy.spark.deploy-mode": "cluster",
    >>>                 "livy.server.session.timeout": "16h",
    >>>             },
    >>>         }
    >>>     ],
    >>> )

    Full Example

    >>> import awswrangler as wr
    >>> cluster_id = wr.emr.create_cluster(
    ...     cluster_name="wrangler_cluster",
    ...     logging_s3_path=f"s3://BUCKET_NAME/emr-logs/",
    ...     emr_release="emr-5.28.0",
    ...     subnet_id="SUBNET_ID",
    ...     emr_ec2_role="EMR_EC2_DefaultRole",
    ...     emr_role="EMR_DefaultRole",
    ...     instance_type_master="m5.xlarge",
    ...     instance_type_core="m5.xlarge",
    ...     instance_type_task="m5.xlarge",
    ...     instance_ebs_size_master=50,
    ...     instance_ebs_size_core=50,
    ...     instance_ebs_size_task=50,
    ...     instance_num_on_demand_master=1,
    ...     instance_num_on_demand_core=1,
    ...     instance_num_on_demand_task=1,
    ...     instance_num_spot_master=0,
    ...     instance_num_spot_core=1,
    ...     instance_num_spot_task=1,
    ...     spot_bid_percentage_of_on_demand_master=100,
    ...     spot_bid_percentage_of_on_demand_core=100,
    ...     spot_bid_percentage_of_on_demand_task=100,
    ...     spot_provisioning_timeout_master=5,
    ...     spot_provisioning_timeout_core=5,
    ...     spot_provisioning_timeout_task=5,
    ...     spot_timeout_to_on_demand_master=True,
    ...     spot_timeout_to_on_demand_core=True,
    ...     spot_timeout_to_on_demand_task=True,
    ...     python3=True,
    ...     spark_glue_catalog=True,
    ...     hive_glue_catalog=True,
    ...     presto_glue_catalog=True,
    ...     bootstraps_paths=None,
    ...     debugging=True,
    ...     applications=["Hadoop", "Spark", "Ganglia", "Hive"],
    ...     visible_to_all_users=True,
    ...     key_pair_name=None,
    ...     spark_jars_path=[f"s3://...jar"],
    ...     maximize_resource_allocation=True,
    ...     keep_cluster_alive_when_no_steps=True,
    ...     termination_protected=False,
    ...     spark_pyarrow=True,
    ...     tags={
    ...         "foo": "boo"
    ...     })

    """
    applications = ["Spark"] if applications is None else applications
    args: dict[str, Any] = _build_cluster_args(**locals())
    client_emr = _utils.client(service_name="emr", session=boto3_session)
    response = client_emr.run_job_flow(**args)
    _logger.debug("response: \n%s", pprint.pformat(response))
    return response["JobFlowId"]


def get_cluster_state(cluster_id: str, boto3_session: boto3.Session | None = None) -> str:
    """Get the EMR cluster state.

    Possible states: 'STARTING', 'BOOTSTRAPPING', 'RUNNING',
    'WAITING', 'TERMINATING',
    'TERMINATED', 'TERMINATED_WITH_ERRORS'

    Parameters
    ----------
    cluster_id
        Cluster ID.
    boto3_session
        The default boto3 session will be used if **boto3_session** is ``None``.

    Returns
    -------
        State.

    Examples
    --------
    >>> import awswrangler as wr
    >>> state = wr.emr.get_cluster_state("cluster-id")

    """
    client_emr = _utils.client(service_name="emr", session=boto3_session)
    response = client_emr.describe_cluster(ClusterId=cluster_id)
    _logger.debug("response: \n%s", pprint.pformat(response))
    return response["Cluster"]["Status"]["State"]


def terminate_cluster(cluster_id: str, boto3_session: boto3.Session | None = None) -> None:
    """Terminate EMR cluster.

    Parameters
    ----------
    cluster_id
        Cluster ID.
    boto3_session
        The default boto3 session will be used if **boto3_session** is ``None``.

    Examples
    --------
    >>> import awswrangler as wr
    >>> wr.emr.terminate_cluster("cluster-id")

    """
    client_emr = _utils.client(service_name="emr", session=boto3_session)
    response = client_emr.terminate_job_flows(JobFlowIds=[cluster_id])
    _logger.debug("response: \n%s", pprint.pformat(response))


def submit_steps(cluster_id: str, steps: list[dict[str, Any]], boto3_session: boto3.Session | None = None) -> list[str]:
    """Submit a list of steps.

    Parameters
    ----------
    cluster_id
        Cluster ID.
    steps
        Steps definitions (Obs: Use EMR.build_step() to build it).
    boto3_session
        The default boto3 session will be used if **boto3_session** is ``None``.

    Returns
    -------
        List of step IDs.

    Examples
    --------
    >>> import awswrangler as wr
    >>> for cmd in ['echo "Hello"', "ls -la"]:
    ...     steps.append(wr.emr.build_step(name=cmd, command=cmd))
    >>> wr.emr.submit_steps(cluster_id="cluster-id", steps=steps)

    """
    client_emr = _utils.client(service_name="emr", session=boto3_session)
    response = client_emr.add_job_flow_steps(JobFlowId=cluster_id, Steps=steps)  # type: ignore[arg-type]
    _logger.debug("response: \n%s", pprint.pformat(response))
    return response["StepIds"]


def submit_step(
    cluster_id: str,
    command: str,
    name: str = "my-step",
    action_on_failure: _ActionOnFailureLiteral = "CONTINUE",
    script: bool = False,
    boto3_session: boto3.Session | None = None,
) -> str:
    """Submit new job in the EMR Cluster.

    Parameters
    ----------
    cluster_id
        Cluster ID.
    command
        e.g. 'echo "Hello!"'
        e.g. for script 's3://.../script.sh arg1 arg2'
    name
        Step name.
    action_on_failure
        'TERMINATE_JOB_FLOW', 'TERMINATE_CLUSTER', 'CANCEL_AND_WAIT', 'CONTINUE'
    script
        True for raw command or False for script runner.
        https://docs.aws.amazon.com/emr/latest/ReleaseGuide/emr-commandrunner.html
    boto3_session
        The default boto3 session will be used if **boto3_session** is ``None``.

    Returns
    -------
        Step ID.

    Examples
    --------
    >>> import awswrangler as wr
    >>> step_id = wr.emr.submit_step(
    ...     cluster_id=cluster_id,
    ...     name="step_test",
    ...     command="s3://...script.sh arg1 arg2",
    ...     script=True,
    ... )

    """
    step: dict[str, Any] = build_step(
        name=name, command=command, action_on_failure=action_on_failure, script=script, boto3_session=boto3_session
    )
    client_emr = _utils.client(service_name="emr", session=boto3_session)
    response = client_emr.add_job_flow_steps(JobFlowId=cluster_id, Steps=[step])  # type: ignore[list-item]
    _logger.debug("response: \n%s", pprint.pformat(response))
    return response["StepIds"][0]


def build_step(
    command: str,
    name: str = "my-step",
    action_on_failure: _ActionOnFailureLiteral = "CONTINUE",
    script: bool = False,
    region: str | None = None,
    boto3_session: boto3.Session | None = None,
) -> dict[str, Any]:
    """Build the Step structure (dictionary).

    Parameters
    ----------
    command
        e.g. 'echo "Hello!"'
        e.g. for script 's3://.../script.sh arg1 arg2'
    name
        Step name.
    action_on_failure
        'TERMINATE_JOB_FLOW', 'TERMINATE_CLUSTER', 'CANCEL_AND_WAIT', 'CONTINUE'
    script
        False for raw command or True for script runner.
        https://docs.aws.amazon.com/emr/latest/ReleaseGuide/emr-commandrunner.html
    region
        Region name to not get it from boto3.Session. (e.g. `us-east-1`)
    boto3_session
        The default boto3 session will be used if **boto3_session** is ``None``.

    Returns
    -------
        Step structure.

    Examples
    --------
    >>> import awswrangler as wr
    >>> steps = []
    >>> for cmd in ['echo "Hello"', "ls -la"]:
    ...     steps.append(wr.emr.build_step(name=cmd, command=cmd))
    >>> wr.emr.submit_steps(cluster_id="cluster-id", steps=steps)

    """
    jar: str = "command-runner.jar"
    if script is True:
        if region is not None:
            _region: str = region
        else:
            _region = _utils.get_region_from_session(boto3_session=boto3_session, default_region="us-east-1")
        jar = f"s3://{_region}.elasticmapreduce/libs/script-runner/script-runner.jar"
    step: dict[str, Any] = {
        "Name": name,
        "ActionOnFailure": action_on_failure,
        "HadoopJarStep": {"Jar": jar, "Args": command.split(" ")},
    }
    return step


def get_step_state(cluster_id: str, step_id: str, boto3_session: boto3.Session | None = None) -> str:
    """Get EMR step state.

    Possible states: 'PENDING', 'CANCEL_PENDING', 'RUNNING',
    'COMPLETED', 'CANCELLED', 'FAILED', 'INTERRUPTED'

    Parameters
    ----------
    cluster_id
        Cluster ID.
    step_id
        Step ID.
    boto3_session
        The default boto3 session will be used if **boto3_session** is ``None``.

    Returns
    -------
        State.

    Examples
    --------
    >>> import awswrangler as wr
    >>> state = wr.emr.get_step_state("cluster-id", "step-id")

    """
    client_emr = _utils.client(service_name="emr", session=boto3_session)
    response = client_emr.describe_step(ClusterId=cluster_id, StepId=step_id)
    _logger.debug("response: \n%s", pprint.pformat(response))
    return response["Step"]["Status"]["State"]


def submit_ecr_credentials_refresh(
    cluster_id: str,
    path: str,
    action_on_failure: _ActionOnFailureLiteral = "CONTINUE",
    boto3_session: boto3.Session | None = None,
) -> str:
    """Update internal ECR credentials.

    Parameters
    ----------
    cluster_id
        Cluster ID.
    path
        Amazon S3 path where awswrangler will stage the script ecr_credentials_refresh.py (e.g. s3://bucket/emr/)
    action_on_failure
        'TERMINATE_JOB_FLOW', 'TERMINATE_CLUSTER', 'CANCEL_AND_WAIT', 'CONTINUE'
    boto3_session
        The default boto3 session will be used if **boto3_session** is ``None``.

    Returns
    -------
        Step ID.

    Examples
    --------
    >>> import awswrangler as wr
    >>> step_id = wr.emr.submit_ecr_credentials_refresh("cluster_id", "s3://bucket/emr/")

    """
    path = path[:-1] if path.endswith("/") else path
    path_script: str = f"{path}/ecr_credentials_refresh.py"
    client_s3 = _utils.client(service_name="s3", session=boto3_session)
    bucket, key = _utils.parse_path(path=path_script)
    region: str = _utils.get_region_from_session(boto3_session=boto3_session)
    client_s3.put_object(
        Body=_get_ecr_credentials_refresh_content(region=region).encode(encoding="utf-8"), Bucket=bucket, Key=key
    )
    command: str = f"spark-submit --deploy-mode cluster {path_script}"
    name: str = "ECR Credentials Refresh"
    step: dict[str, Any] = build_step(
        name=name, command=command, action_on_failure=action_on_failure, script=False, boto3_session=boto3_session
    )
    client_emr = _utils.client(service_name="emr", session=boto3_session)
    response = client_emr.add_job_flow_steps(JobFlowId=cluster_id, Steps=[step])  # type: ignore[list-item]
    _logger.debug("response: \n%s", pprint.pformat(response))
    return response["StepIds"][0]


def build_spark_step(
    path: str,
    args: list[str] | None = None,
    deploy_mode: Literal["cluster", "client"] = "cluster",
    docker_image: str | None = None,
    name: str = "my-step",
    action_on_failure: _ActionOnFailureLiteral = "CONTINUE",
    region: str | None = None,
    boto3_session: boto3.Session | None = None,
) -> dict[str, Any]:
    """Build the Step structure (dictionary).

    Parameters
    ----------
    path
        Script path. (e.g. s3://bucket/app.py)
    args
        CLI args to use with script
    deploy_mode
        "cluster" | "client"
    docker_image
        e.g. "{ACCOUNT_ID}.dkr.ecr.{REGION}.amazonaws.com/{IMAGE_NAME}:{TAG}"
    name
        Step name.
    action_on_failure
        'TERMINATE_JOB_FLOW', 'TERMINATE_CLUSTER', 'CANCEL_AND_WAIT', 'CONTINUE'
    region
        Region name to not get it from boto3.Session. (e.g. `us-east-1`)
    boto3_session
        The default boto3 session will be used if **boto3_session** is ``None``.

    Returns
    -------
        Step structure.

    Examples
    --------
    >>> import awswrangler as wr
    >>> step_id = wr.emr.submit_steps(
    >>>     cluster_id="cluster-id",
    >>>     steps=[
    >>>         wr.emr.build_spark_step(path="s3://bucket/app.py")
    >>>     ]
    >>> )

    """
    script_args = " ".join(args) if args else ""
    if docker_image is None:
        cmd: str = f"spark-submit --deploy-mode {deploy_mode} {path} {script_args}"
    else:
        config: str = "hdfs:///user/hadoop/config.json"
        cmd = (
            f"spark-submit --deploy-mode {deploy_mode} "
            f"--conf spark.executorEnv.YARN_CONTAINER_RUNTIME_TYPE=docker "
            f"--conf spark.executorEnv.YARN_CONTAINER_RUNTIME_DOCKER_IMAGE={docker_image} "
            f"--conf spark.executorEnv.YARN_CONTAINER_RUNTIME_DOCKER_CLIENT_CONFIG={config} "
            f"--conf spark.executorEnv.YARN_CONTAINER_RUNTIME_DOCKER_MOUNTS=/etc/passwd:/etc/passwd:ro "
            f"--conf spark.yarn.appMasterEnv.YARN_CONTAINER_RUNTIME_TYPE=docker "
            f"--conf spark.yarn.appMasterEnv.YARN_CONTAINER_RUNTIME_DOCKER_IMAGE={docker_image} "
            f"--conf spark.yarn.appMasterEnv.YARN_CONTAINER_RUNTIME_DOCKER_CLIENT_CONFIG={config} "
            f"--conf spark.yarn.appMasterEnv.YARN_CONTAINER_RUNTIME_DOCKER_MOUNTS=/etc/passwd:/etc/passwd:ro "
            f"{path} {script_args}"
        )
    return build_step(
        command=cmd,
        name=name,
        action_on_failure=action_on_failure,
        script=False,
        region=region,
        boto3_session=boto3_session,
    )


def submit_spark_step(
    cluster_id: str,
    path: str,
    args: list[str] | None = None,
    deploy_mode: Literal["cluster", "client"] = "cluster",
    docker_image: str | None = None,
    name: str = "my-step",
    action_on_failure: _ActionOnFailureLiteral = "CONTINUE",
    region: str | None = None,
    boto3_session: boto3.Session | None = None,
) -> str:
    """Submit Spark Step.

    Parameters
    ----------
    cluster_id
        Cluster ID.
    path
        Script path. (e.g. s3://bucket/app.py)
    args
        CLI args to use with script
        eg. args = ["--name", "hello-world"]
    deploy_mode
        "cluster" | "client"
    docker_image
        e.g. "{ACCOUNT_ID}.dkr.ecr.{REGION}.amazonaws.com/{IMAGE_NAME}:{TAG}"
    name
        Step name.
    action_on_failure
        'TERMINATE_JOB_FLOW', 'TERMINATE_CLUSTER', 'CANCEL_AND_WAIT', 'CONTINUE'
    region
        Region name to not get it from boto3.Session. (e.g. `us-east-1`)
    boto3_session
        The default boto3 session will be used if **boto3_session** is ``None``.

    Returns
    -------
        Step ID.

    Examples
    --------
    >>> import awswrangler as wr
    >>> step_id = wr.emr.submit_spark_step(
    >>>     cluster_id="cluster-id",
    >>>     path="s3://bucket/emr/app.py"
    >>> )

    """
    step = build_spark_step(
        path=path,
        args=args,
        deploy_mode=deploy_mode,
        docker_image=docker_image,
        name=name,
        action_on_failure=action_on_failure,
        region=region,
        boto3_session=boto3_session,
    )
    return submit_steps(cluster_id=cluster_id, steps=[step], boto3_session=boto3_session)[0]
