# azext_edge/edge/commands_edge.py
# coding=utf-8
# ----------------------------------------------------------------------------------------------
# Copyright (c) Microsoft Corporation. All rights reserved.
# Licensed under the MIT License. See License file in the project root for license information.
# ----------------------------------------------------------------------------------------------
import json
from pathlib import PurePath
from typing import Any, Dict, Iterable, List, Optional, Union
from azure.cli.core.azclierror import ArgumentUsageError
from knack.log import get_logger
from .providers.base import DEFAULT_NAMESPACE, load_config_context
from .providers.check.common import ResourceOutputDetailLevel
from .providers.edge_api import META_API_V1
from .providers.orchestration.common import (
IdentityUsageType,
KubernetesDistroType,
MqMemoryProfile,
MqServiceType,
)
from .providers.orchestration.resources import Instances
from .providers.support.base import get_bundle_path
logger = get_logger(__name__)
def support_bundle(
    cmd,
    log_age_seconds: int = 60 * 60 * 24,
    bundle_dir: Optional[str] = None,
    include_mq_traces: Optional[bool] = None,
    context_name: Optional[str] = None,
    ops_services: Optional[List[str]] = None,
) -> Union[Dict[str, Any], None]:
    """Produce a support bundle archive for the targeted cluster.

    Loads the kubeconfig context, resolves the output path from bundle_dir,
    and delegates collection to the support_bundle provider.
    """
    load_config_context(context_name=context_name)
    from .providers.support_bundle import build_bundle

    target_path: PurePath = get_bundle_path(bundle_dir=bundle_dir)
    collection_options = dict(
        ops_services=ops_services,
        bundle_path=str(target_path),
        log_age_seconds=log_age_seconds,
        include_mq_traces=include_mq_traces,
    )
    return build_bundle(**collection_options)
def check(
    cmd,
    detail_level: int = ResourceOutputDetailLevel.summary.value,
    pre_deployment_checks: Optional[bool] = None,
    post_deployment_checks: Optional[bool] = None,
    as_object: Optional[bool] = None,
    context_name: Optional[str] = None,
    ops_service: Optional[str] = None,
    resource_kinds: Optional[List[str]] = None,
    resource_name: Optional[str] = None,
) -> Union[Dict[str, Any], None]:
    """Evaluate cluster readiness (pre-deployment) and service health (post-deployment) checks.

    By default pre-checks run when AIO is not yet deployed and post-checks run
    when it is; explicit flags override either default.

    :raises ArgumentUsageError: if --resource-name or resource kinds are
        supplied without a service name (--svc).
    """
    load_config_context(context_name=context_name)
    from .providers.checks import run_checks

    aio_deployed = META_API_V1.is_deployed()
    # By default run pre-checks only if AIO is not deployed; an explicit argument always wins.
    run_pre = not aio_deployed if pre_deployment_checks is None else pre_deployment_checks
    # By default run post-checks only if AIO is deployed.
    run_post = aio_deployed if post_deployment_checks is None else post_deployment_checks
    # If only one of pre/post is explicitly set to True, suppress the other.
    if pre_deployment_checks and not post_deployment_checks:
        run_post = False
    if post_deployment_checks and not pre_deployment_checks:
        run_pre = False
    # Resource-level filters only make sense in the context of a specific service.
    if resource_name and not ops_service:
        raise ArgumentUsageError(
            "Resource name filtering (--resource-name) can only be used with service name (--svc)."
        )
    if resource_kinds and not ops_service:
        raise ArgumentUsageError("Service name (--svc) is required to specify individual resource kind checks.")
    if detail_level != ResourceOutputDetailLevel.summary.value and not ops_service:
        logger.warning("Detail level (--detail-level) will only affect individual service checks with '--svc'")
    return run_checks(
        ops_service=ops_service,
        detail_level=detail_level,
        as_list=not as_object,
        resource_name=resource_name,
        pre_deployment=run_pre,
        post_deployment=run_post,
        resource_kinds=resource_kinds,
    )
def init(
    cmd,
    cluster_name: str,
    resource_group_name: str,
    enable_fault_tolerance: Optional[bool] = None,
    no_progress: Optional[bool] = None,
    ensure_latest: Optional[bool] = None,
    user_trust: Optional[bool] = None,
    acs_config: Optional[List[str]] = None,
    acs_version: Optional[str] = None,
    acs_train: Optional[str] = None,
    ssc_config: Optional[List[str]] = None,
    ssc_version: Optional[str] = None,
    ssc_train: Optional[str] = None,
    **kwargs,
) -> Union[Dict[str, Any], None]:
    """Initialize cluster foundation workloads for Azure IoT Operations.

    Pre-flight checks run unless disabled via the INIT_NO_PREFLIGHT env flag.

    :returns: The raw work payload when progress rendering is disabled
        (--no-progress) and a payload was produced, otherwise None.
    """
    from .common import INIT_NO_PREFLIGHT_ENV_KEY
    from .providers.orchestration.work import WorkManager
    from .util import (
        is_env_flag_enabled,
    )

    no_pre_flight = is_env_flag_enabled(INIT_NO_PREFLIGHT_ENV_KEY)
    work_manager = WorkManager(cmd)
    result_payload = work_manager.execute_ops_init(
        show_progress=not no_progress,
        pre_flight=not no_pre_flight,
        cluster_name=cluster_name,
        resource_group_name=resource_group_name,
        enable_fault_tolerance=enable_fault_tolerance,
        user_trust=user_trust,
        acs_config=acs_config,
        acs_version=acs_version,
        acs_train=acs_train,
        ssc_config=ssc_config,
        ssc_version=ssc_version,
        ssc_train=ssc_train,
        **kwargs,
    )
    if no_progress and result_payload:
        # Fix: the payload was previously computed and then discarded (a TODO
        # placeholder), so the command could never return the Dict promised by
        # its annotation. With progress rendering suppressed, the returned
        # payload is the only output channel — surface it to the caller.
        # NOTE(review): confirm the desired payload shape with @digimaun.
        return result_payload
def create_instance(
    cmd,
    cluster_name: str,
    resource_group_name: str,
    instance_name: str,
    schema_registry_resource_id: str,
    cluster_namespace: str = DEFAULT_NAMESPACE,
    location: Optional[str] = None,
    custom_location_name: Optional[str] = None,
    enable_rsync_rules: Optional[bool] = None,
    instance_description: Optional[str] = None,
    instance_features: Optional[List[str]] = None,
    dataflow_profile_instances: int = 1,
    trust_settings: Optional[List[str]] = None,
    # Akri
    container_runtime_socket: Optional[str] = None,
    kubernetes_distro: str = KubernetesDistroType.k8s.value,
    # Ops Extension
    ops_config: Optional[List[str]] = None,
    ops_version: Optional[str] = None,
    ops_train: Optional[str] = None,
    # Broker
    custom_broker_config_file: Optional[str] = None,
    broker_memory_profile: str = MqMemoryProfile.medium.value,
    broker_service_type: str = MqServiceType.CLUSTERIP.value,
    broker_backend_partitions: int = 2,
    broker_backend_workers: int = 2,
    broker_backend_redundancy_factor: int = 2,
    broker_frontend_workers: int = 2,
    broker_frontend_replicas: int = 2,
    add_insecure_listener: Optional[bool] = None,
    tags: Optional[dict] = None,
    no_progress: Optional[bool] = None,
    # Fix: confirm_yes is a yes/no confirmation flag, consumed as a boolean by
    # should_continue_prompt; siblings (delete, upgrade_instance) annotate it
    # Optional[bool], but it was annotated Optional[str] here.
    confirm_yes: Optional[bool] = None,
    **kwargs,
) -> Union[Dict[str, Any], None]:
    """Create an Azure IoT Operations instance on a connected cluster.

    Validates broker options, optionally loads a custom broker config file,
    and delegates deployment to the WorkManager init workflow.

    :raises ArgumentUsageError: if --add-insecure-listener is combined with a
        LoadBalancer broker service type.
    :returns: The raw work payload when progress rendering is disabled
        (--no-progress) and a payload was produced, otherwise None.
    """
    from .common import INIT_NO_PREFLIGHT_ENV_KEY
    from .providers.orchestration.work import WorkManager
    from .util import (
        is_env_flag_enabled,
        read_file_content,
        should_continue_prompt,
    )

    no_pre_flight = is_env_flag_enabled(INIT_NO_PREFLIGHT_ENV_KEY)
    # TODO - @digimaun
    custom_broker_config = None
    if custom_broker_config_file:
        custom_broker_config = json.loads(read_file_content(file_path=custom_broker_config_file))
    if broker_service_type == MqServiceType.LOADBALANCER.value and add_insecure_listener:
        raise ArgumentUsageError(
            f"--add-insecure-listener cannot be used when --broker-service-type is {MqServiceType.LOADBALANCER.value}."
        )
    # TODO - @digimaun, should [temp] user confirms be moved to InitTargets?
    if broker_backend_redundancy_factor < 2:
        logger.warning(
            "Deploying Azure IoT Operations with less than 2 broker backend replicas "
            "prevents future version upgrades."
        )
        # Confirm before proceeding with a non-upgradeable configuration.
        should_bail = not should_continue_prompt(confirm_yes=confirm_yes, context="Create")
        if should_bail:
            return
    work_manager = WorkManager(cmd)
    result_payload = work_manager.execute_ops_init(
        show_progress=not no_progress,
        pre_flight=not no_pre_flight,
        apply_foundation=False,
        cluster_name=cluster_name,
        resource_group_name=resource_group_name,
        cluster_namespace=cluster_namespace,
        schema_registry_resource_id=schema_registry_resource_id,
        location=location,
        custom_location_name=custom_location_name,
        enable_rsync_rules=enable_rsync_rules,
        instance_name=instance_name,
        instance_description=instance_description,
        instance_features=instance_features,
        add_insecure_listener=add_insecure_listener,
        dataflow_profile_instances=dataflow_profile_instances,
        trust_settings=trust_settings,
        # Ops extension
        container_runtime_socket=container_runtime_socket,
        kubernetes_distro=kubernetes_distro,
        ops_config=ops_config,
        ops_version=ops_version,
        ops_train=ops_train,
        # Broker
        custom_broker_config=custom_broker_config,
        broker_memory_profile=broker_memory_profile,
        broker_service_type=broker_service_type,
        broker_backend_partitions=broker_backend_partitions,
        broker_backend_workers=broker_backend_workers,
        broker_backend_redundancy_factor=broker_backend_redundancy_factor,
        broker_frontend_workers=broker_frontend_workers,
        broker_frontend_replicas=broker_frontend_replicas,
        tags=tags,
        **kwargs,
    )
    if no_progress and result_payload:
        # Fix: the payload was previously discarded (a TODO placeholder); with
        # progress rendering suppressed, surface it so the command can actually
        # return the Dict its annotation promises.
        # NOTE(review): confirm the desired payload shape with @digimaun.
        return result_payload
# The extraordinary number of explicit params is due to how Azure CLI handles
# params/args. Potentially this can be simplified by some Knack hacking.
def upgrade_instance(
    cmd,
    resource_group_name: str,
    instance_name: str,
    no_progress: Optional[bool] = None,
    confirm_yes: Optional[bool] = None,
    ops_config: Optional[List[str]] = None,
    ops_config_sync_mode: Optional[str] = None,
    ops_version: Optional[str] = None,
    ops_train: Optional[str] = None,
    acs_config: Optional[List[str]] = None,
    acs_config_sync_mode: Optional[str] = None,
    acs_version: Optional[str] = None,
    acs_train: Optional[str] = None,
    osm_config: Optional[List[str]] = None,
    osm_config_sync_mode: Optional[str] = None,
    osm_version: Optional[str] = None,
    osm_train: Optional[str] = None,
    ssc_config: Optional[List[str]] = None,
    ssc_version: Optional[str] = None,
    ssc_train: Optional[str] = None,
    ssc_config_sync_mode: Optional[str] = None,
    plat_config: Optional[List[str]] = None,
    plat_version: Optional[str] = None,
    plat_train: Optional[str] = None,
    plat_config_sync_mode: Optional[str] = None,
    force: Optional[bool] = None,
    **kwargs,
) -> Optional[List[dict]]:
    """Upgrade the extensions backing an Azure IoT Operations instance.

    Per-extension (ops/acs/osm/ssc/plat) config, version, train and sync-mode
    overrides are forwarded unchanged to the upgrade provider.
    """
    from .providers.orchestration.upgrade2 import upgrade_ops_instance

    # Fold the per-extension override quartets into keyword arguments so the
    # provider call stays readable.
    extension_overrides: Dict[str, Any] = {}
    override_table = {
        "ops": (ops_config, ops_version, ops_train, ops_config_sync_mode),
        "acs": (acs_config, acs_version, acs_train, acs_config_sync_mode),
        "osm": (osm_config, osm_version, osm_train, osm_config_sync_mode),
        "ssc": (ssc_config, ssc_version, ssc_train, ssc_config_sync_mode),
        "plat": (plat_config, plat_version, plat_train, plat_config_sync_mode),
    }
    for moniker, (config, version, train, sync_mode) in override_table.items():
        extension_overrides[f"{moniker}_config"] = config
        extension_overrides[f"{moniker}_version"] = version
        extension_overrides[f"{moniker}_train"] = train
        extension_overrides[f"{moniker}_config_sync_mode"] = sync_mode
    return upgrade_ops_instance(
        cmd=cmd,
        resource_group_name=resource_group_name,
        instance_name=instance_name,
        no_progress=no_progress,
        confirm_yes=confirm_yes,
        force=force,
        **extension_overrides,
        **kwargs,
    )
def delete(
    cmd,
    resource_group_name: str,
    instance_name: Optional[str] = None,
    cluster_name: Optional[str] = None,
    confirm_yes: Optional[bool] = None,
    no_progress: Optional[bool] = None,
    force: Optional[bool] = None,
    include_dependencies: Optional[bool] = None,
):
    """Delete Azure IoT Operations resources, targeted by instance or cluster name."""
    from .providers.orchestration.deletion import delete_ops_resources

    deletion_options = dict(
        cmd=cmd,
        instance_name=instance_name,
        cluster_name=cluster_name,
        resource_group_name=resource_group_name,
        confirm_yes=confirm_yes,
        no_progress=no_progress,
        force=force,
        include_dependencies=include_dependencies,
    )
    return delete_ops_resources(**deletion_options)
def show_instance(cmd, instance_name: str, resource_group_name: str, show_tree: Optional[bool] = None) -> dict:
    """Show an Azure IoT Operations instance, optionally rendered as a resource tree."""
    instances = Instances(cmd)
    return instances.show(
        name=instance_name,
        resource_group_name=resource_group_name,
        show_tree=show_tree,
    )
def list_instances(cmd, resource_group_name: Optional[str] = None) -> Iterable[dict]:
    """List Azure IoT Operations instances, optionally scoped to a resource group."""
    instances = Instances(cmd)
    return instances.list(resource_group_name)
def update_instance(
    cmd,
    instance_name: str,
    # Fix: tags carries a tag mapping (create_instance annotates it
    # Optional[dict]); it was annotated Optional[str] here.
    tags: Optional[dict] = None,
    instance_description: Optional[str] = None,
    instance_features: Optional[List[str]] = None,
    resource_group_name: str = None,
    **kwargs,
) -> dict:
    """Update mutable properties (tags, description, features) of an instance."""
    return Instances(cmd).update(
        name=instance_name,
        resource_group_name=resource_group_name,
        tags=tags,
        description=instance_description,
        features=instance_features,
        **kwargs,
    )
def instance_identity_assign(
    cmd,
    instance_name: str,
    resource_group_name: str,
    mi_user_assigned: str,
    federated_credential_name: Optional[str] = None,
    # Fix: the default is IdentityUsageType.dataflow.value (a str), and the CLI
    # layer passes strings — the parameter was mis-annotated as the enum type.
    usage_type: str = IdentityUsageType.dataflow.value,
    use_self_hosted_issuer: Optional[bool] = None,
    **kwargs,
) -> dict:
    """Assign a user-assigned managed identity to an instance.

    Optionally federates the identity under a named credential, using the
    self-hosted OIDC issuer when requested.
    """
    return Instances(cmd).add_mi_user_assigned(
        name=instance_name,
        resource_group_name=resource_group_name,
        mi_user_assigned=mi_user_assigned,
        federated_credential_name=federated_credential_name,
        use_self_hosted_issuer=use_self_hosted_issuer,
        usage_type=usage_type,
        **kwargs,
    )
def instance_identity_show(cmd, instance_name: str, resource_group_name: str) -> dict:
    """Return the identity object of an instance, or an empty dict when none is set."""
    instance_record = Instances(cmd).show(
        name=instance_name,
        resource_group_name=resource_group_name,
    )
    return instance_record.get("identity", {})
def instance_identity_remove(
    cmd,
    instance_name: str,
    resource_group_name: str,
    mi_user_assigned: str,
    federated_credential_name: Optional[str] = None,
    **kwargs,
) -> dict:
    """Remove a user-assigned managed identity (and optional federated credential) from an instance."""
    removal_options = dict(
        name=instance_name,
        resource_group_name=resource_group_name,
        mi_user_assigned=mi_user_assigned,
        federated_credential_name=federated_credential_name,
    )
    return Instances(cmd).remove_mi_user_assigned(**removal_options, **kwargs)
def clone_instance(
    cmd,
    instance_name: str,
    resource_group_name: str,
    summary_mode: Optional[str] = None,
    to_dir: Optional[str] = None,
    template_mode: Optional[str] = None,
    to_instance_name: Optional[str] = None,
    to_cluster_id: Optional[str] = None,
    use_self_hosted_issuer: Optional[bool] = None,
    linked_base_uri: Optional[str] = None,
    no_progress: Optional[bool] = None,
    confirm_yes: Optional[bool] = None,
    force: Optional[bool] = None,
) -> dict:
    """Clone an instance to a template directory and/or a target cluster."""
    # Alias the provider entry point to avoid shadowing this command's name.
    from .providers.orchestration.clone import clone_instance as clone_ops_instance

    clone_options = dict(
        cmd=cmd,
        resource_group_name=resource_group_name,
        instance_name=instance_name,
        summary_mode=summary_mode,
        to_dir=to_dir,
        template_mode=template_mode,
        to_instance_name=to_instance_name,
        to_cluster_id=to_cluster_id,
        use_self_hosted_issuer=use_self_hosted_issuer,
        linked_base_uri=linked_base_uri,
        no_progress=no_progress,
        confirm_yes=confirm_yes,
        force=force,
    )
    return clone_ops_instance(**clone_options)