in moz_kinto_publisher/main.py [0:0]
def publish_crlite(*, args, rw_client, channel, timeout=timedelta(minutes=5)):
    # returns the run_id of a new full filter if one is published, otherwise None
    rv = None
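
    # Fetch the records currently in the CRLite collection and keep only those
    # belonging to this channel.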
    existing_records = rw_client.get_records(
        collection=settings.KINTO_CRLITE_COLLECTION
    )
    existing_records = [
        x for x in existing_records if x.get("channel", CHANNEL_DEFAULT) == channel.slug
    ]

    # Sort existing_records for crlite_verify_record_consistency,
    # which gets called in crlite_determine_publish.
    existing_records = sorted(existing_records, key=lambda x: x["details"]["name"])
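
    # PublishedRunDB exposes state for the filter generation runs stored in the
    # filter bucket, e.g. whether the most recent run has finished and when.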
    published_run_db = PublishedRunDB(args.filter_bucket)

    # Wait for the most recent run to finish.
    try:
        published_run_db.await_most_recent_run(timeout=timeout)
    except TimeoutException as te:
        log.warning(f"The most recent run is not ready to be published (waited {te}).")
        return rv
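
    # Decide which run IDs need uploading and whether the existing records for
    # this channel should be cleared in favor of a new full filter.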
    tasks = crlite_determine_publish(
        existing_records=existing_records, run_db=published_run_db, channel=channel
    )
    log.debug(f"crlite_determine_publish tasks={tasks}")

    if not tasks["upload"]:
        log.info("Nothing to do.")
        return rv
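
    # Download the full filter artifact for the newest run in the upload list
    # from the Google Cloud filter bucket.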
    args.download_path.mkdir(parents=True, exist_ok=True)

    final_run_id = tasks["upload"][-1]
    final_run_id_path = args.download_path / Path(final_run_id)
    final_run_id_path.mkdir(parents=True, exist_ok=True)
    filter_path = final_run_id_path / Path("filter")
    workflow.download_and_retry_from_google_cloud(
        args.filter_bucket,
        f"{final_run_id}/{channel.dir}/filter",
        filter_path,
        timeout=timedelta(minutes=5),
    )
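
    # Full-filter path: clear all existing filter and delta records for this
    # channel and publish a single new full filter.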
    if tasks["clear_all"]:
        log.info(f"Uploading a full filter based on {final_run_id}.")
        clear_crlite_filters(rw_client=rw_client, noop=args.noop, channel=channel)
        clear_crlite_deltas(rw_client=rw_client, noop=args.noop, channel=channel)
        assert filter_path.is_file(), "Missing local copy of filter"
        publish_crlite_main_filter(
            filter_path=filter_path,
            filename=f"{final_run_id}-{channel.slug}.filter",
            rw_client=rw_client,
            timestamp=published_run_db.get_run_timestamp(final_run_id),
            channel=channel,
            noop=args.noop,
        )
        rv = final_run_id
    else:
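        # Delta path: upload an incremental delta for each new run. Each delta
        # record references the record it builds on via previous_id.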
        log.info("Uploading deltas.")
        previous_id = existing_records[-1]["id"]
        for run_id in tasks["upload"]:
            run_id_path = args.download_path / Path(run_id)
            run_id_path.mkdir(parents=True, exist_ok=True)
            delta_path = run_id_path / Path("delta")
            workflow.download_and_retry_from_google_cloud(
                args.filter_bucket,
                f"{run_id}/{channel.dir}/{channel.delta_filename}",
                delta_path,
                timeout=timedelta(minutes=5),
            )
            assert delta_path.is_file(), "Missing local copy of delta"
            previous_id = publish_crlite_delta(
                delta_path=delta_path,
                filename=f"{run_id}-{channel.slug}.{channel.delta_filename}",
                rw_client=rw_client,
                previous_id=previous_id,
                timestamp=published_run_db.get_run_timestamp(run_id),
                channel=channel,
                noop=args.noop,
            )
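
    # Request review of the collection so the pending changes can be approved
    # and published. Skipped in no-op mode.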
    if not args.noop:
        rw_client.request_review_of_collection(
            collection=settings.KINTO_CRLITE_COLLECTION
        )

    return rv