def demo_long_lived_cluster()

in python/example_code/emr/emr_usage_demo.py [0:0]


def demo_long_lived_cluster():
    """
    Shows how to create a long-lived cluster that waits after all steps are run so
    that more steps can be run. At the end of the demo, the cluster is optionally
    terminated.

    The demo sets up its supporting resources (bucket, roles, and security groups),
    starts the cluster, runs two sample searches as steps, and then prompts for
    more searches until 'none' is entered. Finally, it asks whether to terminate
    the cluster and delete the resources it created.

    :raises ClientError: When the cluster cannot be started, either because the
                         error is something other than a ValidationException or
                         because every retry attempt has been used.
    """
    print('-'*88)
    print("Welcome to the Amazon EMR long-lived cluster demo.")
    print('-'*88)

    prefix = 'demo-long-emr'

    s3_resource = boto3.resource('s3')
    iam_resource = boto3.resource('iam')
    emr_client = boto3.client('emr')
    ec2_resource = boto3.resource('ec2')

    # Set up resources for the demo. The nanosecond timestamp keeps the bucket
    # name different from one run to the next.
    bucket_name = f'{prefix}-{time.time_ns()}'
    script_file_name = 'pyspark_top_product_keyword.py'
    script_key = f'scripts/{script_file_name}'
    bucket = setup_bucket(bucket_name, script_file_name, script_key, s3_resource)
    job_flow_role, service_role = \
        create_roles(f'{prefix}-ec2-role', f'{prefix}-service-role', iam_resource)
    security_groups = create_security_groups(prefix, ec2_resource)
    print("Wait for 10 seconds to give roles and profiles time to propagate...")
    time.sleep(10)

    # Starting the cluster can fail with a ValidationException while the newly
    # created instance profile is still propagating, so retry a bounded number of
    # times. Any other error, or a failure on the last attempt, is re-raised.
    max_tries = 5
    for attempt in range(1, max_tries + 1):
        try:
            cluster_id = emr_basics.run_job_flow(
                f'{prefix}-cluster', f's3://{bucket_name}/logs',
                True, ['Hadoop', 'Hive', 'Spark'], job_flow_role, service_role,
                security_groups, [], emr_client)
            print(f"Running job flow for cluster {cluster_id}...")
            break
        except ClientError as error:
            if attempt < max_tries and \
                    error.response['Error']['Code'] == 'ValidationException':
                print("Instance profile is not ready, let's give it more time...")
                time.sleep(10)
            else:
                raise
    status_poller(
        "Waiting for cluster, this typically takes several minutes...",
        'WAITING',
        lambda: emr_basics.describe_cluster(cluster_id, emr_client)['Status']['State'],
    )

    # Run two canned searches before handing control to the user.
    add_top_product_step(
        '20', 'Books', 'fire', cluster_id, bucket, script_key, emr_client)

    add_top_product_step(
        '20', 'Grocery', 'cheese', cluster_id, bucket, script_key, emr_client)

    # Category names are taken from the folder-style prefixes under 'parquet/':
    # the text after '=' with the trailing delimiter removed.
    review_bucket_folders = s3_resource.meta.client.list_objects_v2(
        Bucket='demo-reviews-pds', Prefix='parquet/', Delimiter='/', MaxKeys=100)
    # 'CommonPrefixes' may be missing from the response when nothing is grouped
    # under the delimiter, and a prefix without '=' has no category to extract.
    # Tolerate both instead of failing with KeyError or IndexError.
    categories = [
        cat['Prefix'].split('=')[1][:-1]
        for cat in review_bucket_folders.get('CommonPrefixes', [])
        if '=' in cat['Prefix']]
    if not categories:
        print("No product categories were found, so there is nothing more to search.")

    # Keep adding steps until the user enters 'none'. The loop is skipped
    # entirely when there are no categories to choose from.
    while categories:
        input_cat = input(
            f"Your turn! Possible categories are: {categories}. Which category "
            f"would you like to search (enter 'none' when you're done)? ")
        if input_cat.lower() == 'none':
            break
        if input_cat not in categories:
            print(f"Sorry, {input_cat} is not an allowed category!")
            continue
        input_keyword = input("What keyword would you like to search for? ")
        input_count = input("How many items would you like to list? ")
        add_top_product_step(
            input_count, input_cat, input_keyword, cluster_id, bucket, script_key,
            emr_client)

    # Clean up demo resources (if you want to).
    remove_everything = input(
            "Do you want to terminate the cluster and delete the security roles, "
            "groups, bucket, and all of its contents (y/n)? ")
    if remove_everything.lower() == 'y':
        emr_basics.terminate_cluster(cluster_id, emr_client)
        # Wait for termination to finish before deleting the security groups,
        # roles, and bucket that were created for the cluster.
        status_poller(
            "Waiting for cluster to terminate.",
            'TERMINATED',
            lambda: emr_basics.describe_cluster(cluster_id, emr_client)['Status'][
                'State']
        )
        delete_security_groups(security_groups)
        delete_roles([job_flow_role, service_role])
        delete_bucket(bucket)
    else:
        print(
            "Remember that running Amazon EMR clusters and objects kept in an "
            "Amazon S3 bucket can incur charges against your account.")
    print("Thanks for watching!")