in azext_edge/edge/providers/orchestration/clone.py [0:0]
def _analyze_instance(self):
    """Add the custom location, the instance and the role assignment deployment.

    A copy of ``self.custom_location`` has its host resource id, namespace,
    name and display name replaced with entries from
    ``TEMPLATE_EXPRESSION_MAP``, and its ``clusterExtensionIds`` rebuilt from
    the extensions found in ``self.rcontainer_map``. That copy is added under
    ``StateResourceKey.CL``; a copy of ``self.instance_record`` is then added
    under ``StateResourceKey.INSTANCE`` with a dependency on the custom
    location. Finally a role assignment deployment is added, targeting the
    resource group and subscription parsed from the instance's schema
    registry resource id.
    """
    api_version = self.version_guru.get_instance_api()

    # Edit a copy so the record held on self keeps its original values.
    cl_template = deepcopy(self.custom_location)
    cl_props = cl_template["properties"]
    cl_props["hostResourceId"] = TEMPLATE_EXPRESSION_MAP["clusterId"]
    cl_props["namespace"] = TEMPLATE_EXPRESSION_MAP["clusterNamespace"]
    cl_name_expression = TEMPLATE_EXPRESSION_MAP["customLocationName"]
    cl_template["name"] = cl_name_expression

    cl_monikers = [
        EXTENSION_TYPE_TO_MONIKER_MAP[extension_type]
        for extension_type in (EXTENSION_TYPE_PLATFORM, EXTENSION_TYPE_SSC, EXTENSION_TYPE_OPS)
    ]
    ops_moniker = cl_monikers[-1]

    extension_ids = []
    for moniker in cl_monikers:
        extension = self.rcontainer_map.get(moniker)
        if extension:
            id_template = TEMPLATE_EXPRESSION_MAP["extensionId"]
            # The ops extension is referenced through a template parameter;
            # the other extensions are referenced by their recorded name.
            name_fragment = (
                "', parameters('opsExtensionName')"
                if moniker == ops_moniker
                else f"{extension.resource_state['name']}'"
            )
            extension_ids.append(id_template.format(name_fragment))

    cl_props["clusterExtensionIds"] = extension_ids
    cl_props["displayName"] = cl_name_expression

    # Custom location needs to be treated as a root resource.
    self._add_resource(
        key=StateResourceKey.CL,
        api_version=CUSTOM_LOCATIONS_API_VERSION,
        data=cl_template,
        config={"apply_nested_name": False},
        depends_on=cl_monikers,
    )
    self._add_resource(
        key=StateResourceKey.INSTANCE,
        api_version=api_version,
        data=deepcopy(self.instance_record),
        depends_on=StateResourceKey.CL,
    )

    schema_registry_id = self.instance_record["properties"]["schemaRegistryRef"]["resourceId"]
    parameter_specs = (
        {"name": TemplateParams.CLUSTER_NAME.value},
        {"name": TemplateParams.INSTANCE_NAME.value},
        {
            "name": TemplateParams.PRINCIPAL_ID.value,
            "value": "[reference('iotOperations', '2023-05-01', 'Full').identity.principalId]",
        },
        {
            "name": TemplateParams.SCHEMA_REGISTRY_ID.value,
            "value": schema_registry_id,
        },
    )
    nested_params = {}
    for spec in parameter_specs:
        nested_params.update(build_parameter(**spec))

    sr_id_parts = parse_resource_id(schema_registry_id)
    # Providing resource_group means a separate deployment to that resource group.
    self._add_deployment(
        key=StateResourceKey.ROLE_ASSIGNMENT,
        api_version="2022-04-01",
        data_iter=[get_role_assignment()],
        depends_on=ops_moniker,
        parameters=nested_params,
        resource_group=sr_id_parts["resource_group"],
        subscription=sr_id_parts["subscription"],
    )