def download_and_extract_archive_from_s3()

in src/graph_notebook/seed/load_query.py


# imports required by this function (pjoin is os.path.join)
import os
import tarfile
import zipfile
from os.path import join as pjoin

import boto3
from botocore.exceptions import ClientError, NoCredentialsError
from botocore.handlers import disable_signing


def download_and_extract_archive_from_s3(bucket_name, filepath):
    """
    Depending on the S3 path provided, we can handle three possible cases here:
        1. plain S3 directory
        2. zip/tar archive
        3. single data file

    We will first attempt to send a signed AWS request to retrieve the S3 file. If credentials cannot be located, the
    request will be retried once more, unsigned.

    If the S3 request succeeds, this function will create a temporary file(or folder containing data files, in the case
    of a directory/archive URI) in the immediate Jupyter directory. After the datafiles are processed in get_queries,
    the temporary file is deleted.
    """
    base_file = os.path.basename(filepath)
    if not base_file:
        base_file = filepath
    s3 = boto3.resource('s3')
    bucket = s3.Bucket(bucket_name)
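    # Loop so that a signed request which fails for lack of credentials can be
    # retried unsigned (see the NoCredentialsError handler below).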
    while True:
        try:
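            # Case 1: an S3 "directory" (prefix). Mirror it locally and download
            # every object underneath it.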
            if base_file.endswith('/'):
                if not os.path.exists(base_file):
                    os.makedirs(base_file)
                for obj in bucket.objects.filter(Prefix=filepath):
                    if not obj.key.endswith('/'):
                        new_file = os.path.basename(obj.key)
                        target_file = base_file + new_file
                        bucket.download_file(obj.key, target_file)
            else:
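                # Cases 2 and 3: a single object, either an archive or a plain data file.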
                bucket.download_file(filepath, base_file)
            break
        except ClientError as e:
            if e.response['Error']['Code'] in ["404", "403"]:
                print("Unable to access the sample dataset specified.")
            raise
        except NoCredentialsError:
            # if no AWS credentials are available, retry with unsigned request.
            s3.meta.client.meta.events.register('choose-signer.s3.*', disable_signing)
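            # loop back to the top and retry the download unsigned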
    is_archive = True
    if os.path.isdir(base_file):  # check this first so we don't get an IsADirectoryError in below conditionals
        is_archive = False
    elif tarfile.is_tarfile(base_file):
        with tarfile.open(base_file) as tar_file:
            tar_file.extractall()
    elif zipfile.is_zipfile(base_file):
        with zipfile.ZipFile(base_file, 'r') as zf:
            zf.extractall()
    else:
        is_archive = False
    if is_archive:
        # we have the extracted contents elsewhere now, so delete the downloaded archive.
        os.remove(base_file)
        path_to_data_sets = pjoin(os.getcwd(), os.path.splitext(base_file)[0])
    else:
        # Any other filetype. If unreadable, we'll handle it in the file_to_query function.
        path_to_data_sets = pjoin(os.getcwd(), base_file)
    return path_to_data_sets
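
For illustration, a minimal usage sketch follows. The bucket name and key are hypothetical placeholders, not values shipped with graph-notebook:

    # Download s3://example-bucket/seed/airports.zip into the current working
    # directory and extract it; the archive itself is deleted after extraction.
    path_to_data = download_and_extract_archive_from_s3('example-bucket', 'seed/airports.zip')

    # The function assumes an archive expands to a folder named after the file
    # minus its extension, so here path_to_data is <cwd>/airports.
    print(path_to_data)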