in composer_local_dev/environment.py [0:0]
def assert_image_exists(image_version: str) -> None:
    """Assert that the given Composer image version exists in Artifact Registry.

    The image tag derived from ``image_version`` is looked up with a
    ``GetTagRequest``. When the lookup succeeds the function simply returns.

    Args:
        image_version: Image version in format of
            'composer-x.y.z-airflow-a.b.c'.

    Raises:
        errors.ImageNotFoundError: If the tag lookup reports ``NotFound``.
        errors.InvalidAuthError: If the lookup fails with any other Google
            auth or Google API error.

    Warns:
        When the lookup is rejected with ``PermissionDenied``, a warning built
        from ``constants.IMAGE_TAG_PERMISSION_DENIED_WARN`` is emitted and the
        function returns without having confirmed that the image exists.
    """
    airflow_v, composer_v = utils.get_airflow_composer_versions(image_version)
    image_tag = utils.get_image_version_tag(airflow_v, composer_v)
    LOG.info("Asserting that %s composer image version exists", image_tag)
    image_url = constants.ARTIFACT_REGISTRY_IMAGE_URL.format(
        airflow_v=airflow_v, composer_v=composer_v
    )
    # NOTE(review): the client is constructed outside the ``try`` block, so any
    # exception raised by the constructor itself is not translated by the
    # handlers below -- confirm that this is intended.
    client = artifactregistry_v1.ArtifactRegistryClient()
    request = artifactregistry_v1.GetTagRequest(name=image_url)
    # Pass the request object itself: logging applies ``%s`` lazily, so the
    # request is only rendered when DEBUG output is actually enabled.
    LOG.debug("GetTagRequest for %s: %s", image_tag, request)
    try:
        client.get_tag(request=request)
    except api_exception.NotFound:
        # The API traceback adds nothing for the user; suppress the context.
        raise errors.ImageNotFoundError(image_version=image_tag) from None
    except api_exception.PermissionDenied:
        # Lack of access to the registry is not fatal: warn and carry on
        # without a definitive answer about the image.
        warnings.warn(
            constants.IMAGE_TAG_PERMISSION_DENIED_WARN.format(
                image_tag=image_tag
            )
        )
    except (
        auth_exception.GoogleAuthError,
        api_exception.GoogleAPIError,
    ) as err:
        # Generic fallback; the more specific handlers above must stay first.
        # Chain explicitly so the original error is reported as the cause.
        raise errors.InvalidAuthError(err) from err