in azext_edge/edge/util/version_check.py [0:0]
def upgrade_available(self, force_refresh: Optional[bool] = False) -> Optional[str]:
try:
# Import here for exception safety
from packaging import version
from azure.cli.core._session import Session
self.iot_ops_session = Session(encoding="utf8")
self.iot_ops_session.load(os.path.join(self.config_dir, SESSION_FILE_NAME))
latest_cli_version = CURRENT_CLI_VERSION
last_fetched = self.iot_ops_session.get(SESSION_KEY_LAST_FETCHED)
if last_fetched:
last_fetched = datetime.strptime(last_fetched, "%Y-%m-%d %H:%M:%S.%f")
latest_cli_version = self.iot_ops_session.get(SESSION_KEY_LATEST_VERSION) or CURRENT_CLI_VERSION
if (
not last_fetched
or force_refresh
or datetime.now() > last_fetched + timedelta(days=FETCH_LATEST_AFTER_DAYS)
):
# Record attempted last fetch
self.iot_ops_session[SESSION_KEY_LAST_FETCHED] = str(datetime.now())
# Set format version though only v1 is supported now
self.iot_ops_session[SESSION_KEY_FORMAT_VERSION] = FORMAT_VERSION_V1_VALUE
if check_connectivity(url=GH_CLI_CONSTANTS_ENDPOINT, max_retries=0):
_just_fetched_gh_version = get_latest_from_github(url=GH_CLI_CONSTANTS_ENDPOINT, timeout=10)
if _just_fetched_gh_version:
latest_cli_version = _just_fetched_gh_version
self.iot_ops_session[SESSION_KEY_LATEST_VERSION] = latest_cli_version
is_upgrade = version.parse(latest_cli_version) > version.parse(CURRENT_CLI_VERSION)
if is_upgrade:
return latest_cli_version
# If anything goes wrong CLI should not crash
except Exception as ae:
logger.debug(ae)