azurelinuxagent/common/protocol/goal_state.py (495 lines of code) (raw):
# Microsoft Azure Linux Agent
#
# Copyright 2020 Microsoft Corporation
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
# Requires Python 2.6+ and Openssl 1.0+
import datetime
import os
import re
import time
import json
from azurelinuxagent.common import conf
from azurelinuxagent.common import logger
from azurelinuxagent.common.AgentGlobals import AgentGlobals
from azurelinuxagent.common.datacontract import set_properties
from azurelinuxagent.common.event import add_event, WALAEventOperation
from azurelinuxagent.common.exception import ProtocolError, ResourceGoneError
from azurelinuxagent.common.future import ustr
from azurelinuxagent.common.protocol.extensions_goal_state_factory import ExtensionsGoalStateFactory
from azurelinuxagent.common.protocol.extensions_goal_state import VmSettingsParseError, GoalStateSource
from azurelinuxagent.common.protocol.hostplugin import VmSettingsNotSupported, VmSettingsSupportStopped
from azurelinuxagent.common.protocol.restapi import Cert, CertList, RemoteAccessUser, RemoteAccessUsersList, ExtHandlerPackage, ExtHandlerPackageList
from azurelinuxagent.common.utils import fileutil
from azurelinuxagent.common.utils.archive import GoalStateHistory, SHARED_CONF_FILE_NAME
from azurelinuxagent.common.utils.cryptutil import CryptUtil
from azurelinuxagent.common.utils.textutil import parse_doc, findall, find, findtext, getattrib, gettext
# URI template of the WireServer goal state API; {0} is replaced with the endpoint reported by the wire client
GOAL_STATE_URI = "http://{0}/machine/?comp=goalstate"
# Names of the certificate-related files used by the Certificates class; all of them are located under conf.get_lib_dir()
CERTS_FILE_NAME = "Certificates.xml"
P7M_FILE_NAME = "Certificates.p7m"
PEM_FILE_NAME = "Certificates.pem"
TRANSPORT_CERT_FILE_NAME = "TransportCert.pem"
TRANSPORT_PRV_FILE_NAME = "TransportPrivate.pem"
# Maximum number of requests issued by GoalState._fetch_goal_state() while waiting for a goal state that includes a RoleInstance
_GET_GOAL_STATE_MAX_ATTEMPTS = 6
class GoalStateProperties(object):
    """
    Bit flags that select which parts of the goal state are fetched; individual flags can be combined with the bitwise OR operator
    """
    RoleConfig = 1 << 0
    HostingEnv = 1 << 1
    SharedConfig = 1 << 2
    ExtensionsGoalState = 1 << 3
    Certificates = 1 << 4
    RemoteAccessInfo = 1 << 5
    # Combination of every flag defined above
    All = RemoteAccessInfo | Certificates | ExtensionsGoalState | SharedConfig | HostingEnv | RoleConfig
class GoalState(object):
def __init__(self, wire_client, goal_state_properties=GoalStateProperties.All, silent=False, save_to_history=False):
    """
    Creates the instance and fetches the goal state using the given wire client (the fetch is done by calling update()).

    Fetching the goal state involves several HTTP requests to the WireServer and the HostGAPlugin. There is an initial request to WireServer's goalstate API,
    which response includes the incarnation, role instance, container ID, role config, and URIs to the rest of the goal state (ExtensionsConfig, Certificates,
    Remote Access users, etc.). Additional requests are done using those URIs (all of them point to APIs in the WireServer). Additionally, there is a
    request to the HostGAPlugin for the vmSettings, which determines the goal state for extensions when using the Fast Track pipeline.

    To reduce the number of requests, when possible, create a single instance of GoalState and use the update() method to keep it up to date.

    Raises ProtocolError on failure; any other exception is wrapped in a ProtocolError.
    """
    try:
        # Settings provided by the caller
        self._wire_client = wire_client
        self._goal_state_properties = goal_state_properties
        self._save_to_history = save_to_history
        self._history = None

        goal_state_logger = logger.Logger(logger.DEFAULT_LOGGER)
        goal_state_logger.silent = silent
        self.logger = goal_state_logger

        self._extensions_goal_state = None  # populated from vmSettings or extensionsConfig

        # These properties hold the goal state from the WireServer and are initialized by self._fetch_full_wire_server_goal_state()
        self._incarnation = None
        self._role_instance_id = None
        self._role_config_name = None
        self._container_id = None
        self._hosting_env = None
        self._shared_conf = None
        self._certs = EmptyCertificates()
        self._certs_uri = None
        self._remote_access = None

        self.update(silent=silent)

    except Exception as exception:
        if isinstance(exception, ProtocolError):
            raise
        # We don't log the error here since fetching the goal state is done every few seconds
        raise ProtocolError(msg="Error fetching goal state", inner=exception)
@property
def incarnation(self):
    """
    Incarnation of the WireServer goal state held by this instance (None until a full goal state has been fetched)
    """
    current_incarnation = self._incarnation
    return current_incarnation
@property
def container_id(self):
    """
    Container ID taken from the WireServer goal state. Raises ProtocolError if RoleConfig was not requested in the goal state properties.
    """
    if self._goal_state_properties & GoalStateProperties.RoleConfig:
        return self._container_id
    raise ProtocolError("ContainerId is not in goal state properties")
@property
def role_instance_id(self):
    """
    Instance ID of the RoleInstance in the WireServer goal state. Raises ProtocolError if RoleConfig was not requested in the goal state properties.
    """
    if self._goal_state_properties & GoalStateProperties.RoleConfig:
        return self._role_instance_id
    raise ProtocolError("RoleInstanceId is not in goal state properties")
@property
def role_config_name(self):
    """
    ConfigName of the role configuration in the WireServer goal state. Raises ProtocolError if RoleConfig was not requested in the goal state properties.
    """
    if self._goal_state_properties & GoalStateProperties.RoleConfig:
        return self._role_config_name
    raise ProtocolError("RoleConfig is not in goal state properties")
@property
def extensions_goal_state(self):
    """
    Goal state for extensions (populated from vmSettings or extensionsConfig). Raises ProtocolError if ExtensionsGoalState was not requested
    in the goal state properties.
    """
    if self._goal_state_properties & GoalStateProperties.ExtensionsGoalState:
        return self._extensions_goal_state
    raise ProtocolError("ExtensionsGoalState is not in goal state properties")
@property
def certs(self):
    """
    Certificates of the goal state (an EmptyCertificates instance until certificates are downloaded). Raises ProtocolError if Certificates
    was not requested in the goal state properties.
    """
    if self._goal_state_properties & GoalStateProperties.Certificates:
        return self._certs
    raise ProtocolError("Certificates is not in goal state properties")
@property
def hosting_env(self):
    """
    HostingEnv instance created from the HostingEnvironmentConfig of the goal state. Raises ProtocolError if HostingEnv was not requested
    in the goal state properties.
    """
    if self._goal_state_properties & GoalStateProperties.HostingEnv:
        return self._hosting_env
    raise ProtocolError("HostingEnvironment is not in goal state properties")
@property
def shared_conf(self):
    """
    SharedConfig instance created from the SharedConfig of the goal state. Raises ProtocolError if SharedConfig was not requested
    in the goal state properties.
    """
    if self._goal_state_properties & GoalStateProperties.SharedConfig:
        return self._shared_conf
    raise ProtocolError("SharedConfig is not in goal state properties")
@property
def remote_access(self):
    """
    RemoteAccess instance created from the RemoteAccessInfo of the goal state (None when the goal state does not include it). Raises
    ProtocolError if RemoteAccessInfo was not requested in the goal state properties.
    """
    if self._goal_state_properties & GoalStateProperties.RemoteAccessInfo:
        return self._remote_access
    raise ProtocolError("RemoteAccessInfo is not in goal state properties")
def fetch_agent_manifest(self, family_name, uris):
    """
    Fetches the manifest for the given agent family from the given URIs. This is a convenience method that wraps WireClient.fetch_manifest(),
    but adds the required 'use_verify_header' parameter and saves the manifest to the history folder (under the name "waagent.<family_name>").
    """
    history_name = "waagent.{0}".format(family_name)
    return self._fetch_manifest("agent", history_name, uris)
def fetch_extension_manifest(self, extension_name, uris):
    """
    Fetches the manifest for the given extension from the given URIs. This is a convenience method that wraps WireClient.fetch_manifest(),
    but adds the required 'use_verify_header' parameter and saves the manifest to the history folder (under the name of the extension).
    """
    manifest = self._fetch_manifest("extension", extension_name, uris)
    return manifest
def _fetch_manifest(self, manifest_type, name, uris):
    """
    Fetches a manifest using the wire client and returns it as an ExtensionManifest. The 'use_verify_header' parameter is set when the
    current extensions goal state comes from Fast Track. When history is enabled, the manifest is saved under the given name.

    Any failure is reported as a ProtocolError.
    """
    try:
        goal_state_source = self.extensions_goal_state.source
        use_verify_header = goal_state_source == GoalStateSource.FastTrack
        manifest_text = self._wire_client.fetch_manifest(manifest_type, uris, use_verify_header=use_verify_header)
        if self._save_to_history:
            self._history.save_manifest(name, manifest_text)
        return ExtensionManifest(manifest_text)
    except Exception as error:
        error_message = "Failed to retrieve {0} manifest. Error: {1}".format(manifest_type, ustr(error))
        raise ProtocolError(error_message)
@staticmethod
def update_host_plugin_headers(wire_client):
    """
    Updates the container ID and role config name that are sent in the headers of HTTP requests to the HostGAPlugin
    """
    # _fetch_goal_state() refreshes the HostGAPlugin as part of each request, so issuing the request is all that is needed;
    # the value it returns is not used
    GoalState._fetch_goal_state(wire_client)
def update(self, force_update=False, silent=False):
    """
    Updates the current GoalState instance fetching values from the WireServer/HostGAPlugin as needed.

    The WireServer goal state is considered updated when its incarnation differs from the one held by this instance, or when
    force_update is True. If neither the WireServer goal state nor the vmSettings changed, the instance is left untouched.

    :param force_update: when True, the goal state is processed even if the incarnation did not change, and the flag is also passed
                         to the request for the vmSettings
    :param silent: value assigned to the 'silent' attribute of this instance's logger for the duration of the update
    """
    self.logger.silent = silent

    #
    # Fetch the goal state from both the HGAP and the WireServer
    #
    # This timestamp is used to name the history subdirectory created below
    timestamp = datetime.datetime.utcnow()

    if force_update:
        message = "Refreshing goal state and vmSettings"
        self.logger.info(message)
        add_event(op=WALAEventOperation.GoalState, message=message)

    incarnation, xml_text, xml_doc = GoalState._fetch_goal_state(self._wire_client)
    goal_state_updated = force_update or incarnation != self._incarnation
    if goal_state_updated:
        message = 'Fetched a new incarnation for the WireServer goal state [incarnation {0}]'.format(incarnation)
        self.logger.info(message)
        add_event(op=WALAEventOperation.GoalState, message=message)

    # The vmSettings are requested only when the extensions goal state is part of the requested properties
    vm_settings, vm_settings_updated = None, False
    if self._goal_state_properties & GoalStateProperties.ExtensionsGoalState:
        try:
            vm_settings, vm_settings_updated = GoalState._fetch_vm_settings(self._wire_client, force_update=force_update)
        except VmSettingsSupportStopped as exception:  # If the HGAP stopped supporting vmSettings, we need to use the goal state from the WireServer
            self._restore_wire_server_goal_state(incarnation, xml_text, xml_doc, exception)
            return

    if vm_settings_updated:
        self.logger.info('')
        message = "Fetched new vmSettings [HostGAPlugin correlation ID: {0} eTag: {1} source: {2}]".format(vm_settings.hostga_plugin_correlation_id, vm_settings.etag, vm_settings.source)
        self.logger.info(message)
        add_event(op=WALAEventOperation.GoalState, message=message)
    # Ignore the vmSettings if their source is Fabric (processing a Fabric goal state may require the tenant certificate and the vmSettings don't include it.)
    if vm_settings is not None and vm_settings.source == GoalStateSource.Fabric:
        if vm_settings_updated:
            message = "The vmSettings originated via Fabric; will ignore them."
            self.logger.info(message)
            add_event(op=WALAEventOperation.GoalState, message=message)
        vm_settings, vm_settings_updated = None, False

    # If neither goal state has changed we are done with the update
    if not goal_state_updated and not vm_settings_updated:
        return

    # Start a new history subdirectory and capture the updated goal state
    # (the tag is the incarnation, followed by the etag of the vmSettings when they are in use)
    tag = "{0}".format(incarnation) if vm_settings is None else "{0}-{1}".format(incarnation, vm_settings.etag)
    if self._save_to_history:
        self._history = GoalStateHistory(timestamp, tag)
        if goal_state_updated:
            self._history.save_goal_state(xml_text)
        if vm_settings_updated:
            self._history.save_vm_settings(vm_settings.get_redacted_text())

    #
    # Continue fetching the rest of the goal state
    #
    extensions_config = None
    if goal_state_updated:
        extensions_config = self._fetch_full_wire_server_goal_state(incarnation, xml_doc)

    #
    # Lastly, decide whether to use the vmSettings or extensionsConfig for the extensions goal state
    #
    if goal_state_updated:
        # On rotation of the tenant certificate the vmSettings and extensionsConfig are not updated. However, the incarnation of the WS goal state is updated so 'goal_state_updated' will be True.
        # In this case, we should use the most recent of vmSettings and extensionsConfig.
        if vm_settings is not None:
            most_recent = vm_settings if vm_settings.created_on_timestamp > extensions_config.created_on_timestamp else extensions_config
        else:
            most_recent = extensions_config
    else:  # vm_settings_updated
        most_recent = vm_settings

    # Never replace the current extensions goal state with an older one
    if self._extensions_goal_state is None or most_recent.created_on_timestamp >= self._extensions_goal_state.created_on_timestamp:
        self._extensions_goal_state = most_recent

    #
    # Ensure all certificates are downloaded on Fast Track goal states in order to maintain backwards compatibility with previous
    # versions of the Agent, which used to download certificates from the WireServer on every goal state. Some customer applications
    # depend on this behavior (see https://github.com/Azure/WALinuxAgent/issues/2750).
    #
    if self._extensions_goal_state.source == GoalStateSource.FastTrack and self._goal_state_properties & GoalStateProperties.Certificates:
        self._check_and_download_missing_certs_on_disk()
def _download_certificates(self, certs_uri):
    """
    Downloads the certificates from the given URI and returns them as a Certificates instance. The summary of each certificate and any
    warnings produced while processing them are logged and reported as telemetry events.
    """
    cert_header = self._wire_client.get_header_for_cert()
    certs_xml = self._wire_client.fetch_config(certs_uri, cert_header)
    downloaded = Certificates(certs_xml, self.logger)

    # Log and save the certificates summary (i.e. the thumbprint but not the certificate itself) to the goal state history
    for entry in downloaded.summary:
        message = "Downloaded certificate {0}".format(entry)
        self.logger.info(message)
        add_event(op=WALAEventOperation.GoalState, message=message)

    if downloaded.warnings:
        self.logger.warn(downloaded.warnings)
        add_event(op=WALAEventOperation.GoalState, message=downloaded.warnings)

    if self._save_to_history:
        summary_json = json.dumps(downloaded.summary)
        self._history.save_certificates(summary_json)

    return downloaded
def _check_and_download_missing_certs_on_disk(self):
    """
    Re-downloads the certificates if any of the .crt files listed in the current certificates summary has been removed from disk since
    the last download. A failure to download is logged and reported, but it is not raised.
    """
    if self._certs_uri is None:
        return

    any_missing = False
    for cert_summary in self.certs.summary:
        expected_path = os.path.join(conf.get_lib_dir(), cert_summary['thumbprint'] + '.crt')
        if os.path.isfile(expected_path):
            continue
        any_missing = True
        message = "Certificate required by goal state is not on disk: {0}".format(expected_path)
        self.logger.info(message)
        add_event(op=WALAEventOperation.GoalState, message=message)

    if not any_missing:
        return

    # Try to re-download certs. Sometimes download may fail if certs_uri is outdated/contains wrong
    # container id (for example, when the VM is moved to a new container after resuming from
    # hibernation). If download fails we should report and continue with goal state processing, as some
    # extensions in the goal state may succeed.
    try:
        self._download_certificates(self._certs_uri)
    except Exception as error:
        message = "Unable to download certificates. Goal state processing will continue, some " \
                  "extensions requiring certificates may fail. Error: {0}".format(ustr(error))
        self.logger.warn(message)
        add_event(op=WALAEventOperation.GoalState, is_success=False, message=message)
def _restore_wire_server_goal_state(self, incarnation, xml_text, xml_doc, vm_settings_support_stopped_error):
    """
    Switches the extensions goal state to the one provided by the WireServer; used when the HostGAPlugin stops supporting vmSettings.

    The full WireServer goal state is fetched and, if it is older than the timestamp carried by the given error, it is flagged as outdated.
    """
    notice = 'The HGAP stopped supporting vmSettings; will fetched the goal state from the WireServer.'
    self.logger.info(notice)
    add_event(op=WALAEventOperation.VmSettings, message=notice)

    if self._save_to_history:
        history = GoalStateHistory(datetime.datetime.utcnow(), incarnation)
        self._history = history
        history.save_goal_state(xml_text)

    fabric_goal_state = self._fetch_full_wire_server_goal_state(incarnation, xml_doc)
    self._extensions_goal_state = fabric_goal_state

    if fabric_goal_state.created_on_timestamp < vm_settings_support_stopped_error.timestamp:
        fabric_goal_state.is_outdated = True
        notice = "Fetched a Fabric goal state older than the most recent FastTrack goal state; will skip it.\nFabric: {0}\nFastTrack: {1}".format(
            fabric_goal_state.created_on_timestamp, vm_settings_support_stopped_error.timestamp)
        self.logger.info(notice)
        add_event(op=WALAEventOperation.VmSettings, message=notice)
def save_to_history(self, data, file_name):
    """
    Saves the given data to the goal state history under the given file name; does nothing when saving to history is disabled
    """
    if not self._save_to_history:
        return
    self._history.save(data, file_name)
@staticmethod
def _fetch_goal_state(wire_client):
    """
    Issues an HTTP request for the goal state (WireServer) and returns a tuple containing the incarnation, the response as text, and the
    response as an XML Document.

    The request is retried, up to _GET_GOAL_STATE_MAX_ATTEMPTS times, while the response does not include a RoleInstance; if none of the
    responses includes it, a ProtocolError is raised.
    """
    goal_state_uri = GOAL_STATE_URI.format(wire_client.get_endpoint())

    # In some environments a few goal state requests return a missing RoleInstance; these retries are used to work around that issue
    # TODO: Consider retrying on 410 (ResourceGone) as well
    incarnation = "unknown"
    for _ in range(_GET_GOAL_STATE_MAX_ATTEMPTS):
        xml_text = wire_client.fetch_config(goal_state_uri, wire_client.get_header())
        xml_doc = parse_doc(xml_text)
        incarnation = findtext(xml_doc, "Incarnation")

        role_instance = find(xml_doc, "RoleInstance")
        if not role_instance:
            time.sleep(0.5)
            continue
        break
    else:
        raise ProtocolError("Fetched goal state without a RoleInstance [incarnation {inc}]".format(inc=incarnation))

    # Telemetry and the HostGAPlugin depend on the container id/role config; keep them up-to-date each time we fetch the goal state
    # (note that these elements can change even if the incarnation of the goal state does not change)
    container_id = findtext(find(xml_doc, "Container"), "ContainerId")
    role_config_name = findtext(find(role_instance, "Configuration"), "ConfigName")

    AgentGlobals.update_container_id(container_id)  # Telemetry uses this global to pick up the container id
    wire_client.update_host_plugin(container_id, role_config_name)

    return incarnation, xml_text, xml_doc
@staticmethod
def _fetch_vm_settings(wire_client, force_update=False):
    """
    Issues an HTTP request (HostGAPlugin) for the vm settings and returns a tuple with the response, as an ExtensionsGoalState, and a flag
    indicating whether the vm settings were updated. Returns (None, False) if Fast Track is disabled or the vm settings are not supported.

    VmSettingsSupportStopped and VmSettingsParseError are propagated to the caller.
    """
    if not conf.get_enable_fast_track():
        return None, False

    vm_settings, vm_settings_updated = None, False
    try:
        try:
            host_plugin = wire_client.get_host_plugin()
            vm_settings, vm_settings_updated = host_plugin.fetch_vm_settings(force_update=force_update)
        except ResourceGoneError:
            # retry after refreshing the HostGAPlugin
            GoalState.update_host_plugin_headers(wire_client)
            host_plugin = wire_client.get_host_plugin()
            vm_settings, vm_settings_updated = host_plugin.fetch_vm_settings(force_update=force_update)

    except VmSettingsSupportStopped:
        raise
    except VmSettingsNotSupported:
        pass
    except VmSettingsParseError as parse_error:
        # ensure we save the vmSettings if there were parsing errors, but save them only once per ETag
        if not GoalStateHistory.tag_exists(parse_error.etag):
            history = GoalStateHistory(datetime.datetime.utcnow(), parse_error.etag)
            history.save_vm_settings(parse_error.vm_settings_text)
        raise

    return vm_settings, vm_settings_updated
def _fetch_full_wire_server_goal_state(self, incarnation, xml_doc):
    """
    Issues HTTP requests (to the WireServer) for each of the URIs in the goal state (ExtensionsConfig, Certificate, Remote Access users, etc)
    and populates the corresponding properties. Only the items selected in self._goal_state_properties are fetched.

    The properties of the instance are assigned only after all the requests have completed, so a failure leaves the previous values in place.

    :param incarnation: incarnation of the WireServer goal state represented by xml_doc
    :param xml_doc: the WireServer goal state, as an XML document
    :return: the value of ExtensionsConfig (an empty extensions goal state if it was not requested or the goal state has no ExtensionsConfig URI)
    :raises ProtocolError: on any failure; the original exception is passed as 'inner'
    """
    try:
        self.logger.info('')
        message = 'Fetching full goal state from the WireServer [incarnation {0}]'.format(incarnation)
        self.logger.info(message)
        add_event(op=WALAEventOperation.GoalState, message=message)

        # The Container element is needed for the container ID (RoleConfig) and for the URI of RemoteAccessInfo, so look it up
        # regardless of which of those properties was requested
        container = find(xml_doc, "Container")

        role_instance_id = None
        role_config_name = None
        container_id = None
        if GoalStateProperties.RoleConfig & self._goal_state_properties:
            role_instance = find(xml_doc, "RoleInstance")
            role_instance_id = findtext(role_instance, "InstanceId")
            role_config = find(role_instance, "Configuration")
            role_config_name = findtext(role_config, "ConfigName")
            container_id = findtext(container, "ContainerId")

        extensions_config_uri = findtext(xml_doc, "ExtensionsConfig")
        if not (GoalStateProperties.ExtensionsGoalState & self._goal_state_properties) or extensions_config_uri is None:
            extensions_config = ExtensionsGoalStateFactory.create_empty(incarnation)
        else:
            xml_text = self._wire_client.fetch_config(extensions_config_uri, self._wire_client.get_header())
            extensions_config = ExtensionsGoalStateFactory.create_from_extensions_config(incarnation, xml_text, self._wire_client)
            if self._save_to_history:
                self._history.save_extensions_config(extensions_config.get_redacted_text())

        hosting_env = None
        if GoalStateProperties.HostingEnv & self._goal_state_properties:
            hosting_env_uri = findtext(xml_doc, "HostingEnvironmentConfig")
            xml_text = self._wire_client.fetch_config(hosting_env_uri, self._wire_client.get_header())
            hosting_env = HostingEnv(xml_text)
            if self._save_to_history:
                self._history.save_hosting_env(xml_text)

        shared_config = None
        if GoalStateProperties.SharedConfig & self._goal_state_properties:
            shared_conf_uri = findtext(xml_doc, "SharedConfig")
            xml_text = self._wire_client.fetch_config(shared_conf_uri, self._wire_client.get_header())
            shared_config = SharedConfig(xml_text)
            if self._save_to_history:
                self._history.save_shared_conf(xml_text)
            # SharedConfig.xml is used by other components (Azsec and Singularity/HPC Infiniband), so save it to the agent's root directory as well
            shared_config_file = os.path.join(conf.get_lib_dir(), SHARED_CONF_FILE_NAME)
            try:
                fileutil.write_file(shared_config_file, xml_text)
            except Exception as e:
                # Best effort: report the path of the file that could not be written and continue
                logger.warn("Failed to save {0}: {1}".format(shared_config_file, e))

        certs = EmptyCertificates()
        certs_uri = findtext(xml_doc, "Certificates")
        if (GoalStateProperties.Certificates & self._goal_state_properties) and certs_uri is not None:
            certs = self._download_certificates(certs_uri)

        remote_access = None
        if GoalStateProperties.RemoteAccessInfo & self._goal_state_properties:
            remote_access_uri = findtext(container, "RemoteAccessInfo")
            if remote_access_uri is not None:
                xml_text = self._wire_client.fetch_config(remote_access_uri, self._wire_client.get_header_for_remote_access())
                remote_access = RemoteAccess(xml_text)
                if self._save_to_history:
                    self._history.save_remote_access(xml_text)

        self._incarnation = incarnation
        self._role_instance_id = role_instance_id
        self._role_config_name = role_config_name
        self._container_id = container_id
        self._hosting_env = hosting_env
        self._shared_conf = shared_config
        self._certs = certs
        self._certs_uri = certs_uri
        self._remote_access = remote_access

        return extensions_config

    except Exception as exception:
        self.logger.warn("Fetching the goal state failed: {0}", ustr(exception))
        raise ProtocolError(msg="Error fetching goal state", inner=exception)
    finally:
        # Logged on success and on failure
        message = 'Fetch goal state from WireServer completed'
        self.logger.info(message)
        add_event(op=WALAEventOperation.GoalState, message=message)
class HostingEnv(object):
    """
    Values extracted from the HostingEnvironmentConfig document of the goal state: the 'instance' attribute of the Incarnation element,
    and the 'name' attributes of the Role and Deployment elements.
    """
    def __init__(self, xml_text):
        self.xml_text = xml_text
        document = parse_doc(xml_text)
        self.vm_name = getattrib(find(document, "Incarnation"), "instance")
        self.role_name = getattrib(find(document, "Role"), "name")
        self.deployment_name = getattrib(find(document, "Deployment"), "name")
class SharedConfig(object):
    """
    Holds the text of the SharedConfig document of the goal state; the text is kept as received, without parsing it
    """
    def __init__(self, xml_text):
        self.xml_text = xml_text
class Certificates(object):
    """
    Certificates of the goal state.

    The constructor saves the given XML to the lib directory, decrypts the certificates it contains, and splits them into individual
    files named after the thumbprint of each certificate (<thumbprint>.crt and, when a matching private key exists, <thumbprint>.prv).

    Attributes:
        cert_list: CertList with one Cert per certificate found
        summary:   list of dictionaries ("thumbprint", "hasPrivateKey"), one per certificate; used for debugging
        warnings:  list of warning messages produced while processing the certificates
    """
    def __init__(self, xml_text, my_logger):
        """
        :param xml_text: text of the Certificates document of the goal state
        :param my_logger: logger used to report a certificates format that is not supported
        """
        self.cert_list = CertList()
        self.summary = []  # debugging info
        self.warnings = []

        # Save the certificates
        local_file = os.path.join(conf.get_lib_dir(), CERTS_FILE_NAME)
        fileutil.write_file(local_file, xml_text)

        # Separate the certificates into individual files.
        xml_doc = parse_doc(xml_text)
        data = findtext(xml_doc, "Data")
        if data is None:
            return

        # if the certificates format is not Pkcs7BlobWithPfxContents do not parse it
        certificate_format = findtext(xml_doc, "Format")
        if certificate_format and certificate_format != "Pkcs7BlobWithPfxContents":
            message = "The Format is not Pkcs7BlobWithPfxContents. Format is {0}".format(certificate_format)
            my_logger.warn(message)
            add_event(op=WALAEventOperation.GoalState, message=message)
            return

        cryptutil = CryptUtil(conf.get_openssl_cmd())

        p7m_file = os.path.join(conf.get_lib_dir(), P7M_FILE_NAME)
        p7m = ("MIME-Version:1.0\n"  # pylint: disable=W1308
               "Content-Disposition: attachment; filename=\"{0}\"\n"
               "Content-Type: application/x-pkcs7-mime; name=\"{1}\"\n"
               "Content-Transfer-Encoding: base64\n"
               "\n"
               "{2}").format(p7m_file, p7m_file, data)

        fileutil.write_file(p7m_file, p7m)

        trans_prv_file = os.path.join(conf.get_lib_dir(), TRANSPORT_PRV_FILE_NAME)
        trans_cert_file = os.path.join(conf.get_lib_dir(), TRANSPORT_CERT_FILE_NAME)
        pem_file = os.path.join(conf.get_lib_dir(), PEM_FILE_NAME)
        # decrypt certificates
        cryptutil.decrypt_p7m(p7m_file, trans_prv_file, trans_cert_file, pem_file)

        # The parsing process use public key to match prv and crt.
        buf = []
        prvs = {}  # public key -> temporary file containing the private key
        thumbprints = {}  # public key -> thumbprint of the certificate
        index = 0
        v1_cert_list = []

        # Ensure pem_file exists before read the certs data since decrypt_p7m may clear the pem_file wen decryption fails
        if os.path.exists(pem_file):
            with open(pem_file) as pem:
                for line in pem:
                    buf.append(line)
                    if re.match(r'[-]+END.*KEY[-]+', line):
                        tmp_file = Certificates._write_to_tmp_file(index, 'prv', buf)
                        pub = cryptutil.get_pubkey_from_prv(tmp_file)
                        prvs[pub] = tmp_file
                        buf = []
                        index += 1
                    elif re.match(r'[-]+END.*CERTIFICATE[-]+', line):
                        tmp_file = Certificates._write_to_tmp_file(index, 'crt', buf)
                        pub = cryptutil.get_pubkey_from_crt(tmp_file)
                        thumbprint = cryptutil.get_thumbprint_from_crt(tmp_file)
                        thumbprints[pub] = thumbprint
                        # Rename crt with thumbprint as the file name
                        crt = "{0}.crt".format(thumbprint)
                        v1_cert_list.append({
                            "name": None,
                            "thumbprint": thumbprint
                        })
                        os.rename(tmp_file, os.path.join(conf.get_lib_dir(), crt))
                        buf = []
                        index += 1

        # Rename prv key with thumbprint as the file name
        for pubkey in prvs:
            # A private key may have no matching certificate; use get() so that case is reported as a warning instead of raising KeyError
            thumbprint = thumbprints.get(pubkey)
            if thumbprint:
                tmp_file = prvs[pubkey]
                prv = "{0}.prv".format(thumbprint)
                os.rename(tmp_file, os.path.join(conf.get_lib_dir(), prv))
            else:
                # Since private key has *no* matching certificate,
                # it will not be named correctly
                self.warnings.append("Found NO matching cert/thumbprint for private key!")

        for pubkey, thumbprint in thumbprints.items():
            has_private_key = pubkey in prvs
            self.summary.append({"thumbprint": thumbprint, "hasPrivateKey": has_private_key})

        for v1_cert in v1_cert_list:
            cert = Cert()
            set_properties("certs", cert, v1_cert)
            self.cert_list.certificates.append(cert)

    @staticmethod
    def _write_to_tmp_file(index, suffix, buf):
        """
        Writes the lines in 'buf' to the file "<index>.<suffix>" in the lib directory and returns the path of the file
        """
        file_name = os.path.join(conf.get_lib_dir(), "{0}.{1}".format(index, suffix))
        fileutil.write_file(file_name, "".join(buf))
        return file_name
class EmptyCertificates(object):
    """
    Placeholder with the same attributes as Certificates (cert_list, summary, warnings), all of them empty; used when no certificates
    have been downloaded. Creating an instance does not write any files.
    """
    def __init__(self):
        self.cert_list = CertList()
        self.summary = []  # debugging info
        self.warnings = []
class RemoteAccess(object):
    """
    Object containing information about user accounts
    """
    #
    # Expected structure of the document:
    #
    # <RemoteAccess>
    #   <Version/>
    #   <Incarnation/>
    #   <Users>
    #       <User>
    #           <Name/>
    #           <Password/>
    #           <Expiration/>
    #       </User>
    #   </Users>
    # </RemoteAccess>
    #
    def __init__(self, xml_text):
        self.xml_text = xml_text
        self.version = None
        self.incarnation = None
        self.user_list = RemoteAccessUsersList()

        # A missing or empty document produces an instance with no version, no incarnation, and no users
        nothing_to_parse = self.xml_text is None or len(self.xml_text) == 0
        if nothing_to_parse:
            return

        document = parse_doc(self.xml_text)
        self.version = findtext(document, "Version")
        self.incarnation = findtext(document, "Incarnation")

        for user_node in findall(find(document, "Users"), "User"):
            self.user_list.users.append(RemoteAccess._parse_user(user_node))

    @staticmethod
    def _parse_user(user):
        """
        Creates a RemoteAccessUser from the Name, Password (kept encrypted), and Expiration of the given User element
        """
        name = findtext(user, "Name")
        encrypted_password = findtext(user, "Password")
        expiration = findtext(user, "Expiration")
        return RemoteAccessUser(name, encrypted_password, expiration)
class ExtensionManifest(object):
    """
    List of packages (pkg_list) parsed from the XML text of a manifest; includes the packages under the Plugins element and those
    under the InternalPlugins element (the latter are flagged as internal).
    """
    def __init__(self, xml_text):
        if xml_text is None:
            raise ValueError("ExtensionManifest is None")
        logger.verbose("Load ExtensionManifest.xml")
        self.pkg_list = ExtHandlerPackageList()
        self._parse(xml_text)

    def _parse(self, xml_text):
        """
        Parses the manifest and adds its packages to pkg_list: first the regular plugins, then the internal ones
        """
        document = parse_doc(xml_text)

        regular_plugins = findall(find(document, "Plugins"), "Plugin")
        self._handle_packages(regular_plugins, False)

        internal_plugins = findall(find(document, "InternalPlugins"), "Plugin")
        self._handle_packages(internal_plugins, True)

    def _handle_packages(self, packages, isinternal):
        """
        Creates an ExtHandlerPackage for each of the given Plugin elements and appends it to pkg_list.versions
        """
        for package in packages:
            version = findtext(package, "Version")

            # A missing DisallowMajorVersionUpgrade element is handled as false
            flag_text = findtext(package, "DisallowMajorVersionUpgrade")
            if flag_text is None:
                flag_text = ''
            disallow_major_upgrade = flag_text.lower() == "true"

            uri_texts = [gettext(uri_node) for uri_node in findall(find(package, "Uris"), "Uri")]

            pkg = ExtHandlerPackage()
            pkg.version = version
            pkg.disallow_major_upgrade = disallow_major_upgrade
            for uri_text in uri_texts:
                pkg.uris.append(uri_text)

            pkg.isinternal = isinternal
            self.pkg_list.versions.append(pkg)