# def lambda_handler()
#
# in lib/state_machine_trigger/lambda_handler.py [0:0]


def lambda_handler(event, context):
    """Start the raw-to-conformed Step Functions state machine for an uploaded S3 object.

    Triggered by an S3 event notification. Reads the first record, derives the
    source system name and table name from the object key, starts the state
    machine identified by the SFN_STATE_MACHINE_ARN environment variable, and
    records the run via start_etl_job_run.

    Expected key layout (first two path segments are required):
        s3://<bucket>/<source_system_name>/<table_name>/.../<file>

    Environment variables read:
        SFN_STATE_MACHINE_ARN -- ARN of the state machine to start.
        target_bucket_name    -- destination bucket passed through to the job.
        DYNAMODB_TABLE_NAME   -- table used by start_etl_job_run.

    Returns a dict with statusCode 200 and a JSON body on success; re-raises
    any Step Functions client error after logging it.
    """
    logger.info('event: %s', event)
    record = event['Records'][0]
    source_bucket_name = record['s3']['bucket']['name']
    key = record['s3']['object']['key']

    p_full_path = key
    # Split once; segment 0 is the source system name, segment 1 the table
    # name. NOTE(review): a key with fewer than two '/'-separated segments
    # raises IndexError here — confirm upstream guarantees the layout.
    key_parts = key.split('/')
    p_source_system_name = key_parts[0]
    p_table_name = key_parts[1]
    p_file_dir = os.path.dirname(p_full_path)
    # S3 event keys arrive URL-encoded; '%3D' ('=', used by Hive-style
    # partition directories) is decoded here. NOTE(review): other encoded
    # characters (e.g. spaces) are left as-is — confirm whether full
    # unquoting (urllib.parse.unquote_plus) is needed.
    p_file_dir_upd = p_file_dir.replace('%3D', '=')
    p_base_file_name = os.path.basename(p_full_path)
    sfn_arn = os.environ['SFN_STATE_MACHINE_ARN']
    target_bucket_name = os.environ['target_bucket_name']
    raw_to_conformed_etl_job_name = 'raw_to_conformed_etl_job'

    # Lazy %-style args: the message is only built if the level is enabled.
    logger.info('bucket: %s', source_bucket_name)
    logger.info('key: %s', key)
    logger.info('source system name: %s', p_source_system_name)
    logger.info('table name: %s', p_table_name)
    logger.info('File Path: %s', p_file_dir)
    logger.info('p_file_dir_upd: %s', p_file_dir_upd)
    logger.info('file base name: %s', p_base_file_name)
    logger.info('state machine arn: %s', sfn_arn)
    logger.info('target bucket name: %s', target_bucket_name)

    # An empty base name means the event is for a "directory" marker object;
    # nothing to process in that case.
    if p_base_file_name != '':
        # Capture the current time in US Central time for the ingest stamp.
        central = dateutil.tz.gettz('US/Central')
        now = datetime.now(tz=central)
        p_ingest_time = now.strftime('%m/%d/%Y %H:%M:%S')
        logger.info(p_ingest_time)
        # Timestamp (to microseconds) used to make the execution name unique.
        p_stp_fn_time = now.strftime('%Y%m%d%H%M%S%f')

        p_year = now.strftime('%Y')
        p_month = now.strftime('%m')
        p_day = now.strftime('%d')

        logger.info('year: %s', p_year)
        logger.info('p_month: %s', p_month)
        logger.info('p_day: %s', p_day)

        # NOTE(review): Step Functions execution names are limited to 80
        # characters and a restricted character set — confirm base file names
        # always satisfy this.
        sfn_name = p_base_file_name + '-' + p_stp_fn_time
        logger.info('sfn name: %s', sfn_name)

        execution_id = str(uuid.uuid4())
        sfn_input = json.dumps(
            {
                'JOB_NAME': raw_to_conformed_etl_job_name,
                'target_databasename': p_source_system_name,
                'target_bucketname': target_bucket_name,
                'source_bucketname': source_bucket_name,
                'source_key': p_file_dir_upd,
                'base_file_name': p_base_file_name,
                'p_year': p_year,
                'p_month': p_month,
                'p_day': p_day,
                'table_name': p_table_name,
                'execution_id': execution_id,
            }
        )
        logger.info(sfn_input)
        try:
            sfn_client = boto3.client('stepfunctions')
            sfn_response = sfn_client.start_execution(
                stateMachineArn=sfn_arn,
                name=sfn_name,
                input=sfn_input
            )
            logger.info('start_execution response: %s', sfn_response)
        except botocore.exceptions.ClientError:
            # logger.exception records the traceback at ERROR level
            # (the original logged errors at INFO, which hides them).
            logger.exception('[ERROR] Step function client process failed')
            raise
        except Exception:
            logger.exception('[ERROR] Step function call failed')
            raise

        start_etl_job_run(execution_id, p_stp_fn_time, sfn_arn, sfn_name, os.environ['DYNAMODB_TABLE_NAME'], sfn_input)

    return {
        'statusCode': 200,
        'body': json.dumps('Step function triggered successfully!')
    }