in services/worker/src/worker/job_runners/config/opt_in_out_urls_count.py [0:0]
def compute_opt_in_out_urls_count_response(dataset: str, config: str) -> tuple[OptInOutUrlsCountResponse, float]:
    """
    Aggregate the 'split-opt-in-out-urls-count' responses of every split of a config.

    Each split returned by `get_split_names` is looked up with `get_response`:
      - when no response is found (`CachedArtifactNotFoundError`), the split is counted as pending;
      - when the response has a non-OK HTTP status, the split is skipped (it is NOT counted as pending);
      - otherwise its counters are added to the config-level totals.

    Args:
        dataset (`str`): dataset name, forwarded to `get_split_names` and `get_response`.
        config (`str`): configuration name, forwarded likewise.

    Returns:
        `tuple[OptInOutUrlsCountResponse, float]`: the aggregated content, and the progress, i.e. the
        share of splits for which a response was found (1.0 when there is no split at all).
        `full_scan` is true only when the number of splits reporting a truthy `full_scan` equals the
        total number of splits.

    Raises:
        PreviousStepFormatError: if anything else fails while iterating over the splits and reading
        their responses (the original exception is chained as the cause).
    """
    logging.info(f"compute 'config-opt-in-out-urls-count' for {dataset=} {config=}")

    splits = get_split_names(dataset=dataset, config=config)

    collected_columns = []
    opt_in_total = 0
    opt_out_total = 0
    urls_total = 0
    scanned_rows_total = 0
    fully_scanned_splits = 0
    split_count = 0
    missing_count = 0

    # The whole loop is guarded on purpose: any unexpected failure is reported as a format error.
    try:
        for split_count, split in enumerate(splits, start=1):
            try:
                response = get_response(
                    kind="split-opt-in-out-urls-count", dataset=dataset, config=config, split=split
                )
            except CachedArtifactNotFoundError:
                logging.debug("No response found in previous step for this dataset: 'split-opt-in-out-urls-count'.")
                missing_count += 1
                continue

            if response["http_status"] != HTTPStatus.OK:
                # Errored splits contribute nothing, but still count as processed for the progress.
                logging.debug(f"Previous step gave an error: {response['http_status']}.")
                continue

            split_content = response["content"]
            collected_columns.extend(split_content["urls_columns"])
            opt_in_total += split_content["num_opt_in_urls"]
            opt_out_total += split_content["num_opt_out_urls"]
            urls_total += split_content["num_urls"]
            scanned_rows_total += split_content["num_scanned_rows"]
            if split_content["full_scan"]:
                fully_scanned_splits += 1
    except Exception as e:
        raise PreviousStepFormatError("Previous step did not return the expected content.", e) from e

    # De-duplication stays outside the guarded section, as in the original control flow.
    distinct_columns = sorted(set(collected_columns))

    if split_count:
        progress = (split_count - missing_count) / split_count
    else:
        progress = 1.0

    aggregated = OptInOutUrlsCountResponse(
        urls_columns=distinct_columns,
        has_urls_columns=bool(distinct_columns),
        num_opt_in_urls=opt_in_total,
        num_opt_out_urls=opt_out_total,
        num_scanned_rows=scanned_rows_total,
        num_urls=urls_total,
        full_scan=fully_scanned_splits == split_count,
    )
    return aggregated, progress