azurelinuxagent/common/protocol/extensions_goal_state_from_vm_settings.py (314 lines of code) (raw):
# Microsoft Azure Linux Agent
#
# Copyright 2020 Microsoft Corporation
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
# Requires Python 2.6+ and Openssl 1.0+
import datetime
import json
import sys
from azurelinuxagent.common import logger
from azurelinuxagent.common.AgentGlobals import AgentGlobals
from azurelinuxagent.common.event import WALAEventOperation, add_event
from azurelinuxagent.common.future import ustr, urlparse
from azurelinuxagent.common.protocol.extensions_goal_state import ExtensionsGoalState, GoalStateChannel, VmSettingsParseError
from azurelinuxagent.common.protocol.restapi import VMAgentFamily, Extension, ExtensionRequestedState, ExtensionSettings
from azurelinuxagent.common.utils.flexible_version import FlexibleVersion
class ExtensionsGoalStateFromVmSettings(ExtensionsGoalState):
_MINIMUM_TIMESTAMP = datetime.datetime(1900, 1, 1, 0, 0) # min value accepted by datetime.strftime()
def __init__(self, etag, json_text, correlation_id):
super(ExtensionsGoalStateFromVmSettings, self).__init__()
self._id = "etag_{0}".format(etag)
self._etag = etag
self._svd_sequence_number = 0
self._hostga_plugin_correlation_id = correlation_id
self._text = json_text
self._host_ga_plugin_version = FlexibleVersion('0.0.0.0')
self._schema_version = FlexibleVersion('0.0.0.0')
self._activity_id = AgentGlobals.GUID_ZERO
self._correlation_id = AgentGlobals.GUID_ZERO
self._created_on_timestamp = self._MINIMUM_TIMESTAMP
self._source = None
self._status_upload_blob = None
self._status_upload_blob_type = None
self._required_features = []
self._on_hold = False
self._agent_families = []
self._extensions = []
try:
self._parse_vm_settings(json_text)
self._do_common_validations()
except Exception as e:
message = "Error parsing vmSettings [HGAP: {0} Etag:{1}]: {2}".format(self._host_ga_plugin_version, etag, ustr(e))
raise VmSettingsParseError(message, etag, self.get_redacted_text())
@property
def id(self):
return self._id
@property
def etag(self):
return self._etag
@property
def svd_sequence_number(self):
return self._svd_sequence_number
@property
def host_ga_plugin_version(self):
return self._host_ga_plugin_version
@property
def schema_version(self):
return self._schema_version
@property
def activity_id(self):
"""
The CRP activity id
"""
return self._activity_id
@property
def correlation_id(self):
"""
The correlation id for the CRP operation
"""
return self._correlation_id
@property
def hostga_plugin_correlation_id(self):
"""
The correlation id for the call to the HostGAPlugin vmSettings API
"""
return self._hostga_plugin_correlation_id
@property
def created_on_timestamp(self):
"""
Timestamp assigned by the CRP (time at which the goal state was created)
"""
return self._created_on_timestamp
@property
def channel(self):
return GoalStateChannel.HostGAPlugin
@property
def source(self):
return self._source
@property
def status_upload_blob(self):
return self._status_upload_blob
@property
def status_upload_blob_type(self):
return self._status_upload_blob_type
def _set_status_upload_blob_type(self, value):
self._status_upload_blob_type = value
@property
def required_features(self):
return self._required_features
@property
def on_hold(self):
return self._on_hold
@property
def agent_families(self):
return self._agent_families
@property
def extensions(self):
return self._extensions
def get_redacted_text(self):
try:
text = self._text
if self.status_upload_blob is not None:
parsed = urlparse(self.status_upload_blob)
original = text
text = text.replace(parsed.query, "***REDACTED***")
if text == original:
raise Exception('Could not redact the status upload blob')
for ext_handler in self._extensions:
for extension in ext_handler.settings:
if extension.protectedSettings is not None:
original = text
text = text.replace(extension.protectedSettings, "***REDACTED***")
if text == original:
return 'Could not redact protectedSettings for {0}'.format(extension.name)
return text
except Exception as e:
return "Error redacting text: {0}".format(e)
def _parse_vm_settings(self, json_text):
vm_settings = _CaseFoldedDict.from_dict(json.loads(json_text))
self._parse_simple_attributes(vm_settings)
self._parse_status_upload_blob(vm_settings)
self._parse_required_features(vm_settings)
self._parse_agent_manifests(vm_settings)
self._parse_extensions(vm_settings)
def _parse_simple_attributes(self, vm_settings):
# Sample:
# {
# "hostGAPluginVersion": "1.0.8.115",
# "vmSettingsSchemaVersion": "0.0",
# "activityId": "a33f6f53-43d6-4625-b322-1a39651a00c9",
# "correlationId": "9a47a2a2-e740-4bfc-b11b-4f2f7cfe7d2e",
# "inSvdSeqNo": 1,
# "extensionsLastModifiedTickCount": 637726657706205217,
# "extensionGoalStatesSource": "FastTrack",
# ...
# }
# The HGAP version is included in some messages, so parse it first
host_ga_plugin_version = vm_settings.get("hostGAPluginVersion")
if host_ga_plugin_version is not None:
self._host_ga_plugin_version = FlexibleVersion(host_ga_plugin_version)
self._activity_id = self._string_to_id(vm_settings.get("activityId"))
self._correlation_id = self._string_to_id(vm_settings.get("correlationId"))
self._svd_sequence_number = self._string_to_id(vm_settings.get("inSvdSeqNo"))
self._created_on_timestamp = self._ticks_to_utc_timestamp(vm_settings.get("extensionsLastModifiedTickCount"))
schema_version = vm_settings.get("vmSettingsSchemaVersion")
if schema_version is not None:
self._schema_version = FlexibleVersion(schema_version)
on_hold = vm_settings.get("onHold")
if on_hold is not None:
self._on_hold = on_hold
self._source = vm_settings.get("extensionGoalStatesSource")
if self._source is None:
self._source = "UNKNOWN"
def _parse_status_upload_blob(self, vm_settings):
# Sample:
# {
# ...
# "statusUploadBlob": {
# "statusBlobType": "BlockBlob",
# "value": "https://dcrcl3a0xs.blob.core.windows.net/$system/edp0plkw2b.86f4ae0a-61f8-48ae-9199-40f402d56864.status?sv=2018-03-28&sr=b&sk=system-1&sig=KNWgC2%3d&se=9999-01-01T00%3a00%3a00Z&sp=w"
# },
# ...
# }
status_upload_blob = vm_settings.get("statusUploadBlob")
if status_upload_blob is None:
self._status_upload_blob = None
self._status_upload_blob_type = "BlockBlob"
else:
self._status_upload_blob = status_upload_blob.get("value")
if self._status_upload_blob is None:
raise Exception("Missing statusUploadBlob.value")
self._status_upload_blob_type = status_upload_blob.get("statusBlobType")
if self._status_upload_blob_type is None:
self._status_upload_blob_type = "BlockBlob"
def _parse_required_features(self, vm_settings):
# Sample:
# {
# ...
# "requiredFeatures": [
# {
# "name": "MultipleExtensionsPerHandler"
# }
# ],
# ...
# }
required_features = vm_settings.get("requiredFeatures")
if required_features is not None:
if not isinstance(required_features, list):
raise Exception("requiredFeatures should be an array (got {0})".format(required_features))
def get_required_features_names():
for feature in required_features:
name = feature.get("name")
if name is None:
raise Exception("A required feature is missing the 'name' property (got {0})".format(feature))
yield name
self._required_features.extend(get_required_features_names())
def _parse_agent_manifests(self, vm_settings):
# Sample:
# {
# ...
# "gaFamilies": [
# {
# "name": "Prod",
# "version": "9.9.9.9",
# "isVersionFromRSM": true,
# "isVMEnabledForRSMUpgrades": true,
# "uris": [
# "https://zrdfepirv2cdm03prdstr01a.blob.core.windows.net/7d89d439b79f4452950452399add2c90/Microsoft.OSTCLinuxAgent_Prod_uscentraleuap_manifest.xml",
# "https://ardfepirv2cdm03prdstr01a.blob.core.windows.net/7d89d439b79f4452950452399add2c90/Microsoft.OSTCLinuxAgent_Prod_uscentraleuap_manifest.xml"
# ]
# },
# {
# "name": "Test",
# "uris": [
# "https://zrdfepirv2cdm03prdstr01a.blob.core.windows.net/7d89d439b79f4452950452399add2c90/Microsoft.OSTCLinuxAgent_Test_uscentraleuap_manifest.xml",
# "https://ardfepirv2cdm03prdstr01a.blob.core.windows.net/7d89d439b79f4452950452399add2c90/Microsoft.OSTCLinuxAgent_Test_uscentraleuap_manifest.xml"
# ]
# }
# ],
# ...
# }
families = vm_settings.get("gaFamilies")
if families is None:
return
if not isinstance(families, list):
raise Exception("gaFamilies should be an array (got {0})".format(families))
for family in families:
name = family["name"]
version = family.get("version")
is_version_from_rsm = family.get("isVersionFromRSM")
is_vm_enabled_for_rsm_upgrades = family.get("isVMEnabledForRSMUpgrades")
uris = family.get("uris")
if uris is None:
uris = []
agent_family = VMAgentFamily(name)
agent_family.version = version
agent_family.is_version_from_rsm = is_version_from_rsm
agent_family.is_vm_enabled_for_rsm_upgrades = is_vm_enabled_for_rsm_upgrades
for u in uris:
agent_family.uris.append(u)
self._agent_families.append(agent_family)
def _parse_extensions(self, vm_settings):
# Sample (NOTE: The first sample is single-config, the second multi-config):
# {
# ...
# "extensionGoalStates": [
# {
# "name": "Microsoft.Azure.Monitor.AzureMonitorLinuxAgent",
# "version": "1.9.1",
# "location": "https://zrdfepirv2cbn04prdstr01a.blob.core.windows.net/a47f0806d764480a8d989d009c75007d/Microsoft.Azure.Monitor_AzureMonitorLinuxAgent_useast2euap_manifest.xml",
# "state": "enabled",
# "autoUpgrade": true,
# "runAsStartupTask": false,
# "isJson": true,
# "useExactVersion": true,
# "encodedSignature": "MIIn...",
# "settingsSeqNo": 0,
# "settings": [
# {
# "protectedSettingsCertThumbprint": "BD447EF71C3ADDF7C837E84D630F3FAC22CCD22F",
# "protectedSettings": "MIIBsAYJKoZIhvcNAQcDoIIBoTCCAZ0CAQAxggFpMIIBZQIBADBNMDkxNzA1BgoJkiaJk/IsZAEZFidXaW5kb3dzIEF6dXJlIENSUCBDZXJ0aWZpY2F0ZSBHZW5lcmF0b3ICEFpB/HKM/7evRk+DBz754wUwDQYJKoZIhvcNAQEBBQAEggEADPJwniDeIUXzxNrZCloitFdscQ59Bz1dj9DLBREAiM8jmxM0LLicTJDUv272Qm/4ZQgdqpFYBFjGab/9MX+Ih2x47FkVY1woBkckMaC/QOFv84gbboeQCmJYZC/rZJdh8rCMS+CEPq3uH1PVrvtSdZ9uxnaJ+E4exTPPviIiLIPtqWafNlzdbBt8HZjYaVw+SSe+CGzD2pAQeNttq3Rt/6NjCzrjG8ufKwvRoqnrInMs4x6nnN5/xvobKIBSv4/726usfk8Ug+9Q6Benvfpmre2+1M5PnGTfq78cO3o6mI3cPoBUjp5M0iJjAMGeMt81tyHkimZrEZm6pLa4NQMOEjArBgkqhkiG9w0BBwEwFAYIKoZIhvcNAwcECC5nVaiJaWt+gAhgeYvxUOYHXw==",
# "publicSettings": "{\"GCS_AUTO_CONFIG\":true}"
# }
# ],
# "dependsOn": [
# ...
# ]
# },
# {
# "name": "Microsoft.CPlat.Core.RunCommandHandlerLinux",
# "version": "1.2.0",
# "location": "https://umsavbvncrpzbnxmxzmr.blob.core.windows.net/f4086d41-69f9-3103-78e0-8a2c7e789d0f/f4086d41-69f9-3103-78e0-8a2c7e789d0f_manifest.xml",
# "failoverlocation": "https://umsajbjtqrb3zqjvgb2z.blob.core.windows.net/f4086d41-69f9-3103-78e0-8a2c7e789d0f/f4086d41-69f9-3103-78e0-8a2c7e789d0f_manifest.xml",
# "additionalLocations": [
# "https://umsawqtlsshtn5v2nfgh.blob.core.windows.net/f4086d41-69f9-3103-78e0-8a2c7e789d0f/f4086d41-69f9-3103-78e0-8a2c7e789d0f_manifest.xml"
# ],
# "state": "enabled",
# "autoUpgrade": true,
# "runAsStartupTask": false,
# "isJson": true,
# "useExactVersion": true,
# "settingsSeqNo": 0,
# "isMultiConfig": true,
# "settings": [
# {
# "publicSettings": "{\"source\":{\"script\":\"echo '4abb1e88-f349-41f8-8442-247d9fdfcac5'\"}}",
# "seqNo": 0,
# "extensionName": "MCExt1",
# "extensionState": "enabled"
# },
# {
# "publicSettings": "{\"source\":{\"script\":\"echo 'e865c9bc-a7b3-42c6-9a79-cfa98a1ee8b3'\"}}",
# "seqNo": 0,
# "extensionName": "MCExt2",
# "extensionState": "enabled"
# },
# {
# "publicSettings": "{\"source\":{\"script\":\"echo 'f923e416-0340-485c-9243-8b84fb9930c6'\"}}",
# "seqNo": 0,
# "extensionName": "MCExt3",
# "extensionState": "enabled"
# }
# ],
# "dependsOn": [
# ...
# ]
# }
# ...
# ]
# ...
# }
extension_goal_states = vm_settings.get("extensionGoalStates")
if extension_goal_states is not None:
if not isinstance(extension_goal_states, list):
raise Exception("extension_goal_states should be an array (got {0})".format(type(extension_goal_states))) # report only the type, since the value may contain secrets
for extension_gs in extension_goal_states:
extension = Extension()
extension.name = extension_gs['name']
extension.version = extension_gs['version']
extension.state = extension_gs['state']
# extension.encoded_signature should be None if 'encodedSignature' key does not exist for the extension
extension.encoded_signature = extension_gs.get('encodedSignature')
if extension.state not in ExtensionRequestedState.All:
raise Exception('Invalid extension state: {0} ({1})'.format(extension.state, extension.name))
is_multi_config = extension_gs.get('isMultiConfig')
if is_multi_config is not None:
extension.supports_multi_config = is_multi_config
location = extension_gs.get('location')
if location is not None:
extension.manifest_uris.append(location)
fail_over_location = extension_gs.get('failoverLocation')
if fail_over_location is not None:
extension.manifest_uris.append(fail_over_location)
additional_locations = extension_gs.get('additionalLocations')
if additional_locations is not None:
if not isinstance(additional_locations, list):
raise Exception('additionalLocations should be an array (got {0})'.format(additional_locations))
extension.manifest_uris.extend(additional_locations)
#
# Settings
#
settings_list = extension_gs.get('settings')
if settings_list is not None:
if not isinstance(settings_list, list):
raise Exception("'settings' should be an array (extension: {0})".format(extension.name))
if not extension.supports_multi_config and len(settings_list) > 1:
raise Exception("Single-config extension includes multiple settings (extension: {0})".format(extension.name))
for s in settings_list:
settings = ExtensionSettings()
public_settings = s.get('publicSettings')
# Note that publicSettings, protectedSettings and protectedSettingsCertThumbprint can be None; do not change this to, for example,
# empty, since those values are serialized to the extension's status file and extensions may depend on the current implementation
# (for example, no public settings would currently be serialized as '"publicSettings": null')
settings.publicSettings = None if public_settings is None else json.loads(public_settings)
settings.protectedSettings = s.get('protectedSettings')
thumbprint = s.get('protectedSettingsCertThumbprint')
if thumbprint is None and settings.protectedSettings is not None:
raise Exception("The certificate thumbprint for protected settings is missing (extension: {0})".format(extension.name))
settings.certificateThumbprint = thumbprint
# in multi-config each settings have their own name, sequence number and state
if extension.supports_multi_config:
settings.name = s['extensionName']
settings.sequenceNumber = s['seqNo']
settings.state = s['extensionState']
else:
settings.name = extension.name
settings.sequenceNumber = extension_gs['settingsSeqNo']
settings.state = extension.state
extension.settings.append(settings)
#
# Dependency level
#
depends_on = extension_gs.get("dependsOn")
if depends_on is not None:
self._parse_dependency_level(depends_on, extension)
self._extensions.append(extension)
@staticmethod
def _parse_dependency_level(depends_on, extension):
# Sample (NOTE: The first sample is single-config, the second multi-config):
# {
# ...
# "extensionGoalStates": [
# {
# "name": "Microsoft.Azure.Monitor.AzureMonitorLinuxAgent",
# ...
# "settings": [
# ...
# ],
# "dependsOn": [
# {
# "DependsOnExtension": [
# {
# "handler": "Microsoft.Azure.Security.Monitoring.AzureSecurityLinuxAgent"
# }
# ],
# "dependencyLevel": 1
# }
# ]
# },
# {
# "name": "Microsoft.CPlat.Core.RunCommandHandlerLinux",
# ...
# "isMultiConfig": true,
# "settings": [
# {
# ...
# "extensionName": "MCExt1",
# },
# {
# ...
# "extensionName": "MCExt2",
# },
# {
# ...
# "extensionName": "MCExt3",
# }
# ],
# "dependsOn": [
# {
# "dependsOnExtension": [
# {
# "extension": "...",
# "handler": "..."
# },
# {
# "extension": "...",
# "handler": "..."
# }
# ],
# "dependencyLevel": 2,
# "name": "MCExt1"
# },
# {
# "dependsOnExtension": [
# {
# "extension": "...",
# "handler": "..."
# }
# ],
# "dependencyLevel": 1,
# "name": "MCExt2"
# }
# ...
# ]
# ...
# }
if not isinstance(depends_on, list):
raise Exception('dependsOn should be an array ({0}) (got {1})'.format(extension.name, depends_on))
if not extension.supports_multi_config:
# single-config
length = len(depends_on)
if length > 1:
raise Exception('dependsOn should be an array with exactly one item for single-config extensions ({0}) (got {1})'.format(extension.name, depends_on))
if length == 0:
logger.warn('dependsOn is an empty array for extension {0}; setting the dependency level to 0'.format(extension.name))
dependency_level = 0
else:
dependency_level = depends_on[0]['dependencyLevel']
depends_on_extension = depends_on[0].get('dependsOnExtension')
if depends_on_extension is None:
# TODO: Consider removing this check and its telemetry after a few releases if we do not receive any telemetry indicating
# that dependsOnExtension is actually missing from the vmSettings
message = 'Missing dependsOnExtension on extension {0}'.format(extension.name)
logger.warn(message)
add_event(WALAEventOperation.ProvisionAfterExtensions, message=message, is_success=False, log_event=False)
else:
message = '{0} depends on {1}'.format(extension.name, depends_on_extension)
logger.info(message)
add_event(WALAEventOperation.ProvisionAfterExtensions, message=message, is_success=True, log_event=False)
if len(extension.settings) == 0:
message = 'Extension {0} does not have any settings. Will ignore dependency (dependency level: {1})'.format(extension.name, dependency_level)
logger.warn(message)
add_event(WALAEventOperation.ProvisionAfterExtensions, message=message, is_success=False, log_event=False)
else:
extension.settings[0].dependencyLevel = dependency_level
else:
# multi-config
settings_by_name = {}
for settings in extension.settings:
settings_by_name[settings.name] = settings
for dependency in depends_on:
settings = settings_by_name.get(dependency["name"])
if settings is None:
raise Exception("Dependency '{0}' does not correspond to any of the settings in the extension (settings: {1})".format(dependency["name"], settings_by_name.keys()))
settings.dependencyLevel = dependency["dependencyLevel"]
#
# TODO: The current implementation of the vmSettings API uses inconsistent cases on the names of the json items it returns.
# To work around that, we use _CaseFoldedDict to query those json items in a case-insensitive matter, Do not use
# _CaseFoldedDict for other purposes. Remove it once the vmSettings API is updated.
#
class _CaseFoldedDict(dict):
@staticmethod
def from_dict(dictionary):
case_folded = _CaseFoldedDict()
for key, value in dictionary.items():
case_folded[key] = _CaseFoldedDict._to_case_folded_dict_item(value)
return case_folded
def get(self, key):
return super(_CaseFoldedDict, self).get(_casefold(key))
def has_key(self, key):
return super(_CaseFoldedDict, self).get(_casefold(key))
def __getitem__(self, key):
return super(_CaseFoldedDict, self).__getitem__(_casefold(key))
def __setitem__(self, key, value):
return super(_CaseFoldedDict, self).__setitem__(_casefold(key), value)
def __contains__(self, key):
return super(_CaseFoldedDict, self).__contains__(_casefold(key))
@staticmethod
def _to_case_folded_dict_item(item):
if isinstance(item, dict):
case_folded_dict = _CaseFoldedDict()
for key, value in item.items():
case_folded_dict[_casefold(key)] = _CaseFoldedDict._to_case_folded_dict_item(value)
return case_folded_dict
if isinstance(item, list):
return [_CaseFoldedDict._to_case_folded_dict_item(list_item) for list_item in item]
return item
def copy(self):
raise NotImplementedError()
@staticmethod
def fromkeys(*args, **kwargs):
raise NotImplementedError()
def pop(self, key, default=None):
raise NotImplementedError()
def setdefault(self, key, default=None):
raise NotImplementedError()
def update(self, E=None, **F): # known special case of dict.update
raise NotImplementedError()
def __delitem__(self, *args, **kwargs):
raise NotImplementedError()
# casefold() does not exist on Python 2 so we use lower() there
def _casefold(string):
if sys.version_info[0] == 2:
return type(string).lower(string) # the type of "string" can be unicode or str
# Class 'str' has no 'casefold' member (no-member) -- Disabled: This warning shows up on Python 2.7 pylint runs
# but this code is actually not executed on Python 2.
return str.casefold(string) # pylint: disable=no-member