build-scripts/signing.py (282 lines of code) (raw):
import base64
from functools import cache
import os
import pathlib
from typing import Any, List, Optional
from const import APPLE_TEAM_ID
from manifest import CdSigningType, app_manifest, dmg_manifest, ime_manifest
from util import Args, Env, info, run_cmd, run_cmd_output, warn
import json
import shutil
import time
from importlib import import_module
REGION = "us-west-2"
SIGNING_API_BASE_URL = "https://api.signer.builder-tools.aws.dev"
@cache
def get_creds():
boto3 = import_module("boto3")
session = boto3.Session()
credentials = session.get_credentials()
creds = credentials.get_frozen_credentials()
return creds
class CdSigningData:
bucket_name: str
notarizing_secret_id: str
aws_account_id: str
signing_role_name: str
def __init__(
self,
bucket_name: str,
notarizing_secret_id: str,
aws_account_id: str,
signing_role_name: str,
):
self.bucket_name = bucket_name
self.notarizing_secret_id = notarizing_secret_id
self.aws_account_id = aws_account_id
self.signing_role_name = signing_role_name
def cd_signer_request(method: str, path: str, data: str | None = None):
SigV4Auth = import_module("botocore.auth").SigV4Auth
AWSRequest = import_module("botocore.awsrequest").AWSRequest
requests = import_module("requests")
url = f"{SIGNING_API_BASE_URL}{path}"
headers = {"Content-Type": "application/json"}
request = AWSRequest(method=method, url=url, data=data, headers=headers)
SigV4Auth(get_creds(), "signer-builder-tools", REGION).add_auth(request)
for i in range(1, 8):
response = requests.request(method=method, url=url, headers=dict(request.headers), data=data)
info(f"CDSigner Request ({url}): {response.status_code}")
if response.status_code == 429:
warn(f"Too many requests, backing off for {2 ** i} seconds")
time.sleep(2**i)
continue
return response
raise Exception(f"Failed to request {url}")
def cd_signer_create_request(manifest: Any) -> str:
response = cd_signer_request(
method="POST",
path="/signing_requests",
data=json.dumps({"manifest": manifest}),
)
response_json = response.json()
info(f"Signing request create: {response_json}")
request_id = response_json["signingRequestId"]
return request_id
def cd_signer_start_request(request_id: str, source_key: str, destination_key: str, signing_data: CdSigningData):
response_text = cd_signer_request(
method="POST",
path=f"/signing_requests/{request_id}/start",
data=json.dumps(
{
"iamRole": f"arn:aws:iam::{signing_data.aws_account_id}:role/{signing_data.signing_role_name}",
"s3Location": {
"bucket": signing_data.bucket_name,
"sourceKey": source_key,
"destinationKey": destination_key,
},
}
),
).text
info(f"Signing request start: {response_text}")
def cd_signer_status_request(request_id: str):
response_json = cd_signer_request(
method="GET",
path=f"/signing_requests/{request_id}",
).json()
info(f"Signing request status: {response_json}")
return response_json["signingRequest"]["status"]
def cd_build_signed_package(type: CdSigningType, file_path: pathlib.Path, name: str):
working_dir = pathlib.Path(f"build-config/signing/{type.value}")
starting_dir = pathlib.Path.cwd()
if type == CdSigningType.DMG:
# Our dmg file names vary by platform, so this is templated in the manifest
manifest_template_path = working_dir / "manifest.yaml.template"
manifest_path = working_dir / "manifest.yaml"
manifest_path.write_text(manifest_template_path.read_text().replace("__NAME__", name))
if file_path.is_dir():
shutil.copytree(file_path, working_dir / "artifact" / file_path.name)
shutil.rmtree(file_path)
elif file_path.is_file():
shutil.copy2(file_path, working_dir / "artifact" / file_path.name)
file_path.unlink()
else:
raise Exception(f"Unknown file type: {file_path}")
run_cmd(["gtar", "-czf", working_dir / "artifact.gz", "-C", working_dir / "artifact", "."])
run_cmd(
["gtar", "-czf", starting_dir / "package.tar.gz", "manifest.yaml", "artifact.gz"],
cwd=working_dir,
)
(working_dir / "artifact.gz").unlink()
shutil.rmtree(working_dir / "artifact")
# Sign a file with CDSigner
def cd_sign_file(file: pathlib.Path, type: CdSigningType, signing_data: CdSigningData, is_prod: bool):
name = file.name
info(f"Signing {name}")
# CDSigner requires us to build up a tar file in an extremely specific format
info("Packaging...")
cd_build_signed_package(type, file, name)
# Upload package for signing to S3
info("Uploading...")
run_cmd(["aws", "s3", "rm", "--recursive", f"s3://{signing_data.bucket_name}/signed"])
run_cmd(["aws", "s3", "rm", "--recursive", f"s3://{signing_data.bucket_name}/pre-signed"])
run_cmd(["aws", "s3", "cp", "package.tar.gz", f"s3://{signing_data.bucket_name}/pre-signed/package.tar.gz"])
pathlib.Path("package.tar.gz").unlink()
info("Sending request...")
match type:
case CdSigningType.APP:
manifest = app_manifest()
case CdSigningType.DMG:
manifest = dmg_manifest(name)
case CdSigningType.IME:
manifest = ime_manifest()
request_id = cd_signer_create_request(manifest)
cd_signer_start_request(
request_id=request_id,
source_key="pre-signed/package.tar.gz",
destination_key="signed/signed.zip",
signing_data=signing_data,
)
max_duration = 180
end_time = time.time() + max_duration
i = 1
while True:
info(f"Checking for signed package {i}")
status = cd_signer_status_request(request_id)
match status:
case "success":
break
case "created" | "processing" | "inProgress":
pass
case "failure":
raise RuntimeError("Signing request failed")
case _:
warn(f"Unexpected status, ignoring: {status}")
if time.time() >= end_time:
raise RuntimeError("Signed package did not appear, check signer logs")
time.sleep(2)
i += 1
info("Signed!")
info("Downloading...")
run_cmd(["aws", "s3", "cp", f"s3://{signing_data.bucket_name}/signed/signed.zip", "signed.zip"])
run_cmd(["unzip", "signed.zip"])
# find child of Payload
children = list(pathlib.Path("Payload").iterdir())
if len(children) != 1:
raise RuntimeError("Payload directory should have exactly one child")
child_path = children[0]
# copy child to the original file location
if child_path.is_dir():
shutil.copytree(child_path, file)
elif child_path.is_file():
shutil.copy2(child_path, file)
else:
raise Exception(f"Unknown file type: {child_path}")
# clean up
pathlib.Path("signed.zip").unlink()
shutil.rmtree("Payload")
info(f"Signing status of {file}")
run_cmd(["codesign", "-dv", "--deep", "--strict", file])
def rebundle_dmg(dmg_path: pathlib.Path, app_path: pathlib.Path):
mounting_path = pathlib.Path("/Volumes") / dmg_path.name.replace(".dmg", "")
info(f"Rebunding {dmg_path}")
# Try to unmount a dmg if it is already there
if mounting_path.is_dir():
run_cmd(["hdiutil", "detach", mounting_path])
tempdmg_path = pathlib.Path.home() / "temp.dmg"
tempdmg_path.unlink(missing_ok=True)
# Convert the dmg to writable
run_cmd(["hdiutil", "convert", dmg_path, "-format", "UDRW", "-o", tempdmg_path])
# Mount the dmg
run_cmd(["hdiutil", "attach", tempdmg_path])
# Copy in the new app
run_cmd(["cp", "-R", app_path, mounting_path])
# Unmount the dmg
run_cmd(["hdiutil", "detach", mounting_path])
# Convert the dmg to zipped, read only - this is the only type that EC will accept!!
dmg_path.unlink()
run_cmd(["hdiutil", "convert", tempdmg_path, "-format", "UDZO", "-o", dmg_path])
def apple_notarize_file(file: pathlib.Path, signing_data: CdSigningData):
name = file.name
file_type = file.suffix[1:]
file_to_notarize = file
if file_type == "app":
# check the app is ready to be notarized
# TODO(grant): remove the check=False if this works
run_cmd(["syspolicy_check", "notary-submission", file], check=False)
# We can submit dmg files as is, but we have to zip up app files in a specific way
file_to_notarize = pathlib.Path(f"{name}.zip")
run_cmd(["ditto", "-c", "-k", "--sequesterRsrc", "--keepParent", file, file_to_notarize])
secrets = get_secretmanager_json(signing_data.notarizing_secret_id)
run_cmd(
[
"xcrun",
"notarytool",
"submit",
file_to_notarize,
"--team-id",
APPLE_TEAM_ID,
"--apple-id",
secrets["appleId"],
"--password",
secrets["appleIdPassword"],
"--wait",
]
)
run_cmd(["xcrun", "stapler", "staple", file])
if file_type == "app":
# Verify notarization for .app
run_cmd(["spctl", "-a", "-v", file])
pathlib.Path(file_to_notarize).unlink()
# check the file is ready to be distributed
# TODO(grant): remove the check=False if this works
run_cmd(["syspolicy_check", "distribution", file], check=False)
else:
# Verify notarization for .dmg
run_cmd(["spctl", "-a", "-t", "open", "--context", "context:primary-signature", "-v", file])
def get_secretmanager_json(secret_id: str):
info(f"Loading secretmanager value: {secret_id}")
secret_value = run_cmd_output(["aws", "secretsmanager", "get-secret-value", "--secret-id", secret_id])
secret_string = json.loads(secret_value)["SecretString"]
return json.loads(secret_string)
class GpgSigner:
def __init__(self, gpg_id: str, gpg_secret_key: str, gpg_passphrase: str):
self.gpg_id = gpg_id
self.gpg_secret_key = gpg_secret_key
self.gpg_passphrase = gpg_passphrase
self.gpg_home = pathlib.Path.home() / ".gnupg-tmp"
self.gpg_home.mkdir(parents=True, exist_ok=True, mode=0o700)
# write gpg secret key to file
self.gpg_secret_key_path = self.gpg_home / "gpg_secret"
self.gpg_secret_key_path.write_bytes(base64.b64decode(gpg_secret_key))
self.gpg_passphrase_path = self.gpg_home / "gpg_pass"
self.gpg_passphrase_path.write_text(gpg_passphrase)
run_cmd(["gpg", "--version"])
info("Importing GPG key")
run_cmd(["gpg", "--list-keys"], env=self.gpg_env())
run_cmd(
["gpg", *self.sign_args(), "--allow-secret-key-import", "--import", self.gpg_secret_key_path],
env=self.gpg_env(),
)
run_cmd(["gpg", "--list-keys"], env=self.gpg_env())
def gpg_env(self) -> Env:
return {**os.environ, "GNUPGHOME": self.gpg_home}
def sign_args(self) -> Args:
return [
"--batch",
"--pinentry-mode",
"loopback",
"--no-tty",
"--yes",
"--passphrase-file",
self.gpg_passphrase_path,
]
def sign_file(self, path: pathlib.Path) -> List[pathlib.Path]:
info(f"Signing {path.name}")
run_cmd(
["gpg", "--detach-sign", *self.sign_args(), "--local-user", self.gpg_id, path],
env=self.gpg_env(),
)
run_cmd(
["gpg", "--detach-sign", *self.sign_args(), "--armor", "--local-user", self.gpg_id, path],
env=self.gpg_env(),
)
return [path.with_suffix(f"{path.suffix}.asc"), path.with_suffix(f"{path.suffix}.sig")]
def clean(self):
info("Cleaning gpg keys")
shutil.rmtree(self.gpg_home, ignore_errors=True)
def load_gpg_signer() -> Optional[GpgSigner]:
if gpg_id := os.getenv("TEST_PGP_ID"):
gpg_secret_key = os.getenv("TEST_PGP_SECRET_KEY")
gpg_passphrase = os.getenv("TEST_PGP_PASSPHRASE")
if gpg_secret_key is not None and gpg_passphrase is not None:
info("Using test pgp key", gpg_id)
return GpgSigner(gpg_id=gpg_id, gpg_secret_key=gpg_secret_key, gpg_passphrase=gpg_passphrase)
pgp_secret_arn = os.getenv("FIG_IO_DESKTOP_PGP_KEY_ARN")
info(f"FIG_IO_DESKTOP_PGP_KEY_ARN: {pgp_secret_arn}")
if pgp_secret_arn:
gpg_secret_json = get_secretmanager_json(pgp_secret_arn)
gpg_id = gpg_secret_json["gpg_id"]
gpg_secret_key = gpg_secret_json["gpg_secret_key"]
gpg_passphrase = gpg_secret_json["gpg_passphrase"]
return GpgSigner(gpg_id=gpg_id, gpg_secret_key=gpg_secret_key, gpg_passphrase=gpg_passphrase)
else:
return None