partnercenter/azext_partnercenter/clients/plan_technicalconfiguration_client.py (210 lines of code) (raw):
# --------------------------------------------------------------------------------------------
# Copyright (c) Microsoft Corporation. All rights reserved.
# Licensed under the MIT License. See License.txt in the project root for license information.
# --------------------------------------------------------------------------------------------
# pylint: disable=line-too-long
# pylint: disable=protected-access
# pylint: disable=no-else-return
# pylint: disable=too-many-positional-arguments
import os
from knack.util import CLIError
from azext_partnercenter.clients import OfferClient, PlanClient
from azext_partnercenter.clients._base_client import BaseClient
from azext_partnercenter.models.package_configuration import PackageConfiguration
from azext_partnercenter.models.package_authorization import PackageAuthorization
from azext_partnercenter.models.package_reference import PackageReference
from azext_partnercenter.vendored_sdks.v1.partnercenter.model.microsoft_ingestion_api_models_packages_azure_package import (
MicrosoftIngestionApiModelsPackagesAzurePackage)
from azext_partnercenter.vendored_sdks.v1.partnercenter.model.products_product_id_packageconfigurations_package_configuration_id_get200_response import (
ProductsProductIDPackageconfigurationsPackageConfigurationIDGet200Response
)
from azext_partnercenter.vendored_sdks.production_ingestion.models import (ContainerCnabPlanTechnicalConfigurationProperties)
from ._util import get_combined_paged_results, upload_media
class PlanTechnicalConfigurationClient(BaseClient):
PACKAGE_MODULE = "Package"
def __init__(self, cli_ctx, *_):
super().__init__(cli_ctx, *_)
self._offer_client = OfferClient(cli_ctx, *_)
self._plan_client = PlanClient(cli_ctx, *_)
def get(self, offer_external_id, plan_external_id):
variant_package_branch = self._get_variant_package_branch(offer_external_id, plan_external_id)
if variant_package_branch is None:
raise CLIError(f'Technical Configuration not found for Plan ID "{plan_external_id}"')
# if the resource type is AzureContainer, use alternative API
offer_durable_id = variant_package_branch.product.id
plan_durable_id = variant_package_branch.variant_id
sell_through_microsoft = self._get_sell_through_microsoft(offer_durable_id)
technical_configuration = None
if variant_package_branch.product.resource_type == 'AzureContainer':
technical_configuration = self._graph_api_client.get_container_plan_technical_configuration(offer_durable_id, plan_durable_id, sell_through_microsoft)
elif variant_package_branch.product.resource_type == 'AzureApplication':
technical_configuration = self._get_azure_application_plan_technical_configuration(offer_durable_id, variant_package_branch.current_draft_instance_id)
else:
technical_configuration = self._get_plan_technical_configuration(variant_package_branch.product.id, variant_package_branch.variant_id)
technical_configuration['planId'] = plan_external_id
return technical_configuration
def delete_cnab_reference(self, offer_external_id, plan_external_id, repository_name, tag):
technical_configuration = self.get(offer_external_id, plan_external_id)
cnab_reference_index = -1
for index, ref in enumerate(technical_configuration.cnab_references):
# match repo and tab name for now. if more is required, we can add them here
if ref['repositoryName'] == repository_name and ref['tag'] == tag:
cnab_reference_index = index
break
if cnab_reference_index != -1:
del technical_configuration.cnab_references[cnab_reference_index]
result = self._update_technical_configuration_properties(offer_external_id, plan_external_id, technical_configuration)
return result
def add_bundle(self, offer_external_id, plan_external_id, properties=ContainerCnabPlanTechnicalConfigurationProperties | None):
result = self._update_technical_configuration_properties(offer_external_id, plan_external_id, properties)
return result
def add_managed_app_bundle(self, offer_external_id, plan_external_id, package_path, public_azure_tenant_id, public_azure_authorization_principal, public_azure_authorization_role):
variant_package_branch = self._get_variant_package_branch(offer_external_id, plan_external_id)
offer_durable_id = variant_package_branch.product.id
current_draft_instance_id = variant_package_branch.current_draft_instance_id
file_name = os.path.basename(package_path)
input_package = MicrosoftIngestionApiModelsPackagesAzurePackage(
resource_type='AzureApplicationPackage',
file_name=file_name
)
output_package = self._sdk.package_client.products_product_id_packages_post(
offer_durable_id,
self._get_access_token(),
microsoft_ingestion_api_models_packages_azure_package=input_package)
upload_result = upload_media(package_path, output_package.file_sas_uri)
if upload_result is None:
raise CLIError(f'There was an error uploading "{package_path}"')
output_package.state = "Uploaded"
updated_package = self._sdk.package_client.products_product_id_packages_package_id_put(
offer_durable_id,
output_package.id,
self._get_access_token(),
microsoft_ingestion_api_models_packages_azure_package=output_package
)
package_id = updated_package.id
# wait for the package to be processed
# get package configuration by draft instance id
package_configuration = self._sdk.package_configuration_client.products_product_id_package_configurations_get_by_instance_id_instance_i_dinstance_id_get(
offer_durable_id,
current_draft_instance_id,
self._get_access_token()
)
package_config_data = package_configuration['value'][0]
# Create a dictionary with the relevant data
package_config_dict = {
'resource_type': package_config_data['resourceType'],
'id': package_config_data['id'],
'odata_etag': package_config_data['@odata.etag'],
'allow_jit_access': package_config_data['allowJitAccess'],
'can_enable_customer_actions': True,
'allowed_customer_actions': [
"Microsoft.Resources/*"
],
'azure_government_authorizations': package_config_data['azureGovernmentAuthorizations'],
'package_references': [
{
"type": "AzureApplicationPackage",
"value": package_id
}
],
'publisher_management_mode': package_config_data['publisherManagementMode'],
'customer_access_enable_state': package_config_data['customerAccessEnableState'],
'DeploymentMode': 'Incremental',
'public_azure_tenant_id': public_azure_tenant_id,
'version': "1.0.0",
'public_azure_authorizations': [
{
"principalID": public_azure_authorization_principal,
"roleDefinitionID": public_azure_authorization_role
}
]
}
updated_package_config = ProductsProductIDPackageconfigurationsPackageConfigurationIDGet200Response(**package_config_dict)
package_config = package_configuration['value'][0]
package_configuration_id = package_config['id']
package_config_update = self._sdk.package_configuration_client.products_product_id_packageconfigurations_package_configuration_id_put(
offer_durable_id,
package_configuration_id,
self._get_access_token(),
products_product_id_packageconfigurations_package_configuration_id_get200_response=updated_package_config
)
mapped_config = self._map_package_configuration(package_config_update)
return mapped_config
def _map_package_configuration(self, pkg_config):
package_references = list(map(self._map_package_reference, pkg_config.package_references))
public_azure_authorizations = list(map(self._map_package_authorization, pkg_config.public_azure_authorizations))
azure_government_authorizations = list(map(self._map_package_authorization, pkg_config.azure_government_authorizations))
allowed_customer_actions = pkg_config.allowed_customer_actions
mapped_package_configuration = PackageConfiguration(
id=pkg_config.id,
allowed_customer_actions=allowed_customer_actions,
azure_government_authorizations=azure_government_authorizations,
can_enable_customer_actions=pkg_config.can_enable_customer_actions,
customerAccessEnableState=pkg_config.customerAccessEnableState,
deploymentMode=pkg_config.deploymentMode,
odata_etag=pkg_config.odata_etag,
package_references=package_references,
public_azure_authorizations=public_azure_authorizations,
public_azure_tenant_id=pkg_config.public_azure_tenant_id,
publisherManagementMode=pkg_config.publisherManagementMode,
resource_type=pkg_config.resource_type,
version=pkg_config.version,
_resource=None)
return mapped_package_configuration
def _map_package_reference(self, package_ref):
return PackageReference(type=package_ref.type, value=package_ref.value)
def _map_package_authorization(self, pkg_auth):
return PackageAuthorization(principal_id=pkg_auth.principal_id, role_definition_id=pkg_auth.role_definition_id)
def _update_technical_configuration_properties(self, offer_external_id, plan_external_id, properties=ContainerCnabPlanTechnicalConfigurationProperties | None):
variant_package_branch = self._get_variant_package_branch(offer_external_id, plan_external_id)
offer_durable_id = variant_package_branch.product.id
plan_durable_id = variant_package_branch.variant_id
# here is where we receive the fragmented properties arg, update
status = self._graph_api_client.update_container_plan_technical_configuration_for_bundle(
offer_durable_id, plan_durable_id, properties)
if len(status.errors) > 0:
return {
'errors': status.errors,
'result': status.job_result
}
else:
return {
'message': status.job_status.value,
'result': status.job_result
}
def _get_sell_through_microsoft(self, offer_durable_id):
sell_through_microsoft_enum_value = 'ListAndSell'
setup = self._sdk.product_client.products_product_id_setup_get(offer_durable_id, self._get_access_token())
return setup.selling_option == sell_through_microsoft_enum_value
def _get_variant_package_branch(self, offer_external_id, plan_external_id):
product = self._offer_client._get_sdk_product_by_external_offer_id(offer_external_id)
if product is None:
# TODO: determine if throwing CLIError is better here
return None
variant = self._get_active_azure_sku_variant_by_external_id(product.id, plan_external_id)
if variant is None:
raise CLIError(f'Plan not found for "{plan_external_id}"')
branches = self._sdk.branches_client.products_product_id_branches_get_by_module_modulemodule_get(
product.id, self.PACKAGE_MODULE, self._get_access_token())
if len(branches.value) == 0:
return None
variant_package_branch = next((b for b in branches.value if hasattr(b, 'variant_id') and b.variant_id == variant['id']), None)
if variant_package_branch is not None:
setattr(variant_package_branch, 'product', product)
return variant_package_branch
def _get_active_azure_sku_variant_by_external_id(self, offer_durable_id, plan_external_id):
resource_type = 'AzureSkuVariant'
variants = get_combined_paged_results(lambda: self._sdk.variant_client.products_product_id_variants_get(offer_durable_id, self._get_access_token()))
for v in variants:
if v['resourceType'] == resource_type and v['state'] == 'Active':
if v['externalID'] == plan_external_id:
return v
return None
def _get_azure_application_plan_technical_configuration(self, offer_durable_id, package_configuration_id):
package_configuration = self._sdk.package_configuration_client.products_product_id_packageconfigurations_package_configuration_id_get(
offer_durable_id,
package_configuration_id,
self._get_access_token())
return package_configuration
def _get_plan_technical_configuration(self, offer_durable_id, plan_durable_id):
"""Since we don't know what type of technical plan this will be for now unless we map the types to the schema, this gets any technical configuration type"""
resources = self._get_resource_tree(offer_durable_id)
technical_configuration = None
for r in resources:
if 'plan-technical-configuration' in r['id'] and plan_durable_id in r['plan']:
technical_configuration = r
del technical_configuration['$schema']
del technical_configuration['id']
del technical_configuration['product']
del technical_configuration['plan']
return technical_configuration
def _get_resource_tree(self, offer_durable_id):
import json
response = self._graph_api_client.get_resource_tree(offer_durable_id)
# show the json
formatted_json = json.dumps(response, indent=4)
print(formatted_json)
return response['resources']