in python/example_code/emr/emr_usage_demo.py [0:0]
def demo_long_lived_cluster():
    """
    Shows how to create a long-lived cluster that waits after all steps are run so
    that more steps can be run. At the end of the demo, the cluster is optionally
    terminated.

    The demo sets up an S3 bucket (also used as the cluster log location), IAM
    roles, and security groups, then starts a cluster and waits for it to reach
    the WAITING state. It adds two example steps, then lets you add more steps
    interactively.

    If the cluster cannot be started, the demo resources are deleted before the
    error is re-raised, so that they are not left behind.
    """
    print('-'*88)
    print("Welcome to the Amazon EMR long-lived cluster demo.")
    print('-'*88)

    prefix = 'demo-long-emr'
    s3_resource = boto3.resource('s3')
    iam_resource = boto3.resource('iam')
    emr_client = boto3.client('emr')
    ec2_resource = boto3.resource('ec2')

    # Set up resources for the demo.
    bucket_name = f'{prefix}-{time.time_ns()}'
    script_file_name = 'pyspark_top_product_keyword.py'
    script_key = f'scripts/{script_file_name}'
    bucket = setup_bucket(bucket_name, script_file_name, script_key, s3_resource)
    job_flow_role, service_role = \
        create_roles(f'{prefix}-ec2-role', f'{prefix}-service-role', iam_resource)
    security_groups = create_security_groups(prefix, ec2_resource)
    print("Wait for 10 seconds to give roles and profiles time to propagate...")
    time.sleep(10)

    try:
        cluster_id = _run_job_flow_with_retries(
            prefix, bucket_name, job_flow_role, service_role, security_groups,
            emr_client)
    except ClientError:
        # run_job_flow raised, so presumably no cluster was created. Remove the
        # roles, security groups, and bucket instead of leaking them.
        print("Couldn't start the cluster. Cleaning up demo resources...")
        _delete_demo_resources(
            security_groups, job_flow_role, service_role, bucket)
        raise
    print(f"Running job flow for cluster {cluster_id}...")

    status_poller(
        "Waiting for cluster, this typically takes several minutes...",
        'WAITING',
        lambda: emr_basics.describe_cluster(cluster_id, emr_client)['Status']['State'],
    )
    add_top_product_step(
        '20', 'Books', 'fire', cluster_id, bucket, script_key, emr_client)
    add_top_product_step(
        '20', 'Grocery', 'cheese', cluster_id, bucket, script_key, emr_client)

    # Let the user add more steps to the still-running cluster until they
    # answer 'none'.
    categories = _list_review_categories(s3_resource)
    while True:
        input_cat = _prompt_for_category(categories)
        if input_cat is None:
            break
        input_keyword = input("What keyword would you like to search for? ")
        input_count = _prompt_for_item_count()
        add_top_product_step(
            input_count, input_cat, input_keyword, cluster_id, bucket, script_key,
            emr_client)

    # Clean up demo resources (if you want to).
    remove_everything = input(
        "Do you want to terminate the cluster and delete the security roles, "
        "groups, bucket, and all of its contents (y/n)? ")
    if remove_everything.strip().lower() == 'y':
        emr_basics.terminate_cluster(cluster_id, emr_client)
        status_poller(
            "Waiting for cluster to terminate.",
            'TERMINATED',
            lambda: emr_basics.describe_cluster(cluster_id, emr_client)['Status'][
                'State']
        )
        _delete_demo_resources(
            security_groups, job_flow_role, service_role, bucket)
    else:
        print(
            "Remember that running Amazon EMR clusters and objects kept in an "
            "Amazon S3 bucket can incur charges against your account.")
    print("Thanks for watching!")


def _run_job_flow_with_retries(
        prefix, bucket_name, job_flow_role, service_role, security_groups,
        emr_client):
    """
    Starts the demo cluster, retrying while the new instance profile propagates.

    A ValidationException from run_job_flow is treated as "the instance profile is
    not ready yet". In that case, wait 10 seconds and try again, making at most
    five attempts in total.

    :param prefix: The demo name prefix; the cluster is named '<prefix>-cluster'.
    :param bucket_name: The demo bucket; cluster logs go to 's3://<bucket>/logs'.
    :param job_flow_role: The role passed to run_job_flow as the job flow role.
    :param service_role: The role passed to run_job_flow as the service role.
    :param security_groups: The security groups passed to run_job_flow.
    :param emr_client: The Boto3 EMR client.
    :return: The cluster ID returned by emr_basics.run_job_flow.
    :raises ClientError: When the final attempt fails, or on any error code other
                         than ValidationException.
    """
    max_tries = 5
    for attempt in range(1, max_tries + 1):
        try:
            return emr_basics.run_job_flow(
                f'{prefix}-cluster', f's3://{bucket_name}/logs',
                True, ['Hadoop', 'Hive', 'Spark'], job_flow_role, service_role,
                security_groups, [], emr_client)
        except ClientError as error:
            retryable = error.response['Error']['Code'] == 'ValidationException'
            if not retryable or attempt == max_tries:
                raise
            print("Instance profile is not ready, let's give it more time...")
            time.sleep(10)


def _list_review_categories(s3_resource):
    """
    Lists the product categories under 'parquet/' in the demo-reviews-pds bucket.

    Each folder prefix is expected to look like '<name>=<category>/'. The text
    after the first '=' (minus the trailing '/') is taken as the category.
    Prefixes without an '=' are skipped. At most the first 100 keys are listed.

    :param s3_resource: The Boto3 S3 resource; its low-level client is used.
    :return: The list of category names.
    """
    response = s3_resource.meta.client.list_objects_v2(
        Bucket='demo-reviews-pds', Prefix='parquet/', Delimiter='/', MaxKeys=100)
    prefixes = [cp['Prefix'] for cp in response.get('CommonPrefixes', [])]
    return [prefix.split('=')[1][:-1] for prefix in prefixes if '=' in prefix]


def _prompt_for_category(categories):
    """
    Asks the user for a category until they enter an allowed one or 'none'.

    :param categories: The allowed category names (matched case-sensitively).
    :return: The chosen category, or None when the user enters 'none' (any case).
    """
    while True:
        input_cat = input(
            f"Your turn! Possible categories are: {categories}. Which category "
            f"would you like to search (enter 'none' when you're done)? ").strip()
        if input_cat.lower() == 'none':
            return None
        if input_cat in categories:
            return input_cat
        print(f"Sorry, {input_cat} is not an allowed category!")


def _prompt_for_item_count():
    """
    Asks the user how many items to list until they enter a positive whole number.

    :return: The count as a string, the same type as the literal counts ('20') the
             demo passes to add_top_product_step.
    """
    while True:
        answer = input("How many items would you like to list? ").strip()
        try:
            count = int(answer)
        except ValueError:
            count = 0
        if count > 0:
            return str(count)
        print(f"Sorry, {answer} is not a positive whole number!")


def _delete_demo_resources(security_groups, job_flow_role, service_role, bucket):
    """
    Deletes the security groups, IAM roles, and bucket created for the demo.

    Deletion happens in that order, matching the demo's original cleanup sequence.
    """
    delete_security_groups(security_groups)
    delete_roles([job_flow_role, service_role])
    delete_bucket(bucket)