in azext_edge/edge/providers/orchestration/upgrade2.py [0:0]
def analyze_cluster(self, **override_kwargs: dict) -> "ClusterUpgradeState":
with Progress(
SpinnerColumn("star"),
*Progress.get_default_columns(),
"Elapsed:",
TimeElapsedColumn(),
transient=True,
disable=bool(self.no_progress),
) as progress:
_ = progress.add_task("Analyzing cluster...", total=None)
if not self.resource_map.connected_cluster.connected:
raise ValidationError(f"Cluster {self.resource_map.connected_cluster.cluster_name} is not connected.")
return ClusterUpgradeState(
extensions_map=self.resource_map.connected_cluster.get_extensions_by_type(
*list(EXTENSION_TYPE_TO_MONIKER_MAP.keys())
),
init_version_map={
**self.targets.get_extension_versions(),
**self.targets.get_extension_versions(False),
},
desired_config_map=self.get_desired_config(),
override_map=build_override_map(**override_kwargs),
force=self.force,
)