azurelinuxagent/common/protocol/extensions_goal_state_from_extensions_config.py (322 lines of code) (raw):
# Microsoft Azure Linux Agent
#
# Copyright 2020 Microsoft Corporation
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
# Requires Python 2.6+ and Openssl 1.0+
import json
from collections import defaultdict
from azurelinuxagent.common import logger
from azurelinuxagent.common.event import add_event, WALAEventOperation
from azurelinuxagent.common.exception import ExtensionsConfigError
from azurelinuxagent.common.future import ustr, urlparse
from azurelinuxagent.common.protocol.extensions_goal_state import ExtensionsGoalState, GoalStateChannel, GoalStateSource
from azurelinuxagent.common.protocol.restapi import ExtensionSettings, Extension, VMAgentFamily, ExtensionState, InVMGoalStateMetaData
from azurelinuxagent.common.utils.textutil import parse_doc, parse_json, findall, find, findtext, getattrib, gettext, \
format_exception, is_str_none_or_whitespace, is_str_empty, hasattrib, gettextxml
class ExtensionsGoalStateFromExtensionsConfig(ExtensionsGoalState):
def __init__(self, incarnation, xml_text, wire_client):
super(ExtensionsGoalStateFromExtensionsConfig, self).__init__()
self._id = "incarnation_{0}".format(incarnation)
self._is_outdated = False
self._incarnation = incarnation
self._text = xml_text
self._status_upload_blob = None
self._status_upload_blob_type = None
self._status_upload_blob_xml_node = None
self._artifacts_profile_blob_xml_node = None
self._required_features = []
self._on_hold = False
self._activity_id = None
self._correlation_id = None
self._created_on_timestamp = None
self._agent_families = []
self._extensions = []
try:
self._parse_extensions_config(xml_text, wire_client)
self._do_common_validations()
except Exception as e:
raise ExtensionsConfigError("Error parsing ExtensionsConfig (incarnation: {0}): {1}\n{2}".format(incarnation, format_exception(e), self.get_redacted_text()))
def _parse_extensions_config(self, xml_text, wire_client):
xml_doc = parse_doc(xml_text)
ga_families_list = find(xml_doc, "GAFamilies")
ga_families = findall(ga_families_list, "GAFamily")
for ga_family in ga_families:
name = findtext(ga_family, "Name")
version = findtext(ga_family, "Version")
is_version_from_rsm = findtext(ga_family, "IsVersionFromRSM")
is_vm_enabled_for_rsm_upgrades = findtext(ga_family, "IsVMEnabledForRSMUpgrades")
uris_list = find(ga_family, "Uris")
uris = findall(uris_list, "Uri")
family = VMAgentFamily(name)
family.version = version
if is_version_from_rsm is not None: # checking None because converting string to lowercase
family.is_version_from_rsm = is_version_from_rsm.lower() == "true"
if is_vm_enabled_for_rsm_upgrades is not None: # checking None because converting string to lowercase
family.is_vm_enabled_for_rsm_upgrades = is_vm_enabled_for_rsm_upgrades.lower() == "true"
for uri in uris:
family.uris.append(gettext(uri))
self._agent_families.append(family)
self.__parse_plugins_and_settings_and_populate_ext_handlers(xml_doc)
required_features_list = find(xml_doc, "RequiredFeatures")
if required_features_list is not None:
self._parse_required_features(required_features_list)
self._status_upload_blob_xml_node = find(xml_doc, "StatusUploadBlob")
self._status_upload_blob = gettext(self._status_upload_blob_xml_node)
self._status_upload_blob_type = getattrib(self._status_upload_blob_xml_node, "statusBlobType")
logger.verbose("Extension config shows status blob type as [{0}]", self._status_upload_blob_type)
self._artifacts_profile_blob_xml_node = find(xml_doc, "InVMArtifactsProfileBlob")
self._on_hold = ExtensionsGoalStateFromExtensionsConfig._fetch_extensions_on_hold(self._artifacts_profile_blob_xml_node, wire_client)
in_vm_gs_metadata = InVMGoalStateMetaData(find(xml_doc, "InVMGoalStateMetaData"))
self._activity_id = self._string_to_id(in_vm_gs_metadata.activity_id)
self._correlation_id = self._string_to_id(in_vm_gs_metadata.correlation_id)
self._created_on_timestamp = self._ticks_to_utc_timestamp(in_vm_gs_metadata.created_on_ticks)
@staticmethod
def _fetch_extensions_on_hold(artifacts_profile_blob_xml_node, wire_client):
def log_info(message):
logger.info(message)
add_event(op=WALAEventOperation.ArtifactsProfileBlob, message=message, is_success=True, log_event=False)
def log_warning(message):
logger.warn(message)
add_event(op=WALAEventOperation.ArtifactsProfileBlob, message=message, is_success=False, log_event=False)
artifacts_profile_blob = gettext(artifacts_profile_blob_xml_node)
if is_str_none_or_whitespace(artifacts_profile_blob):
log_info("ExtensionsConfig does not include a InVMArtifactsProfileBlob; will assume the VM is not on hold")
return False
try:
profile = wire_client.fetch_artifacts_profile_blob(artifacts_profile_blob)
except Exception as error:
log_warning("Can't download the artifacts profile blob; will assume the VM is not on hold. {0}".format(ustr(error)))
return False
if is_str_empty(profile):
log_info("The artifacts profile blob is empty; will assume the VM is not on hold.")
return False
try:
artifacts_profile = _InVMArtifactsProfile(profile)
except Exception as exception:
log_warning("Can't parse the artifacts profile blob; will assume the VM is not on hold. Error: {0}".format(ustr(exception)))
return False
return artifacts_profile.get_on_hold()
@property
def id(self):
return self._id
@property
def incarnation(self):
return self._incarnation
@property
def svd_sequence_number(self):
return self._incarnation
@property
def activity_id(self):
return self._activity_id
@property
def correlation_id(self):
return self._correlation_id
@property
def created_on_timestamp(self):
return self._created_on_timestamp
@property
def channel(self):
return GoalStateChannel.WireServer
@property
def source(self):
return GoalStateSource.Fabric
@property
def status_upload_blob(self):
return self._status_upload_blob
@property
def status_upload_blob_type(self):
return self._status_upload_blob_type
def _set_status_upload_blob_type(self, value):
self._status_upload_blob_type = value
@property
def required_features(self):
return self._required_features
@property
def on_hold(self):
return self._on_hold
@property
def agent_families(self):
return self._agent_families
@property
def extensions(self):
return self._extensions
def get_redacted_text(self):
def redact_url(unredacted, xml_node, name):
text_xml = gettextxml(xml_node) # Note that we need to redact the raw XML text (which may contain escape sequences)
if text_xml is None:
return unredacted
parsed = urlparse(text_xml)
redacted = unredacted.replace(parsed.query, "***REDACTED***")
if redacted == unredacted:
raise Exception('Could not redact {0}'.format(name))
return redacted
try:
text = self._text
text = redact_url(text, self._status_upload_blob_xml_node, "StatusUploadBlob")
text = redact_url(text, self._artifacts_profile_blob_xml_node, "InVMArtifactsProfileBlob")
for ext_handler in self._extensions:
for extension in ext_handler.settings:
if extension.protectedSettings is not None:
original = text
text = text.replace(extension.protectedSettings, "***REDACTED***")
if text == original:
return 'Could not redact protectedSettings for {0}'.format(extension.name)
return text
except Exception as e:
return "Error redacting text: {0}".format(e)
def _parse_required_features(self, required_features_list):
for required_feature in findall(required_features_list, "RequiredFeature"):
feature_name = findtext(required_feature, "Name")
# per the documentation, RequiredFeatures also have a "Value" attribute but currently it is not being populated
self._required_features.append(feature_name)
def __parse_plugins_and_settings_and_populate_ext_handlers(self, xml_doc):
"""
Sample ExtensionConfig Plugin and PluginSettings:
<Plugins>
<Plugin name="Microsoft.CPlat.Core.NullSeqB" version="2.0.1" location="https://zrdfepirv2cbn04prdstr01a.blob.core.windows.net/f72653efd9e349ed9842c8b99e4c1712/Microsoft.CPlat.Core_NullSeqB_useast2euap_manifest.xml" state="enabled" autoUpgrade="false" failoverlocation="https://zrdfepirv2cbz06prdstr01a.blob.core.windows.net/f72653efd9e349ed9842c8b99e4c1712/Microsoft.CPlat.Core_NullSeqB_useast2euap_manifest.xml" runAsStartupTask="false" isJson="true" useExactVersion="true"/>
<Plugin name="Microsoft.CPlat.Core.NullSeqA" version="2.0.1" location="https://zrdfepirv2cbn04prdstr01a.blob.core.windows.net/f72653efd9e349ed9842c8b99e4c1712/Microsoft.CPlat.Core_NullSeqA_useast2euap_manifest.xml" state="enabled" autoUpgrade="false" failoverlocation="https://zrdfepirv2cbn06prdstr01a.blob.core.windows.net/f72653efd9e349ed9842c8b99e4c1712/Microsoft.CPlat.Core_NullSeqA_useast2euap_manifest.xml" runAsStartupTask="false" isJson="true" useExactVersion="true" encodedSignature="MII..." />
</Plugins>
<PluginSettings>
<Plugin name="Microsoft.CPlat.Core.NullSeqA" version="2.0.1">
<DependsOn dependencyLevel="1">
<DependsOnExtension handler="Microsoft.CPlat.Core.NullSeqB" />
</DependsOn>
<RuntimeSettings seqNo="0">{
"runtimeSettings": [
{
"handlerSettings": {
"publicSettings": {"01_add_extensions_with_dependency":"ff2a3da6-8e12-4ab6-a4ca-4e3a473ab385"}
}
}
]
}
</RuntimeSettings>
</Plugin>
<Plugin name="Microsoft.CPlat.Core.NullSeqB" version="2.0.1">
<RuntimeSettings seqNo="0">{
"runtimeSettings": [
{
"handlerSettings": {
"publicSettings": {"01_add_extensions_with_dependency":"2e837740-cf7e-4528-b3a4-241002618f05"}
}
}
]
}
</RuntimeSettings>
</Plugin>
</PluginSettings>
"""
plugins_list = find(xml_doc, "Plugins")
plugins = findall(plugins_list, "Plugin")
plugin_settings_list = find(xml_doc, "PluginSettings")
plugin_settings = findall(plugin_settings_list, "Plugin")
for plugin in plugins:
extension = Extension()
try:
ExtensionsGoalStateFromExtensionsConfig._parse_plugin(extension, plugin)
ExtensionsGoalStateFromExtensionsConfig._parse_plugin_settings(extension, plugin_settings)
except ExtensionsConfigError as error:
extension.invalid_setting_reason = ustr(error)
self._extensions.append(extension)
@staticmethod
def _parse_plugin(extension, plugin):
"""
Sample config:
<Plugins>
<Plugin name="Microsoft.CPlat.Core.NullSeqB" version="2.0.1" location="https://zrdfepirv2cbn04prdstr01a.blob.core.windows.net/f72653efd9e349ed9842c8b99e4c1712/Microsoft.CPlat.Core_NullSeqB_useast2euap_manifest.xml" state="enabled" autoUpgrade="false" failoverlocation="https://zrdfepirv2cbz06prdstr01a.blob.core.windows.net/f72653efd9e349ed9842c8b99e4c1712/Microsoft.CPlat.Core_NullSeqB_useast2euap_manifest.xml" runAsStartupTask="false" isJson="true" useExactVersion="true">
<Plugin name="Microsoft.Azure.Extensions.CustomScript" version="1.0" location="https://rdfecurrentuswestcache.blob.core.test-cint.azure-test.net/0e53c53ef0be4178bacb0a1fecf12a74/Microsoft.Azure.Extensions_CustomScript_usstagesc_manifest.xml" state="enabled" autoUpgrade="false" failoverlocation="https://rdfecurrentuswestcache2.blob.core.test-cint.azure-test.net/0e53c53ef0be4178bacb0a1fecf12a74/Microsoft.Azure.Extensions_CustomScript_usstagesc_manifest.xml" runAsStartupTask="false" isJson="true" useExactVersion="true">
<additionalLocations>
<additionalLocation>https://rdfecurrentuswestcache3.blob.core.test-cint.azure-test.net/0e53c53ef0be4178bacb0a1fecf12a74/Microsoft.Azure.Extensions_CustomScript_usstagesc_manifest.xml</additionalLocation>
<additionalLocation>https://rdfecurrentuswestcache4.blob.core.test-cint.azure-test.net/0e53c53ef0be4178bacb0a1fecf12a74/Microsoft.Azure.Extensions_CustomScript_usstagesc_manifest.xml</additionalLocation>
</additionalLocations>
</Plugin>
<Plugin name="Microsoft.CPlat.Core.NullSeqA" version="2.0.1" location="https://zrdfepirv2cbn04prdstr01a.blob.core.windows.net/f72653efd9e349ed9842c8b99e4c1712/Microsoft.CPlat.Core_NullSeqA_useast2euap_manifest.xml" state="enabled" autoUpgrade="false" failoverlocation="https://zrdfepirv2cbn06prdstr01a.blob.core.windows.net/f72653efd9e349ed9842c8b99e4c1712/Microsoft.CPlat.Core_NullSeqA_useast2euap_manifest.xml" runAsStartupTask="false" isJson="true" useExactVersion="true" encodedSignature="MIIn..." />
</Plugins>
Note that the `additionalLocations` subnode is populated with links
generated by PIR for resiliency. In regions with this feature enabled,
CRP will provide any extra links in the format above. If no extra links
are provided, the subnode will not exist.
"""
def _log_error_if_none(attr_name, value):
# Plugin Name and Version are very essential fields, without them we wont be able to even report back to CRP
# about that handler. For those cases we need to fail the GoalState completely but currently we dont support
# reporting status at a GoalState level (we only report at a handler level).
# Once that functionality is added to the GA, we would raise here rather than just report error in our logs.
if value in (None, ""):
add_event(op=WALAEventOperation.InvalidExtensionConfig,
message="{0} is None for ExtensionConfig, logging error".format(attr_name),
log_event=True, is_success=False)
return value
extension.name = _log_error_if_none("Extensions.Plugins.Plugin.name", getattrib(plugin, "name"))
extension.version = _log_error_if_none("Extensions.Plugins.Plugin.version",
getattrib(plugin, "version"))
extension.state = getattrib(plugin, "state")
if extension.state in (None, ""):
raise ExtensionsConfigError("Received empty Extensions.Plugins.Plugin.state, failing Handler")
# extension.encoded_signature value should be None if the property does not exist for the plugin. getattrib
# returns "" if an attribute does not exist in a node, so use hasattrib here to check if the attribute exists
extension.encoded_signature = getattrib(plugin, "encodedSignature") if hasattrib(plugin, "encodedSignature") else None
def getattrib_wrapped_in_list(node, attr_name):
attr = getattrib(node, attr_name)
return [attr] if attr not in (None, "") else []
location = getattrib_wrapped_in_list(plugin, "location")
failover_location = getattrib_wrapped_in_list(plugin, "failoverlocation")
locations = location + failover_location
additional_location_node = find(plugin, "additionalLocations")
if additional_location_node is not None:
nodes_list = findall(additional_location_node, "additionalLocation")
locations += [gettext(node) for node in nodes_list]
for uri in locations:
extension.manifest_uris.append(uri)
@staticmethod
def _parse_plugin_settings(extension, plugin_settings):
"""
Sample config:
<PluginSettings>
<Plugin name="Microsoft.CPlat.Core.NullSeqA" version="2.0.1">
<DependsOn dependencyLevel="1">
<DependsOnExtension handler="Microsoft.CPlat.Core.NullSeqB" />
</DependsOn>
<RuntimeSettings seqNo="0">{
"runtimeSettings": [
{
"handlerSettings": {
"publicSettings": {"01_add_extensions_with_dependency":"ff2a3da6-8e12-4ab6-a4ca-4e3a473ab385"}
}
}
]
}
</RuntimeSettings>
</Plugin>
<Plugin name="Microsoft.CPlat.Core.RunCommandHandlerWindows" version="2.0.2">
<ExtensionRuntimeSettings seqNo="4" name="firstRunCommand" state="enabled">{
"runtimeSettings": [
{
"handlerSettings": {
"publicSettings": {"source":{"script":"Write-Host First: Hello World TestTry2!"},"parameters":[{"name":"extensionName","value":"firstRunCommand"}],"timeoutInSeconds":120}
}
}
]
}
</ExtensionRuntimeSettings>
</Plugin>
</PluginSettings>
"""
if plugin_settings is None:
return
extension_name = extension.name
version = extension.version
def to_lower(str_to_change): return str_to_change.lower() if str_to_change is not None else None
extension_plugin_settings = [x for x in plugin_settings if to_lower(getattrib(x, "name")) == to_lower(extension_name)]
if not extension_plugin_settings:
return
settings = [x for x in extension_plugin_settings if getattrib(x, "version") == version]
if len(settings) != len(extension_plugin_settings):
msg = "Extension PluginSettings Version Mismatch! Expected PluginSettings version: {0} for Extension: {1} but found versions: ({2})".format(
version, extension_name, ', '.join(set([getattrib(x, "version") for x in extension_plugin_settings])))
add_event(op=WALAEventOperation.PluginSettingsVersionMismatch, message=msg, log_event=True,
is_success=False)
raise ExtensionsConfigError(msg)
if len(settings) > 1:
msg = "Multiple plugin settings found for the same extension: {0} and version: {1} (Expected: 1; Available: {2})".format(
extension_name, version, len(settings))
raise ExtensionsConfigError(msg)
plugin_settings_node = settings[0]
runtime_settings_nodes = findall(plugin_settings_node, "RuntimeSettings")
extension_runtime_settings_nodes = findall(plugin_settings_node, "ExtensionRuntimeSettings")
if any(runtime_settings_nodes) and any(extension_runtime_settings_nodes):
# There can only be a single RuntimeSettings node or multiple ExtensionRuntimeSettings nodes per Plugin
msg = "Both RuntimeSettings and ExtensionRuntimeSettings found for the same extension: {0} and version: {1}".format(
extension_name, version)
raise ExtensionsConfigError(msg)
if runtime_settings_nodes:
if len(runtime_settings_nodes) > 1:
msg = "Multiple RuntimeSettings found for the same extension: {0} and version: {1} (Expected: 1; Available: {2})".format(
extension_name, version, len(runtime_settings_nodes))
raise ExtensionsConfigError(msg)
# Only Runtime settings available, parse that
ExtensionsGoalStateFromExtensionsConfig.__parse_runtime_settings(plugin_settings_node, runtime_settings_nodes[0], extension_name,
extension)
elif extension_runtime_settings_nodes:
# Parse the ExtensionRuntime settings for the given extension
ExtensionsGoalStateFromExtensionsConfig.__parse_extension_runtime_settings(plugin_settings_node, extension_runtime_settings_nodes,
extension)
@staticmethod
def __get_dependency_level_from_node(depends_on_node, name):
depends_on_level = 0
if depends_on_node is not None:
try:
depends_on_level = int(getattrib(depends_on_node, "dependencyLevel"))
except (ValueError, TypeError):
logger.warn("Could not parse dependencyLevel for handler {0}. Setting it to 0".format(name))
depends_on_level = 0
return depends_on_level
@staticmethod
def __parse_runtime_settings(plugin_settings_node, runtime_settings_node, extension_name, extension):
"""
Sample Plugin in PluginSettings containing DependsOn and RuntimeSettings (single settings per extension) -
<Plugin name="Microsoft.Compute.VMAccessAgent" version="2.4.7">
<DependsOn dependencyLevel="2">
<DependsOnExtension extension="firstRunCommand" handler="Microsoft.CPlat.Core.RunCommandHandlerWindows" />
<DependsOnExtension handler="Microsoft.Compute.CustomScriptExtension" />
</DependsOn>
<RuntimeSettings seqNo="1">{
"runtimeSettings": [
{
"handlerSettings": {
"protectedSettingsCertThumbprint": "<Redacted>",
"protectedSettings": "<Redacted>",
"publicSettings": {"UserName":"test1234"}
}
}
]
}
</RuntimeSettings>
</Plugin>
"""
depends_on_nodes = findall(plugin_settings_node, "DependsOn")
if len(depends_on_nodes) > 1:
msg = "Extension Handler can only have a single dependsOn node for Single config extensions. Found: {0}".format(
len(depends_on_nodes))
raise ExtensionsConfigError(msg)
depends_on_node = depends_on_nodes[0] if depends_on_nodes else None
depends_on_level = ExtensionsGoalStateFromExtensionsConfig.__get_dependency_level_from_node(depends_on_node, extension_name)
ExtensionsGoalStateFromExtensionsConfig.__parse_and_add_extension_settings(runtime_settings_node, extension_name, extension,
depends_on_level)
@staticmethod
def __parse_extension_runtime_settings(plugin_settings_node, extension_runtime_settings_nodes, extension):
"""
Sample PluginSettings containing DependsOn and ExtensionRuntimeSettings -
<Plugin name="Microsoft.CPlat.Core.RunCommandHandlerWindows" version="2.0.2">
<DependsOn dependencyLevel="3" name="secondRunCommand">
<DependsOnExtension extension="firstRunCommand" handler="Microsoft.CPlat.Core.RunCommandHandlerWindows" />
<DependsOnExtension handler="Microsoft.Compute.CustomScriptExtension" />
<DependsOnExtension handler="Microsoft.Compute.VMAccessAgent" />
</DependsOn>
<DependsOn dependencyLevel="4" name="thirdRunCommand">
<DependsOnExtension extension="firstRunCommand" handler="Microsoft.CPlat.Core.RunCommandHandlerWindows" />
<DependsOnExtension extension="secondRunCommand" handler="Microsoft.CPlat.Core.RunCommandHandlerWindows" />
<DependsOnExtension handler="Microsoft.Compute.CustomScriptExtension" />
<DependsOnExtension handler="Microsoft.Compute.VMAccessAgent" />
</DependsOn>
<ExtensionRuntimeSettings seqNo="2" name="firstRunCommand" state="enabled">
{
"runtimeSettings": [
{
"handlerSettings": {
"publicSettings": {"source":{"script":"Write-Host First: Hello World 1234!"}}
}
}
]
}
</ExtensionRuntimeSettings>
<ExtensionRuntimeSettings seqNo="2" name="secondRunCommand" state="enabled">
{
"runtimeSettings": [
{
"handlerSettings": {
"publicSettings": {"source":{"script":"Write-Host First: Hello World 1234!"}}
}
}
]
}
</ExtensionRuntimeSettings>
<ExtensionRuntimeSettings seqNo="1" name="thirdRunCommand" state="enabled">
{
"runtimeSettings": [
{
"handlerSettings": {
"publicSettings": {"source":{"script":"Write-Host Third: Hello World 3!"}}
}
}
]
}
</ExtensionRuntimeSettings>
</Plugin>
"""
# Parse and cache the Dependencies for each extension first
dependency_levels = defaultdict(int)
for depends_on_node in findall(plugin_settings_node, "DependsOn"):
extension_name = getattrib(depends_on_node, "name")
if extension_name in (None, ""):
raise ExtensionsConfigError("No Name not specified for DependsOn object in ExtensionRuntimeSettings for MultiConfig!")
dependency_level = ExtensionsGoalStateFromExtensionsConfig.__get_dependency_level_from_node(depends_on_node, extension_name)
dependency_levels[extension_name] = dependency_level
extension.supports_multi_config = True
for extension_runtime_setting_node in extension_runtime_settings_nodes:
# Name and State will only be set for ExtensionRuntimeSettings for Multi-Config
extension_name = getattrib(extension_runtime_setting_node, "name")
if extension_name in (None, ""):
raise ExtensionsConfigError("Extension Name not specified for ExtensionRuntimeSettings for MultiConfig!")
# State can either be `ExtensionState.Enabled` (default) or `ExtensionState.Disabled`
state = getattrib(extension_runtime_setting_node, "state")
state = ustr(state.lower()) if state not in (None, "") else ExtensionState.Enabled
ExtensionsGoalStateFromExtensionsConfig.__parse_and_add_extension_settings(extension_runtime_setting_node, extension_name,
extension, dependency_levels[extension_name],
state=state)
@staticmethod
def __parse_and_add_extension_settings(settings_node, name, extension, depends_on_level, state=ExtensionState.Enabled):
seq_no = getattrib(settings_node, "seqNo")
if seq_no in (None, ""):
raise ExtensionsConfigError("SeqNo not specified for the Extension: {0}".format(name))
try:
runtime_settings = json.loads(gettext(settings_node))
except ValueError as error:
logger.error("Invalid extension settings: {0}", ustr(error))
# Incase of invalid/no settings, add the name and seqNo of the Extension and treat it as an extension with
# no settings since we were able to successfully parse those data properly. Without this, we wont report
# anything for that sequence number and CRP would eventually have to timeout rather than fail fast.
extension.settings.append(
ExtensionSettings(name=name, sequenceNumber=seq_no, state=state, dependencyLevel=depends_on_level))
return
for plugin_settings_list in runtime_settings["runtimeSettings"]:
handler_settings = plugin_settings_list["handlerSettings"]
extension_settings = ExtensionSettings()
# There is no "extension name" for single Handler Settings. Use HandlerName for those
extension_settings.name = name
extension_settings.state = state
extension_settings.sequenceNumber = int(seq_no)
extension_settings.publicSettings = handler_settings.get("publicSettings")
extension_settings.protectedSettings = handler_settings.get("protectedSettings")
extension_settings.dependencyLevel = depends_on_level
thumbprint = handler_settings.get("protectedSettingsCertThumbprint")
extension_settings.certificateThumbprint = thumbprint
extension.settings.append(extension_settings)
# Do not extend this class
class _InVMArtifactsProfile(object):
"""
deserialized json string of InVMArtifactsProfile.
It is expected to contain the following fields:
* inVMArtifactsProfileBlobSeqNo
* profileId (optional)
* onHold (optional)
* certificateThumbprint (optional)
* encryptedHealthChecks (optional)
* encryptedApplicationProfile (optional)
"""
def __init__(self, artifacts_profile_json):
self._on_hold = False
artifacts_profile = parse_json(artifacts_profile_json)
on_hold = artifacts_profile.get('onHold')
if on_hold is not None:
# accept both bool and str values
on_hold_normalized = str(on_hold).lower()
if on_hold_normalized == "true":
self._on_hold = True
elif on_hold_normalized == "false":
self._on_hold = False
else:
raise Exception("Invalid value for onHold: {0}".format(on_hold))
def get_on_hold(self):
return self._on_hold