in usecases/mwaa-with-codeartifact/lambda/lambda_handler.py [0:0]
def handler(event, context):
ca_domain = os.environ.get("CA_DOMAIN")
ca_domain_owner = os.environ.get("CA_DOMAIN_OWNER")
ca_repo_name = os.environ.get("CA_REPOSITORY_NAME")
bucket_name = os.environ.get("BUCKET_NAME")
region = os.environ.get("AWS_REGION")
try:
ca_client = boto3.client("codeartifact")
res = ca_client.get_authorization_token(
domain=ca_domain, domainOwner=ca_domain_owner, durationSeconds=43200
)
token = res["authorizationToken"]
s3_client = boto3.client("s3")
ca_index_url = (
f"--index-url https://aws:{token}@{ca_domain}-{ca_domain_owner}"
f".d.codeartifact.{region}.amazonaws.com/pypi/{ca_repo_name}/simple/"
)
s3_client.put_object(
Body=ca_index_url, Bucket=bucket_name, Key="dags/codeartifact.txt"
)
except Exception as e:
return {"statusCode": 500, "event": event, "body": {"Error": str(e)}}
return {
"statusCode": 200,
"event": event,
"body": {"Success": "CodeArtifact auth token for MWAA has been updated."},
}