in src/azure-cli/azure/cli/command_modules/vm/custom.py [0:0]
def create_image_version(cmd, resource_group_name, gallery_name, gallery_image_name, gallery_image_version,
                         location=None, target_regions=None, storage_account_type=None,
                         end_of_life_date=None, exclude_from_latest=None, replica_count=None, tags=None,
                         os_snapshot=None, data_snapshots=None, managed_image=None, data_snapshot_luns=None,
                         target_region_encryption=None, os_vhd_uri=None, os_vhd_storage_account=None,
                         data_vhds_uris=None, data_vhds_luns=None, data_vhds_storage_accounts=None,
                         replication_mode=None, target_region_cvm_encryption=None, virtual_machine=None,
                         image_version=None, target_zone_encryption=None, target_edge_zones=None,
                         allow_replicated_location_deletion=None, block_deletion_before_end_of_life=None):
    """Create a gallery image version through the AAZ ``sig image-version`` ``Create`` command.

    Assembles the publishing profile, the storage profile (for API versions >= 2019-07-01) and the
    optional safety profile from the given arguments, then invokes ``Create`` with them.

    Managed image, snapshot and storage account arguments may be given either as full resource IDs or
    as bare names; bare names are expanded to IDs in the current subscription and ``resource_group_name``.
    The lists passed by the caller are not modified.

    :param cmd: CLI command context; provides ``cli_ctx`` and ``supported_api_version``.
    :param resource_group_name: resource group of the gallery; also used to expand bare resource names.
    :param gallery_name: passed to ``Create`` as ``gallery_name``.
    :param gallery_image_name: passed to ``Create`` as ``gallery_image_definition``.
    :param gallery_image_version: passed to ``Create`` as ``gallery_image_version_name``.
    :param location: location of the image version; defaults to the resource group's location.
    :param target_regions: publishing profile target regions; defaults to ``[{"name": location}]``.
    :param storage_account_type: publishing profile ``storage_account_type``.
    :param end_of_life_date: passed through ``fix_gallery_image_date_info`` before being published.
    :param exclude_from_latest: publishing profile ``exclude_from_latest``.
    :param replica_count: publishing profile ``replica_count``.
    :param tags: resource tags; defaults to ``{}``.
    :param os_snapshot: name or ID of the snapshot used as the OS disk image source.
    :param data_snapshots: names or IDs of snapshots used as data disk image sources.
    :param managed_image: name or ID of the managed image used as the source.
    :param data_snapshot_luns: LUNs for ``data_snapshots``; defaults to ``0..len(data_snapshots)-1``.
    :param os_vhd_uri: URI of the OS disk VHD; must be paired with ``os_vhd_storage_account``.
    :param os_vhd_storage_account: name or ID of the storage account holding ``os_vhd_uri``.
    :param data_vhds_uris: URIs of data disk VHDs; must be paired with ``data_vhds_storage_accounts``.
    :param data_vhds_luns: LUNs for ``data_vhds_uris``; defaults to ``0..len(data_vhds_uris)-1``.
    :param data_vhds_storage_accounts: names or IDs of the storage accounts holding ``data_vhds_uris``.
    :param replication_mode: publishing profile ``replication_mode`` (only set when not ``None``).
    :param virtual_machine: used as ``virtual_machine_id`` source when the API version is >= 2023-07-03.
    :param target_edge_zones: published as ``target_extended_locations`` when truthy.
    :param allow_replicated_location_deletion: safety profile ``allow_deletion_of_replicated_locations``.
    :param block_deletion_before_end_of_life: safety profile ``block_deletion_before_end_of_life``.
    :param target_region_encryption: not read in this function.
    :param target_region_cvm_encryption: not read in this function.
    :param image_version: not read in this function.
    :param target_zone_encryption: not read in this function.
        NOTE(review): the four unread parameters are presumably consumed by argument validators
        before this function runs - confirm against the command's validator.
    :raises RequiredArgumentMissingError: when no usable image source argument is provided.
    :raises ArgumentUsageError: when paired arguments are not supplied together or their lengths differ.
    :raises ValueError: when a LUN value cannot be converted to ``int``.
    :return: the result of invoking the AAZ ``Create`` command.
    """
    from azure.mgmt.core.tools import resource_id, is_valid_resource_id
    from azure.cli.core.commands.client_factory import get_subscription_id

    def _as_resource_id(name_or_id, namespace, resource_type):
        # Expand a bare name into a full resource ID; the subscription is only looked up when needed.
        if is_valid_resource_id(name_or_id):
            return name_or_id
        return resource_id(subscription=get_subscription_id(cmd.cli_ctx), resource_group=resource_group_name,
                           namespace=namespace, type=resource_type, name=name_or_id)

    def _api_at_least(version):
        return cmd.supported_api_version(min_api=version, operation_group='gallery_image_versions')

    location = location or _get_resource_group_location(cmd.cli_ctx, resource_group_name)
    end_of_life_date = fix_gallery_image_date_info(end_of_life_date)

    if managed_image:
        managed_image = _as_resource_id(managed_image, 'Microsoft.Compute', 'images')
    if os_snapshot:
        os_snapshot = _as_resource_id(os_snapshot, 'Microsoft.Compute', 'snapshots')
    if data_snapshots:
        # Build a new list so the caller's list is left untouched.
        data_snapshots = [_as_resource_id(s, 'Microsoft.Compute', 'snapshots') for s in data_snapshots]

    profile = {
        "exclude_from_latest": exclude_from_latest,
        "end_of_life_date": end_of_life_date,
        "target_regions": target_regions or [{"name": location}],
        "replica_count": replica_count,
        "storage_account_type": storage_account_type
    }
    if target_edge_zones:
        profile["target_extended_locations"] = target_edge_zones
    if replication_mode is not None:
        profile["replication_mode"] = replication_mode
    if not _api_at_least('2022-03-03'):
        # Before 2022-03-03 the managed image source is carried inside the publishing profile.
        profile["source"] = {"managed_image": {"id": managed_image}}

    if _api_at_least('2019-07-01'):
        if managed_image is None and os_snapshot is None and os_vhd_uri is None:
            raise RequiredArgumentMissingError('usage error: Please provide --managed-image or --os-snapshot or --vhd')

        source = os_disk_image = data_disk_images = None
        # A virtual machine source takes precedence over a managed image when the API supports it.
        if virtual_machine is not None and _api_at_least('2023-07-03'):
            source = {"virtual_machine_id": virtual_machine}
        elif managed_image is not None:
            source = {"id": managed_image}

        if os_snapshot is not None:
            os_disk_image = {"source": {"id": os_snapshot}}

        if data_snapshot_luns and not data_snapshots:
            raise ArgumentUsageError('usage error: --data-snapshot-luns must be used together with --data-snapshots')
        if data_snapshots:
            if data_snapshot_luns and len(data_snapshots) != len(data_snapshot_luns):
                raise ArgumentUsageError('usage error: Length of --data-snapshots and '
                                         '--data-snapshot-luns should be equal.')
            if not data_snapshot_luns:
                data_snapshot_luns = list(range(len(data_snapshots)))
            data_disk_images = [
                {"source": {"id": snapshot_id}, "lun": int(lun)}
                for snapshot_id, lun in zip(data_snapshots, data_snapshot_luns)
            ]

        # from vhd, only support os image now
        if _api_at_least('2020-09-30'):
            # OS disk
            if (os_vhd_uri and os_vhd_storage_account is None) or (os_vhd_uri is None and os_vhd_storage_account):
                raise ArgumentUsageError('--os-vhd-uri and --os-vhd-storage-account should be used together.')
            if os_vhd_uri and os_vhd_storage_account:
                os_vhd_storage_account = _as_resource_id(
                    os_vhd_storage_account, 'Microsoft.Storage', 'storageAccounts')
                # A VHD OS disk replaces any OS disk image derived from --os-snapshot above.
                os_disk_image = {
                    "source": {
                        "storage_account_id": os_vhd_storage_account,
                        "uri": os_vhd_uri
                    }
                }

            # Data disks
            if (data_vhds_uris and data_vhds_storage_accounts is None) or \
                    (data_vhds_uris is None and data_vhds_storage_accounts):
                raise ArgumentUsageError('--data-vhds-uris and --data-vhds-storage-accounts should be used together.')
            if data_vhds_luns and data_vhds_uris is None:
                raise ArgumentUsageError('--data-vhds-luns must be used together with --data-vhds-uris')
            if data_vhds_uris:
                # Generate LUNs: 0, 1, 2, ...
                if data_vhds_luns is None:
                    data_vhds_luns = list(range(len(data_vhds_uris)))
                # Check length
                uri_count = len(data_vhds_uris)
                if uri_count != len(data_vhds_luns) or uri_count != len(data_vhds_storage_accounts):
                    raise ArgumentUsageError(
                        'Length of --data-vhds-uris, --data-vhds-luns, --data-vhds-storage-accounts must be same.')
                # Generate full storage account IDs in a new list so the caller's list is left untouched.
                data_vhds_storage_accounts = [
                    _as_resource_id(account, 'Microsoft.Storage', 'storageAccounts')
                    for account in data_vhds_storage_accounts
                ]
                # VHD data disks are appended after any snapshot-based data disks.
                if data_disk_images is None:
                    data_disk_images = []
                for uri, lun, account in zip(data_vhds_uris, data_vhds_luns, data_vhds_storage_accounts):
                    data_disk_images.append({
                        "source": {"storage_account_id": account, "uri": uri},
                        # Coerce to int, matching the snapshot path, so string LUNs are handled the same way.
                        "lun": int(lun)
                    })

        storage_profile = {"source": source, "os_disk_image": os_disk_image, "data_disk_images": data_disk_images}
        args = {
            "publishing_profile": profile,
            "location": location,
            "tags": tags or {},
            "storage_profile": storage_profile
        }
        if allow_replicated_location_deletion is not None:
            args["safety_profile"] = {
                "allow_deletion_of_replicated_locations": allow_replicated_location_deletion
            }
        if block_deletion_before_end_of_life is not None:
            if "safety_profile" not in args:
                args["safety_profile"] = {}
            args["safety_profile"]["block_deletion_before_end_of_life"] = block_deletion_before_end_of_life
    else:
        # API versions before 2019-07-01 have no storage profile; a managed image is the only source.
        if managed_image is None:
            raise RequiredArgumentMissingError('usage error: Please provide --managed-image')
        args = {"publishing_profile": profile, "location": location, "tags": tags or {}}

    args["resource_group"] = resource_group_name
    args["gallery_name"] = gallery_name
    args["gallery_image_definition"] = gallery_image_name
    args["gallery_image_version_name"] = gallery_image_version

    from .aaz.latest.sig.image_version import Create
    return Create(cli_ctx=cmd.cli_ctx)(command_args=args)