def assert_image_exists()

in composer_local_dev/environment.py [0:0]


def assert_image_exists(image_version: str) -> None:
    """Assert that the Composer image for ``image_version`` exists.

    Builds the Artifact Registry tag name for the image and issues a
    ``GetTagRequest`` for it.

    Args:
        image_version: Image version in format of
            'composer-x.y.z-airflow-a.b.c'

    Raises:
        errors.ImageNotFoundError: If Artifact Registry reports that the
            tag does not exist.
        errors.InvalidAuthError: If the lookup fails with any other Google
            auth error or Google API error.

    Warns:
        If permission to read the tag is denied, a warning is emitted and
        the function returns without raising.
    """
    airflow_v, composer_v = utils.get_airflow_composer_versions(image_version)
    image_tag = utils.get_image_version_tag(airflow_v, composer_v)
    LOG.info("Asserting that %s composer image version exists", image_tag)
    image_url = constants.ARTIFACT_REGISTRY_IMAGE_URL.format(
        airflow_v=airflow_v, composer_v=composer_v
    )
    # NOTE(review): the client is created outside the try block, so any error
    # raised while constructing it propagates unwrapped -- confirm intended.
    client = artifactregistry_v1.ArtifactRegistryClient()
    request = artifactregistry_v1.GetTagRequest(name=image_url)
    # Plain %-style template with lazy arguments: the request is only
    # stringified when DEBUG logging is actually enabled.
    LOG.debug("GetTagRequest for %s: %s", image_tag, request)
    try:
        client.get_tag(request=request)
    except api_exception.NotFound:
        # The API-level traceback adds nothing for the user; suppress it.
        raise errors.ImageNotFoundError(image_version=image_tag) from None
    except api_exception.PermissionDenied:
        # Best effort: lack of read access must not block the caller.
        warnings.warn(
            constants.IMAGE_TAG_PERMISSION_DENIED_WARN.format(
                image_tag=image_tag
            )
        )
    except (
            auth_exception.GoogleAuthError,
            api_exception.GoogleAPIError,
    ) as err:
        # Catch-all for remaining auth/API failures; it must stay after the
        # more specific handlers above. Chain explicitly to keep the cause.
        raise errors.InvalidAuthError(err) from err