in composer_local_dev/environment.py [0:0]
def get_image_version(env):
    """
    Return environment image version.

    If the environment is running, get image version from the container
    image tag. Otherwise, or when the container image has no tags, fall back
    to the image version stored on the environment (``env.image_version``).

    Args:
        env: Environment object; it must provide ``get_container()``,
            ``container_name``, ``name`` and ``image_version``.

    Returns:
        ``env.image_version`` when falling back; otherwise the value built by
        ``utils.get_image_version_tag`` from the Airflow and Composer
        versions parsed out of the container image tag.
    """
    try:
        container = env.get_container(env.container_name)
    except errors.EnvironmentNotRunningError:
        # Log through the module logger (same as the warning below) instead
        # of the root logger, so both messages follow the same configuration.
        LOG.debug(
            constants.IMAGE_VERSION_CONTAINER_MISSING.format(env_name=env.name)
        )
        return env.image_version

    tags = container.image.tags
    if not tags:
        LOG.warning(
            constants.IMAGE_VERSION_TAG_MISSING.format(env_name=env.name)
        )
        return env.image_version

    # Only the first tag is consulted; keep the part after the last ":"
    # (the whole string when there is no ":").
    image_tag = tags[0].split(":")[-1]
    airflow_v, composer_v = utils.get_airflow_composer_versions(image_tag)
    airflow_v = utils.format_airflow_version_dotted(airflow_v)
    return utils.get_image_version_tag(airflow_v, composer_v)