awswrangler/emr.py
"""EMR (Elastic Map Reduce) module."""
from __future__ import annotations
import logging
import pprint
import re
from typing import Any, Literal, cast
import boto3
from awswrangler import _utils, exceptions, sts
_logger: logging.Logger = logging.getLogger(__name__)
_ActionOnFailureLiteral = Literal["TERMINATE_JOB_FLOW", "TERMINATE_CLUSTER", "CANCEL_AND_WAIT", "CONTINUE"]
def _get_ecr_credentials_refresh_content(region: str) -> str:
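    """Render the PySpark script staged by submit_ecr_credentials_refresh().
    The generated script logs in to ECR from the cluster and copies the resulting
    Docker client config to hdfs:///user/hadoop/ so YARN containers can pull images.
    """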
return f"""
import subprocess
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("ECR Setup Job").getOrCreate()
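# Log in to ECR and publish the Docker client config to HDFS for YARN containers.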
COMMANDS = [
"sudo -s eval $(aws ecr get-login --region {region} --no-include-email)",
"sudo hdfs dfs -put -f /root/.docker/config.json /user/hadoop/"
]
for command in COMMANDS:
subprocess.run(command.split(" "), timeout=6.0, check=True)
print("done!")
"""
def _get_default_logging_path(
subnet_id: str | None = None,
account_id: str | None = None,
region: str | None = None,
boto3_session: boto3.Session | None = None,
) -> str:
"""Get EMR default logging path.
E.g. "s3://aws-logs-{account_id}-{region}/elasticmapreduce/"
Parameters
----------
subnet_id : str, optional
        Subnet ID. If not provided, you must pass `account_id` and `region` explicitly.
account_id: str, optional
Account ID.
region: str, optional
Region e.g. 'us-east-1'
boto3_session : boto3.Session(), optional
        Boto3 Session. The default boto3 session will be used if **boto3_session** is ``None``.
Returns
-------
str
Default logging path.
E.g. "s3://aws-logs-{account_id}-{region}/elasticmapreduce/"
Examples
--------
>>> import awswrangler as wr
    >>> path = wr.emr._get_default_logging_path("subnet-id")
's3://aws-logs-{account_id}-{region}/elasticmapreduce/'
"""
if account_id is None:
_account_id: str = sts.get_account_id(boto3_session=boto3_session)
else:
_account_id = account_id
if (region is None) and (subnet_id is not None):
_region: str = _utils.get_region_from_session(boto3_session=boto3_session)
elif (region is None) and (subnet_id is None):
raise exceptions.InvalidArgumentCombination("You must pass region or subnet_id or both.")
else:
_region = cast(str, region)
return f"s3://aws-logs-{_account_id}-{_region}/elasticmapreduce/"
def _get_emr_classification_lib(emr_version: str) -> str:
"""Parse emr release string.
Parse emr release string and return its corresponding Classification
configuration string. i.e. log4j or log4j2.
Parameters
----------
emr_version: emr release string
Returns
-------
A string mentioning the appropriate classification lib based on the emr release.
"""
    match = re.search(r"(\d+)\.(\d+)\.(\d+)", emr_version)
    version: tuple[int, ...] = (6, 7, 0)
    if match:
        version = tuple(int(part) for part in match.groups())
    # Everything newer than 6.7.0 ships Log4j 2.
    return "spark-log4j2" if version > (6, 7, 0) else "spark-log4j"
def _build_cluster_args(**pars: Any) -> dict[str, Any]: # noqa: PLR0912,PLR0915
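    """Translate create_cluster() keyword arguments into a run_job_flow() request dictionary."""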
account_id: str = sts.get_account_id(boto3_session=pars["boto3_session"])
region: str = _utils.get_region_from_session(boto3_session=pars["boto3_session"])
# S3 Logging path
if pars.get("logging_s3_path") is None:
pars["logging_s3_path"] = _get_default_logging_path(
subnet_id=None, account_id=account_id, region=region, boto3_session=pars["boto3_session"]
)
spark_env: dict[str, str] | None = None
yarn_env: dict[str, str] | None = None
livy_env: dict[str, str] | None = None
if pars["spark_pyarrow"] is True:
if pars["spark_defaults"] is None:
pars["spark_defaults"] = {"spark.sql.execution.arrow.enabled": "true"}
else:
pars["spark_defaults"]["spark.sql.execution.arrow.enabled"] = "true"
spark_env = {"ARROW_PRE_0_15_IPC_FORMAT": "1"}
yarn_env = {"ARROW_PRE_0_15_IPC_FORMAT": "1"}
livy_env = {"ARROW_PRE_0_15_IPC_FORMAT": "1"}
if pars["python3"] is True:
if spark_env is None:
spark_env = {"PYSPARK_PYTHON": "/usr/bin/python3"}
else:
spark_env["PYSPARK_PYTHON"] = "/usr/bin/python3"
if pars["spark_jars_path"] is not None:
paths: str = ",".join(pars["spark_jars_path"])
if pars["spark_defaults"] is None:
pars["spark_defaults"] = {"spark.jars": paths}
else:
pars["spark_defaults"]["spark.jars"] = paths
args: dict[str, Any] = {
"Name": pars["cluster_name"],
"LogUri": pars["logging_s3_path"],
"ReleaseLabel": pars["emr_release"],
"VisibleToAllUsers": pars["visible_to_all_users"],
"JobFlowRole": pars["emr_ec2_role"],
"ServiceRole": pars["emr_role"],
"Instances": {
"KeepJobFlowAliveWhenNoSteps": pars["keep_cluster_alive_when_no_steps"],
"TerminationProtected": pars["termination_protected"],
"Ec2SubnetId": pars["subnet_id"],
"InstanceFleets": [],
},
"StepConcurrencyLevel": pars["step_concurrency_level"],
}
# Auto Termination Policy
if pars["auto_termination_policy"] is not None:
args["AutoTerminationPolicy"] = pars["auto_termination_policy"]
# Custom AMI
if pars["custom_ami_id"] is not None:
args["CustomAmiId"] = pars["custom_ami_id"]
# EC2 Key Pair
if pars["key_pair_name"] is not None:
args["Instances"]["Ec2KeyName"] = pars["key_pair_name"]
# Security groups
if pars["security_group_master"] is not None:
args["Instances"]["EmrManagedMasterSecurityGroup"] = pars["security_group_master"]
if pars["security_groups_master_additional"] is not None:
args["Instances"]["AdditionalMasterSecurityGroups"] = pars["security_groups_master_additional"]
if pars["security_group_slave"] is not None:
args["Instances"]["EmrManagedSlaveSecurityGroup"] = pars["security_group_slave"]
if pars["security_groups_slave_additional"] is not None:
args["Instances"]["AdditionalSlaveSecurityGroups"] = pars["security_groups_slave_additional"]
if pars["security_group_service_access"] is not None:
args["Instances"]["ServiceAccessSecurityGroup"] = pars["security_group_service_access"]
# Configurations
args["Configurations"] = (
[
{
"Classification": _get_emr_classification_lib(pars["emr_release"]),
"Properties": {"log4j.rootCategory": f"{pars['spark_log_level']}, console"},
}
]
if not pars["configurations"]
else pars["configurations"]
)
if pars["docker"] is True:
if pars.get("extra_public_registries") is None:
extra_public_registries: list[str] = []
else:
extra_public_registries = pars["extra_public_registries"]
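        # Trust "local" images, the "centos" registry, this account's regional
        # ECR endpoint, and any caller-supplied public registries.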
registries: str = (
f"local,centos,{account_id}.dkr.ecr.{region}.amazonaws.com,{','.join(extra_public_registries)}"
)
registries = registries[:-1] if registries.endswith(",") else registries
args["Configurations"].append(
{
"Classification": "container-executor",
"Properties": {},
"Configurations": [
{
"Classification": "docker",
"Properties": {
"docker.privileged-containers.registries": registries,
"docker.trusted.registries": registries,
},
"Configurations": [],
}
],
}
)
if spark_env is not None:
args["Configurations"].append(
{
"Classification": "spark-env",
"Properties": {},
"Configurations": [{"Classification": "export", "Properties": spark_env, "Configurations": []}],
}
)
if yarn_env is not None:
args["Configurations"].append(
{
"Classification": "yarn-env",
"Properties": {},
"Configurations": [{"Classification": "export", "Properties": yarn_env, "Configurations": []}],
}
)
if livy_env is not None:
args["Configurations"].append(
{
"Classification": "livy-env",
"Properties": {},
"Configurations": [{"Classification": "export", "Properties": livy_env, "Configurations": []}],
}
)
if pars["spark_glue_catalog"] is True:
args["Configurations"].append(
{
"Classification": "spark-hive-site",
"Properties": {
"hive.metastore.client.factory.class": "com.amazonaws.glue.catalog.metastore.AWSGlueDataCatalogHiveClientFactory"
},
"Configurations": [],
}
)
if pars["hive_glue_catalog"] is True:
hive_conf: dict[str, Any] = {"Classification": "hive-site", "Properties": {}, "Configurations": []}
hive_conf["Properties"]["hive.metastore.client.factory.class"] = (
"com.amazonaws.glue.catalog.metastore.AWSGlueDataCatalogHiveClientFactory"
)
args["Configurations"].append(hive_conf)
if pars["presto_glue_catalog"] is True:
args["Configurations"].append(
{
"Classification": "presto-connector-hive",
"Properties": {"hive.metastore.glue.datacatalog.enabled": "true"},
"Configurations": [],
}
)
if pars["consistent_view"] is True:
args["Configurations"].append(
{
"Classification": "emrfs-site",
"Properties": {
"fs.s3.consistent.retryPeriodSeconds": str(pars.get("consistent_view_retry_seconds", "10")),
"fs.s3.consistent": "true",
"fs.s3.consistent.retryCount": str(pars.get("consistent_view_retry_count", "5")),
"fs.s3.consistent.metadata.tableName": pars.get("consistent_view_table_name", "EmrFSMetadata"),
},
}
)
if pars["maximize_resource_allocation"] is True:
args["Configurations"].append({"Classification": "spark", "Properties": {"maximizeResourceAllocation": "true"}})
if pars["spark_defaults"] is not None:
spark_defaults: dict[str, str | dict[str, str]] = {
"Classification": "spark-defaults",
"Properties": pars["spark_defaults"],
}
args["Configurations"].append(spark_defaults)
if pars.get("custom_classifications") is not None:
for c in pars["custom_classifications"]:
args["Configurations"].append(c)
# Applications
if pars["applications"]:
args["Applications"] = [{"Name": x} for x in pars["applications"]]
# Bootstraps
if pars["bootstraps_paths"]:
args["BootstrapActions"] = [{"Name": x, "ScriptBootstrapAction": {"Path": x}} for x in pars["bootstraps_paths"]]
# Debugging and Steps
if (pars["debugging"] is True) or (pars["steps"] is not None):
args["Steps"] = []
if pars["debugging"] is True:
args["Steps"].append(
{
"Name": "Setup Hadoop Debugging",
"ActionOnFailure": "TERMINATE_CLUSTER",
"HadoopJarStep": {"Jar": "command-runner.jar", "Args": ["state-pusher-script"]},
}
)
if pars["steps"] is not None:
args["Steps"] += pars["steps"]
# Master Instance Fleet
timeout_action_master: str = (
"SWITCH_TO_ON_DEMAND" if pars["spot_timeout_to_on_demand_master"] else "TERMINATE_CLUSTER"
)
fleet_master: dict[str, Any] = {
"Name": "MASTER",
"InstanceFleetType": "MASTER",
"TargetOnDemandCapacity": pars["instance_num_on_demand_master"],
"TargetSpotCapacity": pars["instance_num_spot_master"],
"InstanceTypeConfigs": [
{
"InstanceType": pars["instance_type_master"],
"WeightedCapacity": 1,
"BidPriceAsPercentageOfOnDemandPrice": pars["spot_bid_percentage_of_on_demand_master"],
"EbsConfiguration": {
"EbsBlockDeviceConfigs": [
{
"VolumeSpecification": {"SizeInGB": pars["instance_ebs_size_master"], "VolumeType": "gp2"},
"VolumesPerInstance": 1,
}
],
"EbsOptimized": True,
},
}
],
}
if pars["instance_num_spot_master"] > 0:
fleet_master["LaunchSpecifications"] = {
"SpotSpecification": {
"TimeoutDurationMinutes": pars["spot_provisioning_timeout_master"],
"TimeoutAction": timeout_action_master,
}
}
args["Instances"]["InstanceFleets"].append(fleet_master)
# Core Instance Fleet
if (pars["instance_num_spot_core"] > 0) or pars["instance_num_on_demand_core"] > 0:
timeout_action_core = "SWITCH_TO_ON_DEMAND" if pars["spot_timeout_to_on_demand_core"] else "TERMINATE_CLUSTER"
fleet_core: dict[str, Any] = {
"Name": "CORE",
"InstanceFleetType": "CORE",
"TargetOnDemandCapacity": pars["instance_num_on_demand_core"],
"TargetSpotCapacity": pars["instance_num_spot_core"],
"InstanceTypeConfigs": [
{
"InstanceType": pars["instance_type_core"],
"WeightedCapacity": 1,
"BidPriceAsPercentageOfOnDemandPrice": pars["spot_bid_percentage_of_on_demand_core"],
"EbsConfiguration": {
"EbsBlockDeviceConfigs": [
{
"VolumeSpecification": {
"SizeInGB": pars["instance_ebs_size_core"],
"VolumeType": "gp2",
},
"VolumesPerInstance": 1,
}
],
"EbsOptimized": True,
},
}
],
}
if pars["instance_num_spot_core"] > 0:
fleet_core["LaunchSpecifications"] = {
"SpotSpecification": {
"TimeoutDurationMinutes": pars["spot_provisioning_timeout_core"],
"TimeoutAction": timeout_action_core,
}
}
args["Instances"]["InstanceFleets"].append(fleet_core)
# Task Instance Fleet
if (pars["instance_num_spot_task"] > 0) or pars["instance_num_on_demand_task"] > 0:
timeout_action_task: str = (
"SWITCH_TO_ON_DEMAND" if pars["spot_timeout_to_on_demand_task"] else "TERMINATE_CLUSTER"
)
fleet_task: dict[str, Any] = {
"Name": "TASK",
"InstanceFleetType": "TASK",
"TargetOnDemandCapacity": pars["instance_num_on_demand_task"],
"TargetSpotCapacity": pars["instance_num_spot_task"],
"InstanceTypeConfigs": [
{
"InstanceType": pars["instance_type_task"],
"WeightedCapacity": 1,
"BidPriceAsPercentageOfOnDemandPrice": pars["spot_bid_percentage_of_on_demand_task"],
"EbsConfiguration": {
"EbsBlockDeviceConfigs": [
{
"VolumeSpecification": {
"SizeInGB": pars["instance_ebs_size_task"],
"VolumeType": "gp2",
},
"VolumesPerInstance": 1,
}
],
"EbsOptimized": True,
},
}
],
}
if pars["instance_num_spot_task"] > 0:
fleet_task["LaunchSpecifications"] = {
"SpotSpecification": {
"TimeoutDurationMinutes": pars["spot_provisioning_timeout_task"],
"TimeoutAction": timeout_action_task,
}
}
args["Instances"]["InstanceFleets"].append(fleet_task)
if pars["security_configuration"]:
args["SecurityConfiguration"] = pars["security_configuration"]
# Tags
if pars["tags"] is not None:
args["Tags"] = [{"Key": k, "Value": v} for k, v in pars["tags"].items()]
_logger.debug("args: \n%s", pprint.pformat(args))
return args
def create_cluster( # noqa: PLR0913
subnet_id: str,
cluster_name: str = "my-emr-cluster",
logging_s3_path: str | None = None,
emr_release: str = "emr-6.7.0",
emr_ec2_role: str = "EMR_EC2_DefaultRole",
emr_role: str = "EMR_DefaultRole",
instance_type_master: str = "r5.xlarge",
instance_type_core: str = "r5.xlarge",
instance_type_task: str = "r5.xlarge",
instance_ebs_size_master: int = 64,
instance_ebs_size_core: int = 64,
instance_ebs_size_task: int = 64,
instance_num_on_demand_master: int = 1,
instance_num_on_demand_core: int = 0,
instance_num_on_demand_task: int = 0,
instance_num_spot_master: int = 0,
instance_num_spot_core: int = 0,
instance_num_spot_task: int = 0,
spot_bid_percentage_of_on_demand_master: int = 100,
spot_bid_percentage_of_on_demand_core: int = 100,
spot_bid_percentage_of_on_demand_task: int = 100,
spot_provisioning_timeout_master: int = 5,
spot_provisioning_timeout_core: int = 5,
spot_provisioning_timeout_task: int = 5,
spot_timeout_to_on_demand_master: bool = True,
spot_timeout_to_on_demand_core: bool = True,
spot_timeout_to_on_demand_task: bool = True,
python3: bool = True,
spark_glue_catalog: bool = True,
hive_glue_catalog: bool = True,
presto_glue_catalog: bool = True,
consistent_view: bool = False,
consistent_view_retry_seconds: int = 10,
consistent_view_retry_count: int = 5,
consistent_view_table_name: str = "EmrFSMetadata",
bootstraps_paths: list[str] | None = None,
debugging: bool = True,
applications: list[str] | None = None,
visible_to_all_users: bool = True,
key_pair_name: str | None = None,
security_group_master: str | None = None,
security_groups_master_additional: list[str] | None = None,
security_group_slave: str | None = None,
security_groups_slave_additional: list[str] | None = None,
security_group_service_access: str | None = None,
security_configuration: str | None = None,
docker: bool = False,
extra_public_registries: list[str] | None = None,
spark_log_level: str = "WARN",
spark_jars_path: list[str] | None = None,
spark_defaults: dict[str, str] | None = None,
spark_pyarrow: bool = False,
custom_classifications: list[dict[str, Any]] | None = None,
maximize_resource_allocation: bool = False,
steps: list[dict[str, Any]] | None = None,
custom_ami_id: str | None = None,
step_concurrency_level: int = 1,
keep_cluster_alive_when_no_steps: bool = True,
termination_protected: bool = False,
auto_termination_policy: dict[str, int] | None = None,
tags: dict[str, str] | None = None,
boto3_session: boto3.Session | None = None,
configurations: list[dict[str, Any]] | None = None,
) -> str:
"""Create a EMR cluster with instance fleets configuration.
https://docs.aws.amazon.com/emr/latest/ManagementGuide/emr-instance-fleet.html
Parameters
----------
subnet_id
VPC subnet ID.
cluster_name
Cluster name.
logging_s3_path
Logging s3 path (e.g. s3://BUCKET_NAME/DIRECTORY_NAME/).
If None, the default is `s3://aws-logs-{AccountId}-{RegionId}/elasticmapreduce/`
emr_release
EMR release (e.g. emr-5.28.0).
emr_ec2_role
IAM role name.
emr_role
IAM role name.
instance_type_master
EC2 instance type.
instance_type_core
EC2 instance type.
instance_type_task
EC2 instance type.
instance_ebs_size_master
Size of EBS in GB.
instance_ebs_size_core
Size of EBS in GB.
instance_ebs_size_task
Size of EBS in GB.
instance_num_on_demand_master
Number of on demand instances.
instance_num_on_demand_core
Number of on demand instances.
instance_num_on_demand_task
Number of on demand instances.
instance_num_spot_master
Number of spot instances.
instance_num_spot_core
Number of spot instances.
instance_num_spot_task
Number of spot instances.
spot_bid_percentage_of_on_demand_master
The bid price, as a percentage of On-Demand price.
spot_bid_percentage_of_on_demand_core
The bid price, as a percentage of On-Demand price.
spot_bid_percentage_of_on_demand_task
The bid price, as a percentage of On-Demand price.
spot_provisioning_timeout_master
The spot provisioning timeout period in minutes.
If Spot instances are not provisioned within this time period,
the TimeOutAction is taken. Minimum value is 5 and maximum value is 1440.
The timeout applies only during initial provisioning,
when the cluster is first created.
spot_provisioning_timeout_core
The spot provisioning timeout period in minutes.
If Spot instances are not provisioned within this time period,
the TimeOutAction is taken. Minimum value is 5 and maximum value is 1440.
The timeout applies only during initial provisioning,
when the cluster is first created.
spot_provisioning_timeout_task
The spot provisioning timeout period in minutes.
If Spot instances are not provisioned within this time period,
the TimeOutAction is taken. Minimum value is 5 and maximum value is 1440.
The timeout applies only during initial provisioning,
when the cluster is first created.
    spot_timeout_to_on_demand_master
        After a provisioning timeout, should the cluster switch to on-demand or shut down?
    spot_timeout_to_on_demand_core
        After a provisioning timeout, should the cluster switch to on-demand or shut down?
    spot_timeout_to_on_demand_task
        After a provisioning timeout, should the cluster switch to on-demand or shut down?
python3
Python 3 Enabled?
spark_glue_catalog
Spark integration with Glue Catalog?
hive_glue_catalog
Hive integration with Glue Catalog?
presto_glue_catalog
Presto integration with Glue Catalog?
consistent_view
Consistent view allows EMR clusters to check for
list and read-after-write consistency for
Amazon S3 objects written by or synced with EMRFS.
https://docs.aws.amazon.com/emr/latest/ManagementGuide/emr-plan-consistent-view.html
consistent_view_retry_seconds
Delay between the tries (seconds).
consistent_view_retry_count
Number of tries.
consistent_view_table_name
Name of the DynamoDB table to store the consistent view data.
bootstraps_paths
Bootstraps paths (e.g ["s3://BUCKET_NAME/script.sh"]).
debugging
Debugging enabled?
applications
List of applications (e.g ["Hadoop", "Spark", "Ganglia", "Hive"]).
If None, ["Spark"] will be considered.
visible_to_all_users
        Whether the cluster is visible to all IAM users of the AWS account.
key_pair_name
Key pair name.
security_group_master
The identifier of the Amazon EC2 security group for the master node.
security_groups_master_additional
A list of additional Amazon EC2 security group IDs for the master node.
security_group_slave
The identifier of the Amazon EC2 security group for
the core and task nodes.
security_groups_slave_additional
A list of additional Amazon EC2 security group IDs for
the core and task nodes.
security_group_service_access
The identifier of the Amazon EC2 security group for the Amazon EMR
service to access clusters in VPC private subnets.
    security_configuration
The name of a security configuration to apply to the cluster.
docker
Enable Docker Hub and ECR registries access.
extra_public_registries
Additional docker registries.
spark_log_level
log4j.rootCategory log level (ALL, DEBUG, INFO, WARN, ERROR, FATAL, OFF, TRACE).
spark_jars_path
spark.jars e.g. [s3://.../foo.jar, s3://.../boo.jar]
https://spark.apache.org/docs/latest/configuration.html
spark_defaults
https://docs.aws.amazon.com/emr/latest/ReleaseGuide/emr-spark-configure.html#spark-defaults
spark_pyarrow
Enable PySpark to use PyArrow behind the scenes.
        P.S. You must install pyarrow yourself via a bootstrap action.
custom_classifications
Extra classifications.
maximize_resource_allocation
Configure your executors to utilize the maximum resources possible
https://docs.aws.amazon.com/emr/latest/ReleaseGuide/emr-spark-configure.html#emr-spark-maximizeresourceallocation
    steps
        Steps definitions (use ``wr.emr.build_step()`` to build them).
    custom_ami_id
        The custom AMI ID to use for the provisioned instance group.
    step_concurrency_level
        The number of steps the cluster can execute concurrently (default 1).
keep_cluster_alive_when_no_steps
Specifies whether the cluster should
remain available after completing all steps
termination_protected
Specifies whether the Amazon EC2 instances in the cluster are
protected from termination by API calls, user intervention,
or in the event of a job-flow error.
auto_termination_policy
Specifies the auto-termination policy that is attached to an Amazon EMR cluster
        e.g. auto_termination_policy = {'IdleTimeout': 123}
IdleTimeout specifies the amount of idle time in seconds after which the cluster automatically terminates.
You can specify a minimum of 60 seconds and a maximum of 604800 seconds (seven days).
tags
Key/Value collection to put on the Cluster.
e.g. {"foo": "boo", "bar": "xoo"})
boto3_session
The default boto3 session will be used if **boto3_session** is ``None``.
configurations
The list of configurations supplied for an EMR cluster instance group.
        By default, adds a log4j config as follows:
        `{"Classification": "spark-log4j", "Properties": {"log4j.rootCategory": "{spark_log_level}, console"}}`
Returns
-------
Cluster ID.
Examples
--------
Minimal Example
>>> import awswrangler as wr
>>> cluster_id = wr.emr.create_cluster("SUBNET_ID")
Minimal Example With Custom Classification
    >>> import awswrangler as wr
    >>> cluster_id = wr.emr.create_cluster(
    ...     subnet_id="SUBNET_ID",
    ...     custom_classifications=[
    ...         {
    ...             "Classification": "livy-conf",
    ...             "Properties": {
    ...                 "livy.spark.master": "yarn",
    ...                 "livy.spark.deploy-mode": "cluster",
    ...                 "livy.server.session.timeout": "16h",
    ...             },
    ...         }
    ...     ],
    ... )
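    Minimal Example With Docker (assumes the images live in this account's ECR)
    >>> import awswrangler as wr
    >>> cluster_id = wr.emr.create_cluster(
    ...     subnet_id="SUBNET_ID",
    ...     docker=True,
    ... )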
Full Example
>>> import awswrangler as wr
>>> cluster_id = wr.emr.create_cluster(
... cluster_name="wrangler_cluster",
    ...     logging_s3_path="s3://BUCKET_NAME/emr-logs/",
... emr_release="emr-5.28.0",
... subnet_id="SUBNET_ID",
... emr_ec2_role="EMR_EC2_DefaultRole",
... emr_role="EMR_DefaultRole",
... instance_type_master="m5.xlarge",
... instance_type_core="m5.xlarge",
... instance_type_task="m5.xlarge",
... instance_ebs_size_master=50,
... instance_ebs_size_core=50,
... instance_ebs_size_task=50,
... instance_num_on_demand_master=1,
... instance_num_on_demand_core=1,
... instance_num_on_demand_task=1,
... instance_num_spot_master=0,
... instance_num_spot_core=1,
... instance_num_spot_task=1,
... spot_bid_percentage_of_on_demand_master=100,
... spot_bid_percentage_of_on_demand_core=100,
... spot_bid_percentage_of_on_demand_task=100,
... spot_provisioning_timeout_master=5,
... spot_provisioning_timeout_core=5,
... spot_provisioning_timeout_task=5,
... spot_timeout_to_on_demand_master=True,
... spot_timeout_to_on_demand_core=True,
... spot_timeout_to_on_demand_task=True,
... python3=True,
... spark_glue_catalog=True,
... hive_glue_catalog=True,
... presto_glue_catalog=True,
... bootstraps_paths=None,
... debugging=True,
... applications=["Hadoop", "Spark", "Ganglia", "Hive"],
... visible_to_all_users=True,
... key_pair_name=None,
    ...     spark_jars_path=["s3://...jar"],
... maximize_resource_allocation=True,
... keep_cluster_alive_when_no_steps=True,
... termination_protected=False,
... spark_pyarrow=True,
... tags={
... "foo": "boo"
... })
"""
applications = ["Spark"] if applications is None else applications
args: dict[str, Any] = _build_cluster_args(**locals())
client_emr = _utils.client(service_name="emr", session=boto3_session)
response = client_emr.run_job_flow(**args)
_logger.debug("response: \n%s", pprint.pformat(response))
return response["JobFlowId"]
def get_cluster_state(cluster_id: str, boto3_session: boto3.Session | None = None) -> str:
"""Get the EMR cluster state.
Possible states: 'STARTING', 'BOOTSTRAPPING', 'RUNNING',
'WAITING', 'TERMINATING',
'TERMINATED', 'TERMINATED_WITH_ERRORS'
Parameters
----------
cluster_id
Cluster ID.
boto3_session
The default boto3 session will be used if **boto3_session** is ``None``.
Returns
-------
State.
Examples
--------
>>> import awswrangler as wr
>>> state = wr.emr.get_cluster_state("cluster-id")
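    Polling until the cluster leaves its transient startup states (a sketch):
    >>> import time
    >>> while wr.emr.get_cluster_state("cluster-id") in ("STARTING", "BOOTSTRAPPING"):
    ...     time.sleep(30)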
"""
client_emr = _utils.client(service_name="emr", session=boto3_session)
response = client_emr.describe_cluster(ClusterId=cluster_id)
_logger.debug("response: \n%s", pprint.pformat(response))
return response["Cluster"]["Status"]["State"]
def terminate_cluster(cluster_id: str, boto3_session: boto3.Session | None = None) -> None:
"""Terminate EMR cluster.
Parameters
----------
cluster_id
Cluster ID.
boto3_session
The default boto3 session will be used if **boto3_session** is ``None``.
Examples
--------
>>> import awswrangler as wr
>>> wr.emr.terminate_cluster("cluster-id")
"""
client_emr = _utils.client(service_name="emr", session=boto3_session)
response = client_emr.terminate_job_flows(JobFlowIds=[cluster_id])
_logger.debug("response: \n%s", pprint.pformat(response))
def submit_steps(cluster_id: str, steps: list[dict[str, Any]], boto3_session: boto3.Session | None = None) -> list[str]:
"""Submit a list of steps.
Parameters
----------
cluster_id
Cluster ID.
steps
        Steps definitions (use wr.emr.build_step() to build them).
boto3_session
The default boto3 session will be used if **boto3_session** is ``None``.
Returns
-------
List of step IDs.
Examples
--------
>>> import awswrangler as wr
    >>> steps = []
    >>> for cmd in ['echo "Hello"', "ls -la"]:
... steps.append(wr.emr.build_step(name=cmd, command=cmd))
>>> wr.emr.submit_steps(cluster_id="cluster-id", steps=steps)
"""
client_emr = _utils.client(service_name="emr", session=boto3_session)
response = client_emr.add_job_flow_steps(JobFlowId=cluster_id, Steps=steps) # type: ignore[arg-type]
_logger.debug("response: \n%s", pprint.pformat(response))
return response["StepIds"]
def submit_step(
cluster_id: str,
command: str,
name: str = "my-step",
action_on_failure: _ActionOnFailureLiteral = "CONTINUE",
script: bool = False,
boto3_session: boto3.Session | None = None,
) -> str:
"""Submit new job in the EMR Cluster.
Parameters
----------
cluster_id
Cluster ID.
command
e.g. 'echo "Hello!"'
e.g. for script 's3://.../script.sh arg1 arg2'
name
Step name.
action_on_failure
'TERMINATE_JOB_FLOW', 'TERMINATE_CLUSTER', 'CANCEL_AND_WAIT', 'CONTINUE'
script
        False for raw command or True for script runner.
https://docs.aws.amazon.com/emr/latest/ReleaseGuide/emr-commandrunner.html
boto3_session
The default boto3 session will be used if **boto3_session** is ``None``.
Returns
-------
Step ID.
Examples
--------
>>> import awswrangler as wr
>>> step_id = wr.emr.submit_step(
... cluster_id=cluster_id,
... name="step_test",
... command="s3://...script.sh arg1 arg2",
... script=True,
... )
"""
step: dict[str, Any] = build_step(
name=name, command=command, action_on_failure=action_on_failure, script=script, boto3_session=boto3_session
)
client_emr = _utils.client(service_name="emr", session=boto3_session)
response = client_emr.add_job_flow_steps(JobFlowId=cluster_id, Steps=[step]) # type: ignore[list-item]
_logger.debug("response: \n%s", pprint.pformat(response))
return response["StepIds"][0]
def build_step(
command: str,
name: str = "my-step",
action_on_failure: _ActionOnFailureLiteral = "CONTINUE",
script: bool = False,
region: str | None = None,
boto3_session: boto3.Session | None = None,
) -> dict[str, Any]:
"""Build the Step structure (dictionary).
Parameters
----------
command
e.g. 'echo "Hello!"'
e.g. for script 's3://.../script.sh arg1 arg2'
name
Step name.
action_on_failure
'TERMINATE_JOB_FLOW', 'TERMINATE_CLUSTER', 'CANCEL_AND_WAIT', 'CONTINUE'
script
False for raw command or True for script runner.
https://docs.aws.amazon.com/emr/latest/ReleaseGuide/emr-commandrunner.html
region
        Region name to use instead of inferring it from the boto3.Session (e.g. `us-east-1`).
boto3_session
The default boto3 session will be used if **boto3_session** is ``None``.
Returns
-------
Step structure.
Examples
--------
>>> import awswrangler as wr
>>> steps = []
>>> for cmd in ['echo "Hello"', "ls -la"]:
... steps.append(wr.emr.build_step(name=cmd, command=cmd))
>>> wr.emr.submit_steps(cluster_id="cluster-id", steps=steps)
"""
jar: str = "command-runner.jar"
if script is True:
if region is not None:
_region: str = region
else:
_region = _utils.get_region_from_session(boto3_session=boto3_session, default_region="us-east-1")
jar = f"s3://{_region}.elasticmapreduce/libs/script-runner/script-runner.jar"
step: dict[str, Any] = {
"Name": name,
"ActionOnFailure": action_on_failure,
"HadoopJarStep": {"Jar": jar, "Args": command.split(" ")},
}
return step
def get_step_state(cluster_id: str, step_id: str, boto3_session: boto3.Session | None = None) -> str:
"""Get EMR step state.
Possible states: 'PENDING', 'CANCEL_PENDING', 'RUNNING',
'COMPLETED', 'CANCELLED', 'FAILED', 'INTERRUPTED'
Parameters
----------
cluster_id
Cluster ID.
step_id
Step ID.
boto3_session
The default boto3 session will be used if **boto3_session** is ``None``.
Returns
-------
State.
Examples
--------
>>> import awswrangler as wr
>>> state = wr.emr.get_step_state("cluster-id", "step-id")
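    Waiting for a step to reach a terminal state (a sketch):
    >>> import time
    >>> while wr.emr.get_step_state("cluster-id", "step-id") in ("PENDING", "RUNNING"):
    ...     time.sleep(10)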
"""
client_emr = _utils.client(service_name="emr", session=boto3_session)
response = client_emr.describe_step(ClusterId=cluster_id, StepId=step_id)
_logger.debug("response: \n%s", pprint.pformat(response))
return response["Step"]["Status"]["State"]
def submit_ecr_credentials_refresh(
cluster_id: str,
path: str,
action_on_failure: _ActionOnFailureLiteral = "CONTINUE",
boto3_session: boto3.Session | None = None,
) -> str:
"""Update internal ECR credentials.
Parameters
----------
cluster_id
Cluster ID.
path
Amazon S3 path where awswrangler will stage the script ecr_credentials_refresh.py (e.g. s3://bucket/emr/)
action_on_failure
'TERMINATE_JOB_FLOW', 'TERMINATE_CLUSTER', 'CANCEL_AND_WAIT', 'CONTINUE'
boto3_session
The default boto3 session will be used if **boto3_session** is ``None``.
Returns
-------
Step ID.
Examples
--------
>>> import awswrangler as wr
>>> step_id = wr.emr.submit_ecr_credentials_refresh("cluster_id", "s3://bucket/emr/")
"""
path = path[:-1] if path.endswith("/") else path
path_script: str = f"{path}/ecr_credentials_refresh.py"
client_s3 = _utils.client(service_name="s3", session=boto3_session)
bucket, key = _utils.parse_path(path=path_script)
region: str = _utils.get_region_from_session(boto3_session=boto3_session)
client_s3.put_object(
Body=_get_ecr_credentials_refresh_content(region=region).encode(encoding="utf-8"), Bucket=bucket, Key=key
)
command: str = f"spark-submit --deploy-mode cluster {path_script}"
name: str = "ECR Credentials Refresh"
step: dict[str, Any] = build_step(
name=name, command=command, action_on_failure=action_on_failure, script=False, boto3_session=boto3_session
)
client_emr = _utils.client(service_name="emr", session=boto3_session)
response = client_emr.add_job_flow_steps(JobFlowId=cluster_id, Steps=[step]) # type: ignore[list-item]
_logger.debug("response: \n%s", pprint.pformat(response))
return response["StepIds"][0]
def build_spark_step(
path: str,
args: list[str] | None = None,
deploy_mode: Literal["cluster", "client"] = "cluster",
docker_image: str | None = None,
name: str = "my-step",
action_on_failure: _ActionOnFailureLiteral = "CONTINUE",
region: str | None = None,
boto3_session: boto3.Session | None = None,
) -> dict[str, Any]:
"""Build the Step structure (dictionary).
Parameters
----------
path
Script path. (e.g. s3://bucket/app.py)
args
        CLI args to use with the script.
deploy_mode
"cluster" | "client"
docker_image
e.g. "{ACCOUNT_ID}.dkr.ecr.{REGION}.amazonaws.com/{IMAGE_NAME}:{TAG}"
name
Step name.
action_on_failure
'TERMINATE_JOB_FLOW', 'TERMINATE_CLUSTER', 'CANCEL_AND_WAIT', 'CONTINUE'
region
        Region name to use instead of inferring it from the boto3.Session (e.g. `us-east-1`).
boto3_session
The default boto3 session will be used if **boto3_session** is ``None``.
Returns
-------
Step structure.
Examples
--------
>>> import awswrangler as wr
    >>> step_ids = wr.emr.submit_steps(
    ...     cluster_id="cluster-id",
    ...     steps=[
    ...         wr.emr.build_spark_step(path="s3://bucket/app.py")
    ...     ],
    ... )
"""
script_args = " ".join(args) if args else ""
if docker_image is None:
        # .strip() avoids a trailing empty token after command.split(" ") when args is None
        cmd: str = f"spark-submit --deploy-mode {deploy_mode} {path} {script_args}".strip()
else:
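        # hdfs:///user/hadoop/config.json is where submit_ecr_credentials_refresh()
        # stages the Docker client config so YARN containers can authenticate to ECR.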
config: str = "hdfs:///user/hadoop/config.json"
cmd = (
f"spark-submit --deploy-mode {deploy_mode} "
f"--conf spark.executorEnv.YARN_CONTAINER_RUNTIME_TYPE=docker "
f"--conf spark.executorEnv.YARN_CONTAINER_RUNTIME_DOCKER_IMAGE={docker_image} "
f"--conf spark.executorEnv.YARN_CONTAINER_RUNTIME_DOCKER_CLIENT_CONFIG={config} "
f"--conf spark.executorEnv.YARN_CONTAINER_RUNTIME_DOCKER_MOUNTS=/etc/passwd:/etc/passwd:ro "
f"--conf spark.yarn.appMasterEnv.YARN_CONTAINER_RUNTIME_TYPE=docker "
f"--conf spark.yarn.appMasterEnv.YARN_CONTAINER_RUNTIME_DOCKER_IMAGE={docker_image} "
f"--conf spark.yarn.appMasterEnv.YARN_CONTAINER_RUNTIME_DOCKER_CLIENT_CONFIG={config} "
f"--conf spark.yarn.appMasterEnv.YARN_CONTAINER_RUNTIME_DOCKER_MOUNTS=/etc/passwd:/etc/passwd:ro "
f"{path} {script_args}"
)
return build_step(
command=cmd,
name=name,
action_on_failure=action_on_failure,
script=False,
region=region,
boto3_session=boto3_session,
)
def submit_spark_step(
cluster_id: str,
path: str,
args: list[str] | None = None,
deploy_mode: Literal["cluster", "client"] = "cluster",
docker_image: str | None = None,
name: str = "my-step",
action_on_failure: _ActionOnFailureLiteral = "CONTINUE",
region: str | None = None,
boto3_session: boto3.Session | None = None,
) -> str:
"""Submit Spark Step.
Parameters
----------
cluster_id
Cluster ID.
path
Script path. (e.g. s3://bucket/app.py)
args
        CLI args to use with the script,
        e.g. args = ["--name", "hello-world"]
deploy_mode
"cluster" | "client"
docker_image
e.g. "{ACCOUNT_ID}.dkr.ecr.{REGION}.amazonaws.com/{IMAGE_NAME}:{TAG}"
name
Step name.
action_on_failure
'TERMINATE_JOB_FLOW', 'TERMINATE_CLUSTER', 'CANCEL_AND_WAIT', 'CONTINUE'
region
        Region name to use instead of inferring it from the boto3.Session (e.g. `us-east-1`).
boto3_session
The default boto3 session will be used if **boto3_session** is ``None``.
Returns
-------
Step ID.
Examples
--------
>>> import awswrangler as wr
    >>> step_id = wr.emr.submit_spark_step(
    ...     cluster_id="cluster-id",
    ...     path="s3://bucket/emr/app.py",
    ... )
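    Running the script inside a Docker image hosted on ECR (account ID, region and
    image name below are placeholders):
    >>> step_id = wr.emr.submit_spark_step(
    ...     cluster_id="cluster-id",
    ...     path="s3://bucket/emr/app.py",
    ...     docker_image="123456789012.dkr.ecr.us-east-1.amazonaws.com/my-image:latest",
    ... )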
"""
step = build_spark_step(
path=path,
args=args,
deploy_mode=deploy_mode,
docker_image=docker_image,
name=name,
action_on_failure=action_on_failure,
region=region,
boto3_session=boto3_session,
)
return submit_steps(cluster_id=cluster_id, steps=[step], boto3_session=boto3_session)[0]