def handler()

in usecases/mwaa-with-codeartifact/lambda/lambda_handler.py [0:0]


def handler(event, context):
    ca_domain = os.environ.get("CA_DOMAIN")
    ca_domain_owner = os.environ.get("CA_DOMAIN_OWNER")
    ca_repo_name = os.environ.get("CA_REPOSITORY_NAME")
    bucket_name = os.environ.get("BUCKET_NAME")
    region = os.environ.get("AWS_REGION")

    try:
        ca_client = boto3.client("codeartifact")
        res = ca_client.get_authorization_token(
            domain=ca_domain, domainOwner=ca_domain_owner, durationSeconds=43200
        )
        token = res["authorizationToken"]

        s3_client = boto3.client("s3")
        ca_index_url = (
            f"--index-url https://aws:{token}@{ca_domain}-{ca_domain_owner}"
            f".d.codeartifact.{region}.amazonaws.com/pypi/{ca_repo_name}/simple/"
        )
        s3_client.put_object(
            Body=ca_index_url, Bucket=bucket_name, Key="dags/codeartifact.txt"
        )
    except Exception as e:
        return {"statusCode": 500, "event": event, "body": {"Error": str(e)}}

    return {
        "statusCode": 200,
        "event": event,
        "body": {"Success": "CodeArtifact auth token for MWAA has been updated."},
    }