utils/local_remote_settings/__main__.py
"""
Runs models for use within Firefox via a local Remote Settings instance.

The artifacts will be saved to: ./data/remote-settings

To run the local models:

* Open Firefox (debug builds don't seem to work)
* Run the following from the Browser Console:

    ChromeUtils.importESModule("resource://services-settings/remote-settings.sys.mjs")
      .RemoteSettings("translations-wasm").verifySignature = false;
    ChromeUtils.importESModule("resource://services-settings/remote-settings.sys.mjs")
      .RemoteSettings("translations-models").verifySignature = false;

* Install the Remote Settings Devtools xpi:
    https://github.com/mozilla-extensions/remote-settings-devtools/releases
* Click the addon's button
* Change the Environment from Prod to Local
* Click "Clear all"
* Click "Sync"
* Once you are done, make sure to switch the Environment back to Prod.

Usage:

- Load a model from a task group:
    task local-remote-settings -- --taskgroup_ids I9uKJEPvQd-1zeItJK0cOQ aAVZJcsXQg-vfGIjHmcCTw

- Run a local mirror of the production models:
    task local-remote-settings -- --sync_remote_settings
"""
import re
import gzip
import sys
import time
import json
import yaml
import shutil
import argparse
import requests
import threading
import subprocess
import taskcluster
from uuid import uuid4
from pathlib import Path
from kinto_http import Client
from typing import IO, Callable, Optional, Type, Union
from pipeline.common.logging import get_logger
from pipeline.common.downloads import stream_download_to_file
from utils.common.taskcluster_api import (
ListArtifacts,
ListDependentTasks,
ListTaskGroup,
TaskAndStatus,
)
from utils.common.remote_settings import (
ModelRecord,
ModelsResponse,
WasmResponse,
WasmRecord,
models_collection,
wasm_collection,
get_prod_records_url,
)
logger = get_logger("util")
docker_logger = get_logger("docker")
mount_path = Path(__file__).parent / "mount"
data_path = (Path(__file__).parent / "../../data").resolve()
attachments_path = data_path / "remote-settings/attachments"
attachments_path.mkdir(parents=True, exist_ok=True)
models_path = data_path / "remote-settings/models"
models_path.mkdir(parents=True, exist_ok=True)
bucket = "main"
def sync_records(
remote_settings: Client,
collection: str,
record_response_class: Union[Type[WasmResponse], Type[ModelsResponse]],
record_class: Union[Type[WasmRecord], Type[ModelRecord]],
):
"""
    Sync the records from the production Remote Settings into the local instance.
"""
logger.info(f'Syncing records for "{collection}"')
url = get_prod_records_url(collection)
response = requests.get(url)
response.raise_for_status()
records_response = record_response_class(**response.json())
new_records = {data.id: data for data in records_response.data}
existing_records = [
record_class(**data)
for data in remote_settings.get_records(bucket=bucket, collection=collection)
]
for record in existing_records:
id = record.id
if new_records.get(id):
logger.info(f"Record exists {record.name} {record.version}")
            # The record already exists locally, so it doesn't need to be re-created.
del new_records[id]
else:
logger.info(f"Removing outdated record {record.name} {record.version}")
remote_settings.delete_record(id=id, collection=collection, bucket=bucket)
for record in new_records.values():
logger.info(f"Creating record {record.name} {record.version}")
attachment = record.attachment
assert attachment
if ".." in attachment.location:
raise Exception(f"Attachment location changes directory {attachment.location}")
# TODO - This needs an upstream fix.
# remote_settings.create_record(
# id=record.id,
# collection=collection,
# bucket=bucket,
# data=json.loads(record.json()),
# )
record_name = Path(record.name)
cache_dir = attachments_path / f"sync-{collection}"
cache_dir.mkdir(exist_ok=True)
attachment_file_path = (
cache_dir / f"{record_name.stem}-v{record.version}{record_name.suffix}"
)
if attachment_file_path.exists():
logger.info(f"✅ {attachment_file_path}")
else:
download_url = (
f"https://firefox-settings-attachments.cdn.mozilla.net/{attachment.location}"
)
with attachment_file_path.open("wb") as attachment_file:
try:
logger.info(
f"⬇️ Downloading {record.name} {record.version} from {download_url}"
)
response = requests.get(download_url, stream=True, allow_redirects=True)
response.raise_for_status()
for chunk in response.iter_content(chunk_size=8192):
attachment_file.write(chunk)
                except Exception as e:
                    logger.error(
                        f"Error occurred while downloading attachment {record.name} {record.version}: {e}"
                    )
record_data = json.loads(record.json())
del record_data["attachment"]
create_record_with_attachment(
record.id,
collection,
attachment.mimetype,
attachment_file_path,
record_data,
)
# TODO - This needs an upstream fix.
# remote_settings.add_attachment(
# id=record.id,
# filepath=str(attachment_file_path),
# collection=collection,
# bucket=bucket,
# mimetype=attachment.mimetype,
# )
logger.info(f"Attachment downloaded and added {record.name} {record.version}")
def create_remote_settings_environment(remote_settings: Client):
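    """Create the bucket plus the wasm and models collections in the local Remote Settings."""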
logger.info("Ensuring the buckets and collections are created")
remote_settings.create_bucket(id=bucket, if_not_exists=True)
remote_settings.create_collection(id=wasm_collection, bucket=bucket, if_not_exists=True)
remote_settings.create_collection(id=models_collection, bucket=bucket, if_not_exists=True)
class DockerContainerManager:
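    """Starts a Docker container as a subprocess and streams its stdout and stderr to the logs."""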
def __init__(
self,
container_name: str,
image: str,
volumes: dict[str, str],
env_vars: dict[str, str],
ports: dict[int, int],
):
"""
        Initializes the Docker container manager.

        Args:
            container_name: Name of the Docker container.
            image: Docker image to run.
            volumes: Dictionary of host-to-container volume mappings.
            env_vars: Dictionary of environment variables to pass to the container.
            ports: Dictionary of host-to-container port mappings.
"""
self.container_name = container_name
self.image = image
self.volumes = volumes
self.env_vars = env_vars
self.ports = ports
self.process = None
def _build_docker_command(self):
"""Builds the Docker run command."""
cmd = ["docker", "run", "--rm", "--name", self.container_name]
# Add volumes
for host_path, container_path in self.volumes.items():
cmd += ["--volume", f"{host_path}:{container_path}"]
# Add environment variables
for key, value in self.env_vars.items():
cmd += ["--env", f"{key}={value}"]
# Add port mappings
for host_port, container_port in self.ports.items():
cmd += ["--publish", f"{host_port}:{container_port}"]
cmd += [self.image]
return cmd
def stream_output(self, stream: IO[str], log: Callable):
"""
        Stream the output from a subprocess stream (stdout or stderr) in real time.

        Args:
            stream: The stream to read from (e.g. the process's stdout or stderr).
            log: The logger method to call with each line of output.
"""
def stream_reader():
try:
for line in iter(stream.readline, ""):
log(f"{line.strip()}")
except Exception as e:
log(f"Error reading from stream: {e}")
finally:
stream.close()
# Use a separate thread to avoid blocking
thread = threading.Thread(target=stream_reader, daemon=True)
thread.start()
def stop_and_remove_docker(self):
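        """Stop any existing container with this name; since it runs with `--rm`, Docker removes it on stop."""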
# Stop and remove any existing container with the same name
logger.info("Stopping translations-remote-settings.")
subprocess.run(
["docker", "stop", self.container_name],
check=False,
)
def start(self):
"""Starts the Docker container."""
self.stop_and_remove_docker()
logger.info(f"Starting Docker container '{self.container_name}'...")
docker_command = self._build_docker_command()
logger.info(f"Running: {' '.join(docker_command)}")
# Start the Docker container as a subprocess
self.process = subprocess.Popen(
docker_command,
stdin=subprocess.PIPE,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
bufsize=1,
text=True,
)
# Stream Docker output to stdout
logger.info("Docker container is running.")
logger.info(f"Access it at: http://localhost:{list(self.ports.keys())[0]}/v1/admin")
# Stream stdout and stderr asynchronously
try:
stdout = self.process.stdout
assert stdout
self.stream_output(stdout, docker_logger.info)
stderr = self.process.stderr
assert stderr
self.stream_output(stderr, docker_logger.error)
except Exception as e:
docker_logger.error(f"Error streaming output: {e}")
def wait(self):
"""Waits for the Docker container process to complete, handling interruptions."""
if not self.process:
logger.info("No Docker container process to wait for.")
return
logger.info("Press Ctrl-C to stop the container.")
try:
self.process.wait()
except KeyboardInterrupt:
logger.info("\nStopping Docker container...")
self.process.terminate()
self.stop_and_remove_docker()
self.process.wait()
except Exception as e:
logger.info(f"An error occurred: {e}")
self.process.terminate()
self.stop_and_remove_docker()
self.process.wait()
def add_model_from_taskgroup_id(
queue: taskcluster.Queue, remote_settings: Client, taskgroup_id: str
):
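    """
    Add models to the local Remote Settings from a Taskcluster task group. The task group
    either contains the export tasks directly, or it contains "Action: Train" tasks whose
    dependent tasks are searched for completed export tasks.
    """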
logger.info(f"Looking up task group information for {taskgroup_id}")
list_task_group = ListTaskGroup.call(queue, taskgroup_id)
tasks = list_task_group.tasks
export_task = next((t for t in tasks if t.task.metadata.name.startswith("export-")), None)
if export_task:
add_model_from_export_task(queue, remote_settings, export_task)
return
logger.info(f"Could not find export task for {taskgroup_id}, checking for Train actions next.")
train_actions = [t for t in tasks if t.task.metadata.name == "Action: Train"]
    if not train_actions:
        logger.info("No train actions were found.")
        return
logger.info(f"Found {len(train_actions)} train action tasks")
for train_action in train_actions:
action_task_id = train_action.status.taskId
dependents = ListDependentTasks.call(queue, action_task_id)
        # Find the export task among the tasks that depend on this action.
export_task = next(
(t for t in dependents.tasks if t.task.metadata.name.startswith("export-")),
None,
)
if export_task:
if export_task.status.state == "completed":
logger.info(f"Found the action {action_task_id}'s export task")
add_model_from_export_task(queue, remote_settings, export_task)
else:
logger.info(
f"Found the action {action_task_id}'s export task, but it was not completed"
)
else:
logger.info(f"The action {action_task_id} didn't produce an export tasks.")
for t in dependents.tasks:
logger.debug(f" - {t.task.metadata.name}")
def add_model_from_export_task_id(queue: taskcluster.Queue, remote_settings: Client, task_id: str):
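    """Add a model to the local Remote Settings directly from an export task's id."""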
add_model_from_export_task(queue, remote_settings, TaskAndStatus.call(queue, task_id))
def add_model_from_export_task(
queue: taskcluster.Queue, remote_settings: Client, export_task: TaskAndStatus
):
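    """
    Download the lex, model, and vocab artifacts from an export task, cache them under
    ./data/remote-settings/models, and publish them as records with attachments to the
    local Remote Settings.
    """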
    assert export_task.status.runs, "No runs were found in the export task"
    last_run = export_task.status.runs[-1]
action_task_id = export_task.status.taskGroupId
action_task = TaskAndStatus.call(queue, action_task_id)
response = requests.get(
queue.buildUrl("getLatestArtifact", action_task.status.taskId, "public/parameters.yml"),
stream=True,
)
response.raise_for_status()
parameters = yaml.safe_load(response.text)
config = parameters["training_config"]
experiment_name = config["experiment"]["name"]
match = re.search(r"export-(?P<src>\w+)-(?P<trg>\w+)+", export_task.task.metadata.name)
assert match
src = match.group("src")
trg = match.group("trg")
logger.info(
f"Looking up the artifacts for {export_task.task.metadata.name} ({export_task.status.taskId})"
)
list_artifacts = ListArtifacts.call(queue, export_task.status.taskId, last_run.runId)
artifacts = list_artifacts.artifacts
# public/build/lex.50.50.enlt.s2t.bin.gz
lex = next((a for a in artifacts if "/lex." in a.name), None)
# public/build/model.enlt.intgemm.alphas.bin.gz
model = next((a for a in artifacts if "/model." in a.name), None)
# public/build/vocab.enlt.spm.gz
vocab = next((a for a in artifacts if "/vocab." in a.name), None)
if not lex:
raise Exception("Could not find the lexical shortlist in artifacts")
if not model:
raise Exception("Could not find the model in artifacts")
if not vocab:
raise Exception("Could not find the vocab in artifacts")
model_path = models_path / f"{experiment_name}-{src}-{trg}-{export_task.status.taskId}"
model_path.mkdir(exist_ok=True)
config_path = model_path / "config.yml"
logger.info("Ensuring the artifacts are downloaded")
# Write out the config.
if config_path.exists():
logger.info(f"✅ {config_path}")
else:
config_text = yaml.safe_dump(config)
with config_path.open("wt") as config_file:
config_file.write(config_text)
logger.info(f"Destination: {config_path}")
downloads = [
(lex, "lex.s2t.bin"),
(model, "model.bin"),
(vocab, "vocab.spm"),
]
# 1. Cache the artifact files locally to `model_path`
# 2. Add them as records to Remote Settings.
for artifact, filename in downloads:
# First cache it locally.
destination = model_path / filename
if destination.exists():
logger.info(f"✅ {destination}")
else:
zipped_file = f"{destination}.gz"
stream_download_to_file(
queue.buildUrl("getLatestArtifact", export_task.status.taskId, artifact.name),
zipped_file,
)
# Decompress the gzip.
logger.info(f"Decompressing from {zipped_file} to {destination}")
with gzip.open(zipped_file, "rb") as f_in:
with open(destination, "wb") as f_out:
shutil.copyfileobj(f_in, f_out)
        # Add it to Remote Settings, which retains it in memory.
record = ModelRecord(
name=filename,
schema=0,
fromLang=src,
toLang=trg,
# Include the experiment name as a variant.
variant=experiment_name,
version="1.0",
fileType=filename.split(".")[0], # lex, model, vocab
attachment=None,
filter_expression="",
id=str(uuid4()),
last_modified=1728419357986, # This is just a plausible static value.
)
remote_settings.create_record(
id=record.id,
collection=models_collection,
bucket=bucket,
data=json.loads(record.json()),
)
remote_settings.add_attachment(
id=record.id,
filepath=destination,
collection=models_collection,
bucket=bucket,
)
logger.info(f"Attachment added {record.name} {record.version}")
def wait_for_remote_settings():
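    """Poll the local Remote Settings heartbeat endpoint until the server is responding."""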
max_attempts = 500
timeout = 0.5
url = "http://localhost:8888/__heartbeat__"
logger.info(f"Checking to see if Remote Settings is ready: {url}")
for attempt in range(max_attempts):
try:
logger.info(f"Checking {url}")
response = requests.get(url, timeout=timeout)
if response.status_code == 200:
logger.info(f"Remote Settings is ready after {attempt + 1} attempts.")
return True
except requests.RequestException:
pass
time.sleep(timeout)
raise Exception("Remote Settings is not ready after maximum attempts.")
def create_record_with_attachment(
record_id: str,
collection: str,
mime_type: str,
attachment_path: Path,
record_data: dict,
) -> None:
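    """
    Create a record and upload its attachment in a single multipart POST, retrying on
    failure. This is used instead of the kinto_http client calls that are commented out
    in sync_records (see the TODOs there).
    """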
url = f"http://localhost:8888/buckets/{bucket}/collections/{collection}/records/{record_id}/attachment"
logger.info(f"Posting record to {url}")
with open(attachment_path, "rb") as attachment:
files = {"attachment": (attachment_path.name, attachment, mime_type)}
form_data = {"data": json.dumps(record_data)}
response: Optional[requests.Response] = None
exception: Optional[Exception] = None
for _ in range(10):
exception = None
response = None
            try:
                # Rewind the attachment so that retries re-upload the full file.
                attachment.seek(0)
                response = requests.post(url, files=files, data=form_data)
if response.ok:
return
except Exception as e:
logger.warning(f"An exception occurred while creating a record: {e}")
exception = e
if response:
logger.warning(f"Response was not ok, code: {response.status_code}")
if response:
response.raise_for_status()
elif exception:
raise exception
def log_records(remote_settings: Client) -> None:
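    """Log the Wasm and model records that are currently stored in the local Remote Settings."""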
wasm_records = remote_settings.get_records(collection=wasm_collection, bucket=bucket)
model_records = remote_settings.get_records(collection=models_collection, bucket=bucket)
logger.info("Wasm records:")
for record_json in wasm_records:
record = WasmRecord(**record_json)
logger.info(f" - {record.name} {record.version}")
logger.info("Model records:")
for record_json in model_records:
record = ModelRecord(**record_json)
logger.info(f" - {record.name} {record.fromLang}-{record.toLang}")
logger.info("Remote Settings is ready: http://localhost:8888/v1/admin")
def main() -> None:
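    """Parse the CLI arguments, start the local Remote Settings container, and populate it with models."""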
parser = argparse.ArgumentParser(
description=__doc__,
# Preserves whitespace in the help text.
formatter_class=argparse.RawTextHelpFormatter,
)
parser.add_argument(
"--sync_remote_settings",
action="store_true",
help="Mirror the production Remote Settings models",
)
parser.add_argument(
"--taskgroup_ids",
type=str,
help="Task groups that contain an export- task, or train actions.",
nargs="*",
)
parser.add_argument(
"--export_task_id",
type=str,
help="The export-{src}-{trg} task id to use for models",
nargs="*",
)
args = parser.parse_args()
taskgroup_ids: list[str] = args.taskgroup_ids or []
export_task_ids: list[str] = args.export_task_id or []
sync_remote_settings: bool = args.sync_remote_settings
if not sync_remote_settings and not export_task_ids and not taskgroup_ids:
# Print the help when nothing is provided.
print("--sync_remote_settings, --export_task_ids, or --taskgroup_ids must be provided\n")
parser.print_help()
sys.exit(0)
if sync_remote_settings and (export_task_ids or taskgroup_ids):
print("--sync_remote_settings can not be used with --export_task_ids or --taskgroup_ids")
sys.exit(1)
docker = DockerContainerManager(
container_name="translations-remote-settings",
image="mozilla/remote-settings",
volumes={
str(attachments_path): "/tmp/attachments",
str(mount_path): "/app/mount",
},
env_vars={"KINTO_INI": "mount/translations.ini"},
ports={8888: 8888},
)
logger.info("Starting remote settings")
docker.start()
# Initialize Remote Settings.
wait_for_remote_settings()
remote_settings = Client(server_url="http://localhost:8888/v1")
create_remote_settings_environment(remote_settings)
# The Wasm will be used from the production Remote Settings.
sync_records(remote_settings, wasm_collection, WasmResponse, WasmRecord)
if taskgroup_ids or export_task_ids:
# Pull specific models from Taskcluster.
queue = taskcluster.Queue({"rootUrl": "https://firefox-ci-tc.services.mozilla.com"})
for taskgroup_id in taskgroup_ids:
add_model_from_taskgroup_id(queue, remote_settings, taskgroup_id)
for task_id in export_task_ids:
add_model_from_export_task_id(queue, remote_settings, task_id)
else:
# Sync records from the production Remote Settings.
sync_records(remote_settings, models_collection, ModelsResponse, ModelRecord)
log_records(remote_settings)
docker.wait()
if __name__ == "__main__":
main()