assets/glue/scripts/transform_raw_to_clean.py [17:78]:
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
s3_resource = boto3.resource('s3')


def check_if_file_exist(bucket, key):
    s3client = boto3.client('s3')
    file_exists = True
    try:
        s3client.head_object(Bucket=bucket, Key=key)
    except ClientError:
        file_exists = False
        pass

    return file_exists


def move_temp_file(bucket, key):
    dt = datetime.now()
    dt.microsecond

    new_file_name = str(dt.microsecond) + '_' + key
    s3_resource.Object(args['temp_workflow_bucket'], new_file_name).copy_from(CopySource="{}/{}".format(bucket, key))
    s3_resource.Object(args['temp_workflow_bucket'], key).delete()


def cleanup_temp_folder(bucket, key):
    if check_if_file_exist(bucket, key):
        move_temp_file(bucket, key)


def is_first_run():
    """
    Checks if the number of job runs for this job is 0.

    TODO: check if at least one successful job is in the

    :return: True if this is the first job run
    """

    client = boto3.client('glue', region_name=args["region"])
    runs = client.get_job_runs(
        JobName=args["JOB_NAME"],
        MaxResults=1
    )

    # return len(runs["JobRuns"]) == 0
    return True  # TODO currently only first run is supported


def write_job_state_information(readings):
    """
     get the distinct date value and store them in a temp S3 bucket to now which aggregation data need to be
     calculated later on
    """
    distinct_dates = readings.select('date_str').distinct().collect()
    distinct_dates_str_list = list(value['date_str'] for value in distinct_dates)

    state = {
        "dates": distinct_dates_str_list,
        "first_run": is_first_run()
    }

    s3_resource.Object(args['temp_workflow_bucket'], 'glue_workflow_distinct_dates').put(Body=json.dumps(state))
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -



assets/glue/scripts/transform_raw_to_clean_london.py [14:74]:
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
s3_resource = boto3.resource('s3')

def check_if_file_exist(bucket, key):
    s3client = boto3.client('s3')
    file_exists = True
    try:
        s3client.head_object(Bucket=bucket, Key=key)
    except ClientError:
        file_exists = False
        pass

    return file_exists


def move_temp_file(bucket, key):
    dt = datetime.now()
    dt.microsecond

    new_file_name = str(dt.microsecond) + '_' + key
    s3_resource.Object(args['temp_workflow_bucket'], new_file_name).copy_from(CopySource="{}/{}".format(bucket, key))
    s3_resource.Object(args['temp_workflow_bucket'], key).delete()


def cleanup_temp_folder(bucket, key):
    if check_if_file_exist(bucket, key):
        move_temp_file(bucket, key)


def is_first_run():
    """
    Checks if the number of job runs for this job is 0.

    TODO: check if at least one successful job is in the

    :return: True if this is the first job run
    """

    client = boto3.client('glue', region_name=args["region"])
    runs = client.get_job_runs(
        JobName=args["JOB_NAME"],
        MaxResults=1
    )

    # return len(runs["JobRuns"]) == 0
    return True # TODO currently only first run is supported


def write_job_state_information(readings):
    """
     get the distinct date value and store them in a temp S3 bucket to now which aggregation data need to be
     calculated later on
    """
    distinct_dates = readings.select('date_str').distinct().collect()
    distinct_dates_str_list = list(value['date_str'] for value in distinct_dates)

    state = {
        "dates": distinct_dates_str_list,
        "first_run": is_first_run()
    }

    s3_resource.Object(args['temp_workflow_bucket'], 'glue_workflow_distinct_dates').put(Body=json.dumps(state))
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -



