def provison_neptune_streams_handler()

in neptune-streams-utils/provisioning/provision_neptune_streams_handler.py [0:0]


def _validate_streams_handler_args(
        lambda_runtime,
        lambda_logging_level,
        step_function_fallback_period,
        step_function_fallback_period_unit,
        notification_email,
        create_cloudwatch_alarm):
    """Reject argument values that can be checked without calling AWS.

    Raises:
        Exception: if any of the checked arguments has an unsupported value.
    """
    # Messages are built with str.format rather than '+' so that a non-string
    # invalid value (e.g. None) still produces the intended validation error.
    if lambda_runtime not in ('python3.6', 'java8', 'python', 'java'):
        raise Exception(
            'lambda_runtime must be python, java, python3.6 or java8: {}'.format(lambda_runtime))

    if lambda_logging_level not in ('DEBUG', 'INFO', 'WARN', 'ERROR', 'FATAL'):
        raise Exception(
            'lambda_logging_level must be DEBUG, INFO, WARN, ERROR or FATAL: {}'.format(lambda_logging_level))

    if step_function_fallback_period_unit not in ('minutes', 'minute', 'hours', 'hour', 'days', 'day'):
        raise Exception(
            'step_function_fallback_period_unit must be minutes, minute, hours, hour, days or day: {}'.format(
                step_function_fallback_period_unit))

    # NOTE(review): only the "period of 1 needs a singular unit" direction is
    # enforced; a period other than 1 with a singular unit is accepted here.
    # Confirm whether the template tolerates that combination.
    if step_function_fallback_period == 1 and step_function_fallback_period_unit not in ('minute', 'hour', 'day'):
        raise Exception(
            'step_function_fallback_period_unit must be singular if step_function_fallback_period = 1: {}'.format(
                step_function_fallback_period_unit))

    if create_cloudwatch_alarm and not notification_email:
        # The original code executed `raise('...')`, i.e. raised a str, which
        # Python turns into a TypeError and hides this message from the caller.
        raise Exception('You must supply a notification_email if creating a CloudWatch alarm')


def _print_parameter_section(title, rows):
    """Print a blank line, the section title and a table of [parameter, value, description] rows."""
    print()
    print(title)
    print(tabulate(
        [['Parameter', 'Value', 'Description']] + rows,
        headers="firstrow",
        tablefmt="fancy_grid"))


def provison_neptune_streams_handler(
    cluster_id,
    handler_s3_bucket,
    handler_s3_key,
    handler_name=None,
    additional_params={},
    query_engine='Gremlin',
    region='us-east-1',
    lambda_memory_size_mb=512,
    lambda_runtime='python3.6',
    lambda_logging_level='INFO',
    managed_policy_arns=[],
    batch_size=100,
    max_polling_wait_time_seconds=60,
    max_polling_interval_seconds=600,
    step_function_fallback_period=1,
    step_function_fallback_period_unit='minute',
    notification_email='',
    create_cloudwatch_alarm=False,
    application_name=None,
    dry_run=False):
    """Provision a Neptune Streams polling stack for a stream-record handler.

    Validates the arguments, looks up the cluster's endpoints, VPC, subnets,
    security groups, route tables and VPC endpoints via the Neptune and EC2
    APIs, prints a summary of every resolved setting, and then either creates
    a CloudFormation stack named ``streams-<suffix>`` or, for a dry run,
    returns the parameters that would have been submitted. The AWS describe
    calls are made even when ``dry_run`` is true; only stack creation is
    skipped.

    Args:
        cluster_id: Identifier of the source Neptune DB cluster.
        handler_s3_bucket: S3 bucket holding the handler Lambda package.
        handler_s3_key: S3 key of the handler Lambda package.
        handler_name: Name of the handler that processes stream records.
            Defaults to ``stream_handler.StreamHandler`` when falsy.
        additional_params: Extra key/value pairs merged over the generated
            defaults (cluster id, endpoints, port, IAM flag) and passed to
            the stack as a JSON object. The default dict is never mutated.
            ``None`` is treated as empty.
        query_engine: ``Gremlin`` or ``Sparql`` (case-insensitive).
        region: AWS region used for all clients.
        lambda_memory_size_mb: Poller Lambda memory size in MB.
        lambda_runtime: ``python3.6``, ``java8``, or the aliases ``python``
            and ``java`` which are mapped to those two values.
        lambda_logging_level: ``DEBUG``, ``INFO``, ``WARN``, ``ERROR`` or
            ``FATAL``.
        managed_policy_arns: ARNs of managed policies for the Lambda
            execution role; joined into a comma-delimited string. The default
            list is never mutated. ``None`` is treated as empty.
        batch_size: Number of records read from the stream per batch.
        max_polling_wait_time_seconds: Maximum wait between two successive
            polls of the stream.
        max_polling_interval_seconds: Period for which one Lambda instance
            keeps polling; used for the poller Lambda timeout.
        step_function_fallback_period: Period after which the Step Function
            is re-invoked to recover from failure.
        step_function_fallback_period_unit: One of ``minutes``, ``minute``,
            ``hours``, ``hour``, ``days``, ``day``; must be singular when the
            period is 1.
        notification_email: Email address for CloudWatch alarm notifications;
            required when ``create_cloudwatch_alarm`` is true.
        create_cloudwatch_alarm: Whether to create a CloudWatch alarm.
        application_name: Application name; defaults to
            ``stream-handler-<suffix>`` when falsy.
        dry_run: When true, skip stack creation.

    Returns:
        The ``create_stack`` response, or the string ``'Dry run: <params>'``
        when ``dry_run`` is true.

    Raises:
        Exception: on invalid arguments, when the cluster has no instances,
            when the Neptune subnets have no explicitly associated route
            table, or when DNS support is disabled in the VPC. Errors from
            the AWS clients propagate unchanged.
    """
    _validate_streams_handler_args(
        lambda_runtime,
        lambda_logging_level,
        step_function_fallback_period,
        step_function_fallback_period_unit,
        notification_email,
        create_cloudwatch_alarm)

    neptune_query_engine = query_engine.lower().capitalize()

    if neptune_query_engine not in ('Sparql', 'Gremlin'):
        raise Exception('query_engine must be Sparql or Gremlin: ' + neptune_query_engine)

    # Map the short runtime aliases onto the concrete runtime identifiers.
    lambda_runtime = {'python': 'python3.6', 'java': 'java8'}.get(lambda_runtime, lambda_runtime)

    if not handler_name:
        handler_name = 'stream_handler.StreamHandler'

    # The same random suffix is shared by the default application name and
    # the stack name.
    id_suffix = str(uuid.uuid4()).split('-')[0]
    if not application_name:
        application_name = 'stream-handler-{}'.format(id_suffix)

    managed_policies = ','.join(managed_policy_arns or [])

    neptune = boto3.client('neptune', region_name=region)
    ec2 = boto3.client('ec2', region_name=region)
    cfn = boto3.client('cloudformation', region_name=region)

    # --- Cluster details -------------------------------------------------
    db_cluster = neptune.describe_db_clusters(
        DBClusterIdentifier=cluster_id
    ).get('DBClusters')[0]

    cluster_endpoint = db_cluster.get('Endpoint')
    reader_endpoint = db_cluster.get('ReaderEndpoint')
    port = db_cluster.get('Port')
    stream_endpoint = 'https://{}:{}/{}/stream'.format(cluster_endpoint, port, query_engine.lower())
    is_iam_auth_enabled = str(db_cluster.get('IAMDatabaseAuthenticationEnabled')).lower()
    cluster_resource_id = db_cluster.get('DbClusterResourceId')

    security_group_ids = [
        group.get('VpcSecurityGroupId') for group in db_cluster.get('VpcSecurityGroups')
    ]

    # --- Network details (taken from the first cluster member) -----------
    cluster_members = db_cluster.get('DBClusterMembers') or []
    if not cluster_members:
        raise Exception(
            'Neptune cluster {} has no DB instances, so its VPC and subnets cannot be determined'.format(cluster_id))

    instance_id = cluster_members[0].get('DBInstanceIdentifier')

    subnet_group = neptune.describe_db_instances(
        DBInstanceIdentifier=instance_id
    ).get('DBInstances')[0].get('DBSubnetGroup')

    vpc_id = subnet_group.get('VpcId')
    subnet_ids = [subnet.get('SubnetIdentifier') for subnet in subnet_group.get('Subnets')]

    # The response is not used; the call is retained so that any error it
    # raises for these subnet ids still surfaces as before.
    ec2.describe_subnets(SubnetIds=subnet_ids)

    route_tables = ec2.describe_route_tables(
        Filters=[
            {
                'Name': 'association.subnet-id',
                'Values': subnet_ids
            },
        ]
    ).get('RouteTables')

    if len(route_tables) == 0:
        raise Exception('You must explictly associate your subnets ' + str(subnet_ids) + ' to a Route Table')

    # De-duplicate and sort so the printed summary and the stack parameter
    # are deterministic between runs.
    route_table_ids = sorted({table.get('RouteTableId') for table in route_tables})

    vpc_endpoints = ec2.describe_vpc_endpoints(
        Filters=[
            {
                'Name': 'vpc-id',
                'Values': [
                    vpc_id
                ]
            },
        ]
    ).get('VpcEndpoints')

    vpc_endpoint_services = [endpoint.get('ServiceName') for endpoint in vpc_endpoints]

    # Only ask the stack to create an endpoint the VPC does not already have.
    create_ddb_endpoint = str(
        not any(service.endswith('.dynamodb') for service in vpc_endpoint_services)).lower()
    create_monitoring_endpoint = str(
        not any(service.endswith('.monitoring') for service in vpc_endpoint_services)).lower()

    dns_support = ec2.describe_vpc_attribute(
        Attribute='enableDnsSupport',
        VpcId=vpc_id
    ).get('EnableDnsSupport')

    if not dns_support.get('Value'):
        raise Exception('You must enable DNS resolution in your VPC in order to use the DynamoDB Gateway Endpoint')

    # --- Handler parameters: caller-supplied values override defaults ----
    default_additional_params = {
        'cluster_id': cluster_id,
        'neptune_cluster_endpoint': cluster_endpoint,
        'neptune_reader_endpoint': reader_endpoint,
        'neptune_port': port,
        'iam_auth_enabled': is_iam_auth_enabled == 'true',
        'NeptuneCluster': '{}:{}'.format(cluster_endpoint, port)
    }
    default_additional_params.update(additional_params or {})

    additional_params_json = json.dumps(default_additional_params)

    create_cloudwatch_alarm_flag = str(create_cloudwatch_alarm).lower()

    # --- Summary output --------------------------------------------------
    print('APPLICATION: {}'.format(application_name))

    _print_parameter_section('NEPTUNE', [
        ['stream_endpoint', stream_endpoint, 'Endpoint for source Neptune Stream. This is of the form http(s)://<cluster>:<port>/gremlin/stream or http(s)://<cluster>:<port>/sparql/stream.'],
        ['is_iam_auth_enabled', is_iam_auth_enabled, 'Flag to determine if IAM Auth is enabled for source Neptune cluster.']
    ])

    _print_parameter_section('HANDLER', [
        ['handler_s3_bucket', handler_s3_bucket, 'Handler Lambda S3 bucket.'],
        ['handler_s3_key', handler_s3_key, 'Handler Lambda S3 key.'],
        ['handler_name', handler_name, 'Name of the handler for processing stream records.'],
        ['additional_params', additional_params_json, 'Additional params to be supplied to the handler via an AdditionalParams environment variable in the form of a JSON object.']
    ])

    _print_parameter_section('POLLING FRAMEWORK LAMBDA FUNCTION', [
        ['lambda_memory_size_mb', lambda_memory_size_mb, 'Poller Lambda memory size (in MB).'],
        ['lambda_runtime', lambda_runtime, 'Poller Lambda runtime (python3.6 or java8).'],
        ['lambda_logging_level', lambda_logging_level, 'Poller Lambda logging level.'],
        ['managed_policies', managed_policies, 'Comma-delimited list of ARNs of managed policies to be attached to Lambda execution role (for accessing other AWS resources from your handler).']
    ])

    _print_parameter_section('POLLING FRAMEWORK CONFIG', [
        ['batch_size', batch_size, 'Number of records to be read from stream in each batch. Should be between 1 to 10000.'],
        ['max_polling_wait_time_seconds', max_polling_wait_time_seconds, 'Maximum wait time in seconds between two successive polling from stream. Set value to 0 sec for continuous polling. Maximum value can be 3600 sec (1 hour).'],
        ['max_polling_interval_seconds', max_polling_interval_seconds, 'Period for which we can continuously poll stream for records on one Lambda instance. Should be between 5 sec to 900 sec. This parameter is used to set Poller Lambda Timeout.'],
        ['step_function_fallback_period', step_function_fallback_period, 'Period after which Step Function is invoked using CloudWatch Events to recover from failure.'],
        ['step_function_fallback_period_unit', step_function_fallback_period_unit, 'Should be one of minutes, minute, hours, hour, days, day.'],
        ['create_cloudwatch_alarm', create_cloudwatch_alarm_flag, 'Flag used to determine whether to create CloudWatch alarm.'],
        ['notification_email', notification_email, 'Email address for CloudWatch alarm notification.']
    ])

    _print_parameter_section('NETWORK', [
        ['vpc_id', vpc_id, 'VPC of your Neptune database.'],
        ['subnet_ids', subnet_ids, 'Neptune subnets.'],
        ['security_group_ids', security_group_ids, 'VPC security groups.'],
        ['route_table_ids', route_table_ids, 'Comma-delimited list of route table ids associated with the Neptune subnets.'],
        ['create_ddb_endpoint', create_ddb_endpoint, 'Flag used to determine whether to create DynamoDB VPC endpoint.'],
        ['create_monitoring_endpoint', create_monitoring_endpoint, 'Flag used to determine whether to create Monitoring VPC endpoint.']
    ])

    print()

    # --- CloudFormation parameters (all values are passed as strings) ----
    parameter_values = [
        ('ApplicationName', application_name),
        ('AdditionalParams', additional_params_json),
        ('LambdaS3Bucket', handler_s3_bucket),
        ('LambdaS3Key', handler_s3_key),
        ('LambdaMemorySize', str(lambda_memory_size_mb)),
        ('LambdaRuntime', lambda_runtime),
        ('LambdaLoggingLevel', lambda_logging_level),
        ('ManagedPolicies', managed_policies),
        ('StreamRecordsHandler', handler_name),
        ('StreamRecordsBatchSize', str(batch_size)),
        ('MaxPollingWaitTime', str(max_polling_wait_time_seconds)),
        ('MaxPollingInterval', str(max_polling_interval_seconds)),
        ('NeptuneStreamEndpoint', stream_endpoint),
        ('IAMAuthEnabledOnSourceStream', is_iam_auth_enabled),
        ('StreamDBClusterResourceId', cluster_resource_id),
        ('StepFunctionFallbackPeriod', str(step_function_fallback_period)),
        ('StepFunctionFallbackPeriodUnit', step_function_fallback_period_unit),
        ('NotificationEmail', notification_email),
        ('VPC', vpc_id),
        ('SubnetIds', ",".join(subnet_ids)),
        ('SecurityGroupIds', ",".join(security_group_ids)),
        ('RouteTableIds', ",".join(route_table_ids)),
        ('CreateDDBVPCEndPoint', create_ddb_endpoint),
        ('CreateMonitoringEndPoint', create_monitoring_endpoint),
        ('CreateCloudWatchAlarm', create_cloudwatch_alarm_flag),
    ]
    params = [
        {'ParameterKey': key, 'ParameterValue': value}
        for key, value in parameter_values
    ]

    if dry_run:
        return 'Dry run: {}'.format(params)

    return cfn.create_stack(
        StackName='streams-{}'.format(id_suffix),
        TemplateURL='https://s3.amazonaws.com/aws-neptune-customer-samples/neptune-stream/neptune_stream_poller_nested_full_stack.json',
        Parameters=params,
        DisableRollback=True,
        TimeoutInMinutes=30,
        Capabilities=['CAPABILITY_NAMED_IAM']
    )