# obelics/processors/warc_downloader.py (36 lines of code) (raw):
import boto3
from botocore.config import Config
from botocore.exceptions import ClientError, ProxyConnectionError
class WarcDownloader:
    """Callable that fills an example's WARC payload from the `commoncrawl` S3 bucket.

    Each example is a mapping carrying `warc_filename`, `warc_record_offset`
    and `warc_record_length`. Calling the instance on an example downloads
    that byte range and stores the result under `warc` (bytes) and
    `warc_error` (string, empty on success). Examples that already hold a
    payload and no error are returned untouched, so the same dataset can be
    passed through again to retry only the failures.
    """

    def __init__(self):
        """Create the S3 client shared by every download of this instance."""
        config_boto = Config(
            # region_name="us-east-1" (location of the CC data) was tried and
            # left disabled because it did not help.
            retries={"max_attempts": 10, "mode": "standard"},
        )
        self.client = boto3.client("s3", config=config_boto)

    def __call__(self, example):
        """Download the WARC record for `example` unless it is already present.

        Args:
            example: Mapping with `warc_filename`, `warc_record_offset` and
                `warc_record_length`; may already contain `warc` and
                `warc_error` from a previous pass.

        Returns:
            The same `example` object, with `warc` and `warc_error` set.
        """
        # `.get` lets a first pass run on examples that do not yet carry the
        # `warc` / `warc_error` keys; when present the check is unchanged.
        if example.get("warc") and not example.get("warc_error"):
            return example

        warc, warc_error = self.get_warc_from_metadata(
            client=self.client,
            warc_filename=example["warc_filename"],
            warc_record_offset=example["warc_record_offset"],
            warc_record_length=example["warc_record_length"],
        )
        example["warc"] = warc
        example["warc_error"] = warc_error
        return example

    def get_warc_from_metadata(self, client, warc_filename, warc_record_offset, warc_record_length):
        """Fetch one WARC record as a byte range of an object in `commoncrawl`.

        Args:
            client: Object exposing `get_object(Bucket=..., Key=..., Range=...)`.
            warc_filename: Key of the WARC file inside the bucket.
            warc_record_offset: Offset of the record's first byte in the file.
            warc_record_length: Number of bytes in the record.

        Returns:
            A `(payload, error)` tuple: `(bytes, "")` on success, or
            `(b"", <repr of the error>)` when the range is invalid or the
            request fails with `ClientError` / `ProxyConnectionError`.
        """
        # A non-positive length yields "bytes=start-end" with end < start (and a
        # negative offset yields a malformed header). Per RFC 7233 an invalid
        # range is ignored by the server, which would mean fetching the whole
        # WARC file, so such inputs are rejected before any request is made.
        if warc_record_offset < 0 or warc_record_length <= 0:
            error = ValueError(
                f"invalid WARC byte range: offset={warc_record_offset}, length={warc_record_length}"
            )
            return b"", repr(error)

        # HTTP byte ranges are inclusive on both ends, hence the "- 1".
        last_byte = warc_record_offset + warc_record_length - 1
        try:
            response = client.get_object(
                Bucket="commoncrawl",
                Key=warc_filename,
                Range=f"bytes={warc_record_offset}-{last_byte}",
            )
        except (ClientError, ProxyConnectionError) as e:
            return b"", repr(e)

        body = response["Body"]
        try:
            return body.read(), ""
        finally:
            # Always release the stream, including when `read` raises midway.
            body.close()

    # Needed to make multiprocessing work
    def __reduce__(self):
        """Pickle as "call the class with no arguments".

        The client attribute is therefore not serialized; unpickling runs
        `__init__` again, which builds a new client.
        """
        return (self.__class__, ())