def main()

in source/glue-job-scripts/etl-cleanup.py


def main():
    """
    Deletes any previous data that was exported from DynamoDB to S3 so
    the current ETL job will represent the current state of the DynamoDB tables
    """
    log_message(f"Looking for previously generated output files: s3://{GLUE_OUTPUT_BUCKET}/{GLUE_OUTPUT_S3_KEY_PREFIX}")

    list_params = {
        "Bucket": GLUE_OUTPUT_BUCKET,
        "Prefix": GLUE_OUTPUT_S3_KEY_PREFIX
    }

    previous_job_output_data = set()

    while True:
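        # S3 returns at most 1,000 keys per ListObjectsV2 response, so keep
        # requesting pages with continuation tokens until the listing is exhausted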
        response = s3.list_objects_v2(**list_params)
        if response["KeyCount"] > 0:
            # Collect just the object keys from the Contents returned by S3
            previous_job_output_data.update(obj["Key"] for obj in response["Contents"])

        if "NextContinuationToken" not in response:
            # Exit the `while` loop if there are no more objects in the S3 bucket
            break
        else:
            # Search again if there are more items in the S3 bucket
            list_params["ContinuationToken"] = response["NextContinuationToken"]

    log_message(f"Number of previously generated output files: {len(previous_job_output_data)}")

    while len(previous_job_output_data) > 0:
        # Delete up to 500 objects at a time until the list of previously
        # generated output files is empty
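        # S3's DeleteObjects API accepts at most 1,000 keys per request, so a
        # batch size of 500 stays well within that limit. Because sets are
        # unordered, islice() selects an arbitrary batch of keys.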
        objects_to_delete = list(islice(previous_job_output_data, 500))

        log_message(f"Attempting to delete batch of previously generated data. Number of objects to delete: {len(objects_to_delete)}")

        delete_params = {
            "Bucket": GLUE_OUTPUT_BUCKET,
            "Delete": {
                "Objects": list(map(lambda x: { "Key": x }, objects_to_delete))
            }
        }

        delete_response = s3.delete_objects(**delete_params)

        if "Errors" in delete_response and len(delete_response["Errors"]) > 0:
            raise DataCleanupException(f"Error while cleaning previous job output: {str(delete_response['Errors'][0])}")

        if "Deleted" not in delete_response or len(delete_response["Deleted"]) != len(objects_to_delete):
            raise DataCleanupException(f"Error while cleaning previous job output. Expecting {len(objects_to_delete)} to be deleted but S3 reported {len(delete_response['Deleted'])} were deleted")

        # Remove the objects that were deleted from the 'previous_job_output_data' set
        previous_job_output_data -= set(objects_to_delete)
        log_message(f"Successfully deleted {len(objects_to_delete)} objects. Number still left to delete: {len(previous_job_output_data)}")

    job.commit()
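
This excerpt relies on several names defined elsewhere in etl-cleanup.py: s3, GLUE_OUTPUT_BUCKET, GLUE_OUTPUT_S3_KEY_PREFIX, log_message, DataCleanupException, job, and islice. Below is a minimal sketch of the module-level setup they imply, assuming the bucket and prefix arrive as Glue job arguments; the argument names are hypothetical, not taken from the original script.

import sys
from itertools import islice

import boto3
from awsglue.context import GlueContext
from awsglue.job import Job
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext


class DataCleanupException(Exception):
    """Raised when previously exported objects cannot be fully deleted."""


def log_message(message):
    # Glue captures stdout in the job run's CloudWatch log stream
    print(message)


# Hypothetical argument names; the real script may resolve these differently
args = getResolvedOptions(sys.argv, ["JOB_NAME", "output_bucket", "output_prefix"])
GLUE_OUTPUT_BUCKET = args["output_bucket"]
GLUE_OUTPUT_S3_KEY_PREFIX = args["output_prefix"]

s3 = boto3.client("s3")

glue_context = GlueContext(SparkContext.getOrCreate())
job = Job(glue_context)
job.init(args["JOB_NAME"], args)

With setup along these lines in place, the job.commit() call at the end of main() marks the Glue job run (including any job bookmarks) as successfully completed.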