build_obelics/12_01_find_opt_out_images.py
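"""Query the Spawning opt-out API for every image URL of the web-document dataset and
save the subset of URLs whose owners opted out (first to disk, then back to S3)."""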
import asyncio
import os
from typing import List
import aiohttp
from aiolimiter import AsyncLimiter
from datasets import load_from_disk
# Built by concatenating s3://m4-datasets/webdocs/img_urls_in_final_web_docs_2/ and converting it to a HF dataset
PATH_DS_IMG_URLS_S3 = "s3://m4-datasets/trash/ds_img_urls"
PATH_DS_IMG_URLS_LOCAL = "/scratch/ds_img_urls"
PATH_SAVE_DISK_OPT_OUT = "/scratch/ds_opt_out"
PATH_SAVE_S3_OPT_OUT = "s3://m4-datasets/webdocs/ds_opt_out"

MAX_NUM_RETRIES_SYNC = 3  # number of times each `aws s3 sync` command is run
TOKEN = os.environ["SPAWNING_TOKEN"]  # Spawning API token, read from the environment
BATCH_SIZE = 100_000  # rows per batch passed to the `datasets` filtering step
headers = {"Authorization": f"API {TOKEN}"}  # authorization header expected by the Spawning API
API_BATCH_SIZE = 1000  # URLs sent to the Spawning API per request
RETRY_TIMES = ([1] * 5) + ([10] * 5) + ([10 * 60] * 10) + ([60 * 60] * 10)  # back-off schedule, in seconds
TIMEOUT = 60  # per-request timeout, in seconds


async def check_spawning(image_urls: List[str], session: aiohttp.ClientSession, semaphore, limiter) -> dict:
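    """POST a batch of image URLs to the Spawning opt-out API and return the parsed JSON response.

    The response is expected to hold a "urls" list whose entries each carry an "optOut" flag.
    Requests are retried following the RETRY_TIMES schedule; once it is exhausted, the failing
    batch is dumped to erroring_urls.txt and a RuntimeError is raised.
    """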
url = "https://opts-api.spawningaiapi.com/api/v2/query/urls"
    if not image_urls:
        return {"urls": []}
    elif len(image_urls) == 1:
        image_urls = image_urls + [""]  # the API requires more than one URL, so pad with an empty string
async with semaphore:
async with limiter:
resp_body = None
for retry_time in RETRY_TIMES:
try:
async with session.post(
url=url,
data="\n".join(image_urls),
timeout=TIMEOUT,
) as resp:
resp_body = await resp.text()
spawning_response = await resp.json()
assert "urls" in spawning_response, str(resp_body)
return spawning_response
                except Exception:
                    pass
                # wait before retrying; asyncio.sleep keeps the event loop free for the other pending tasks
                await asyncio.sleep(retry_time)
with open("erroring_urls.txt", "w") as f:
f.write("\n".join(image_urls))
raise RuntimeError(str(resp_body))


async def opt_in_out_task(image_urls: List[str], session, semaphore, limiter) -> List[bool]:
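    """Return, for each URL in `image_urls`, whether it has been marked as opted out."""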
spawning_response = await check_spawning(image_urls, session, semaphore, limiter)
urls_responses = spawning_response["urls"]
urls_opt_out = [urls_response["optOut"] for urls_response in urls_responses]
return urls_opt_out


async def parallel_opt_in_out_task(urls: List[str]) -> List[bool]:
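    """Query the Spawning API for all `urls`, API_BATCH_SIZE URLs per request, with bounded concurrency."""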
tasks = []
    semaphore = asyncio.Semaphore(value=10)  # at most 10 requests in flight at a time
    limiter = AsyncLimiter(20, time_period=1)  # at most 20 requests started per second
async with aiohttp.ClientSession(headers=headers) as session:
for offset in range(0, len(urls), API_BATCH_SIZE):
tasks.append(
asyncio.create_task(
opt_in_out_task(urls[offset : offset + API_BATCH_SIZE], session, semaphore, limiter)
)
)
        await asyncio.wait(tasks)
    tasks_results = [task.result() for task in tasks]
    # flatten the per-batch lists of booleans into a single list aligned with `urls`
    tasks_results = [sub_el for el in tasks_results for sub_el in el]
return tasks_results


def opt_in_out(batch: List[str]) -> List[bool]:
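    """Filtering callback: returns True for the rows whose image URL is opted out, so only those are kept."""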
opt_out_bool = asyncio.run(parallel_opt_in_out_task(batch))
return opt_out_bool


if __name__ == "__main__":
    # Download the dataset of image URLs from S3; the sync is repeated to recover from transient failures
    command_sync_s3 = f"aws s3 sync {PATH_DS_IMG_URLS_S3} {PATH_DS_IMG_URLS_LOCAL}"
    for _ in range(MAX_NUM_RETRIES_SYNC):
        os.system(command_sync_s3)
ds = load_from_disk(PATH_DS_IMG_URLS_LOCAL)
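    # Keep only the rows flagged as opted out by the Spawning API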
ds = ds.filter(opt_in_out, input_columns=["url"], batched=True, batch_size=BATCH_SIZE)
ds.save_to_disk(PATH_SAVE_DISK_OPT_OUT)
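    # Upload the opted-out subset back to S3, again retrying the sync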
command_sync_s3 = f"aws s3 sync {PATH_SAVE_DISK_OPT_OUT} {PATH_SAVE_S3_OPT_OUT}"
for _ in range(MAX_NUM_RETRIES_SYNC):
os.system(command_sync_s3)