azurelinuxagent/ga/exthandlers.py (1,611 lines of code) (raw):
# Microsoft Azure Linux Agent
#
# Copyright Microsoft Corporation
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
# Requires Python 2.6+ and Openssl 1.0+
#
import copy
import datetime
import glob
import json
import os
import re
import shutil
import stat
import tempfile
import time
import zipfile
from collections import defaultdict
from functools import partial
from azurelinuxagent.common import conf
from azurelinuxagent.common import logger
from azurelinuxagent.common.osutil import get_osutil
from azurelinuxagent.common.utils import fileutil
from azurelinuxagent.common import version
from azurelinuxagent.common import event
from azurelinuxagent.common.agent_supported_feature import get_agent_supported_features_list_for_extensions, \
SupportedFeatureNames, get_supported_feature_by_name, get_agent_supported_features_list_for_crp
from azurelinuxagent.ga.cgroupconfigurator import CGroupConfigurator
from azurelinuxagent.common.datacontract import get_properties, set_properties
from azurelinuxagent.common.errorstate import ErrorState
from azurelinuxagent.common.event import add_event, elapsed_milliseconds, WALAEventOperation, \
add_periodic, EVENTS_DIRECTORY
from azurelinuxagent.common.exception import ExtensionDownloadError, ExtensionError, ExtensionErrorCodes, \
ExtensionOperationError, ExtensionUpdateError, ProtocolError, ProtocolNotFoundError, ExtensionsGoalStateError, \
GoalStateAggregateStatusCodes, MultiConfigExtensionEnableError
from azurelinuxagent.common.future import ustr, is_file_not_found_error
from azurelinuxagent.common.protocol.extensions_goal_state import GoalStateSource
from azurelinuxagent.common.protocol.restapi import ExtensionStatus, ExtensionSubStatus, Extension, ExtHandlerStatus, \
VMStatus, GoalStateAggregateStatus, ExtensionState, ExtensionRequestedState, ExtensionSettings
from azurelinuxagent.common.utils import textutil
from azurelinuxagent.common.utils.archive import ARCHIVE_DIRECTORY_NAME
from azurelinuxagent.common.utils.flexible_version import FlexibleVersion
from azurelinuxagent.common.version import AGENT_NAME, CURRENT_VERSION
_HANDLER_NAME_PATTERN = r'^([^-]+)'
_HANDLER_VERSION_PATTERN = r'(\d+(?:\.\d+)*)'
_HANDLER_PATTERN = _HANDLER_NAME_PATTERN + r"-" + _HANDLER_VERSION_PATTERN
_HANDLER_PKG_PATTERN = re.compile(_HANDLER_PATTERN + r'\.zip$', re.IGNORECASE)
_DEFAULT_EXT_TIMEOUT_MINUTES = 90
_VALID_HANDLER_STATUS = ['Ready', 'NotReady', "Installing", "Unresponsive"]
HANDLER_NAME_PATTERN = re.compile(_HANDLER_NAME_PATTERN, re.IGNORECASE)
HANDLER_COMPLETE_NAME_PATTERN = re.compile(_HANDLER_PATTERN + r'$', re.IGNORECASE)
HANDLER_PKG_EXT = ".zip"
# This is the default value for the env variables, whenever we call a command which is not an update scenario, we
# set the env variable value to NOT_RUN to reduce ambiguity for the extension publishers
NOT_RUN = "NOT_RUN"
# Max size of individual status file
_MAX_STATUS_FILE_SIZE_IN_BYTES = 128 * 1024 # 128K
# Truncating length of fields.
_MAX_STATUS_MESSAGE_LENGTH = 1024 # 1k message allowed to be shown in the portal.
_MAX_SUBSTATUS_FIELD_LENGTH = 10 * 1024 # Making 10K; allowing fields to have enough debugging information..
_TRUNCATED_SUFFIX = u" ... [TRUNCATED]"
# Status file specific retries and delays.
_NUM_OF_STATUS_FILE_RETRIES = 5
_STATUS_FILE_RETRY_DELAY = 2 # seconds
# This is the default sequence number we use when there are no settings available for Handlers
_DEFAULT_SEQ_NO = "0"
class ExtHandlerStatusValue(object):
"""
Statuses for Extension Handlers
"""
ready = "Ready"
not_ready = "NotReady"
class ExtensionStatusValue(object):
"""
Statuses for Extensions
"""
transitioning = "transitioning"
warning = "warning"
error = "error"
success = "success"
STRINGS = ['transitioning', 'warning', 'error', 'success']
_EXTENSION_TERMINAL_STATUSES = [ExtensionStatusValue.error, ExtensionStatusValue.success]
class ExtCommandEnvVariable(object):
Prefix = "AZURE_GUEST_AGENT"
DisableReturnCode = "{0}_DISABLE_CMD_EXIT_CODE".format(Prefix)
DisableReturnCodeMultipleExtensions = "{0}_DISABLE_CMD_EXIT_CODES_MULTIPLE_EXTENSIONS".format(Prefix)
UninstallReturnCode = "{0}_UNINSTALL_CMD_EXIT_CODE".format(Prefix)
ExtensionPath = "{0}_EXTENSION_PATH".format(Prefix)
ExtensionVersion = "{0}_EXTENSION_VERSION".format(Prefix)
ExtensionSeqNumber = "ConfigSequenceNumber" # At par with Windows Guest Agent
ExtensionName = "ConfigExtensionName"
UpdatingFromVersion = "{0}_UPDATING_FROM_VERSION".format(Prefix)
WireProtocolAddress = "{0}_WIRE_PROTOCOL_ADDRESS".format(Prefix)
ExtensionSupportedFeatures = "{0}_EXTENSION_SUPPORTED_FEATURES".format(Prefix)
def validate_has_key(obj, key, full_key_path):
if key not in obj:
raise ExtensionStatusError(msg="Invalid status format by extension: Missing {0} key".format(full_key_path),
code=ExtensionStatusError.StatusFileMalformed)
def validate_in_range(val, valid_range, name):
if val not in valid_range:
raise ExtensionStatusError(msg="Invalid value {0} in range {1} at the node {2}".format(val, valid_range, name),
code=ExtensionStatusError.StatusFileMalformed)
def parse_formatted_message(formatted_message):
if formatted_message is None:
return None
validate_has_key(formatted_message, 'lang', 'formattedMessage/lang')
validate_has_key(formatted_message, 'message', 'formattedMessage/message')
return formatted_message.get('message')
def parse_ext_substatus(substatus):
# Check extension sub status format
validate_has_key(substatus, 'status', 'substatus/status')
validate_in_range(substatus['status'], ExtensionStatusValue.STRINGS, 'substatus/status')
status = ExtensionSubStatus()
status.name = substatus.get('name')
status.status = substatus.get('status')
status.code = substatus.get('code', 0)
formatted_message = substatus.get('formattedMessage')
status.message = parse_formatted_message(formatted_message)
return status
def parse_ext_status(ext_status, data):
if data is None:
return
if not isinstance(data, list):
data_string = ustr(data)[:4096]
raise ExtensionStatusError(msg="The extension status must be an array: {0}".format(data_string), code=ExtensionStatusError.StatusFileMalformed)
if not data:
return
# Currently, only the first status will be reported
data = data[0]
# Check extension status format
validate_has_key(data, 'status', 'status')
status_data = data['status']
validate_has_key(status_data, 'status', 'status/status')
status = status_data['status']
if status not in ExtensionStatusValue.STRINGS:
status = ExtensionStatusValue.error
applied_time = status_data.get('configurationAppliedTime')
ext_status.configurationAppliedTime = applied_time
ext_status.operation = status_data.get('operation')
ext_status.status = status
ext_status.code = status_data.get('code', 0)
formatted_message = status_data.get('formattedMessage')
ext_status.message = parse_formatted_message(formatted_message)
substatus_list = status_data.get('substatus', [])
# some extensions incorrectly report an empty substatus with a null value
if substatus_list is None:
substatus_list = []
for substatus in substatus_list:
if substatus is not None:
ext_status.substatusList.append(parse_ext_substatus(substatus))
def migrate_handler_state():
"""
Migrate handler state and status (if they exist) from an agent-owned directory into the
handler-owned config directory
Notes:
- The v2.0.x branch wrote all handler-related state into the handler-owned config
directory (e.g., /var/lib/waagent/Microsoft.Azure.Extensions.LinuxAsm-2.0.1/config).
- The v2.1.x branch original moved that state into an agent-owned handler
state directory (e.g., /var/lib/waagent/handler_state).
- This move can cause v2.1.x agents to multiply invoke a handler's install command. It also makes
clean-up more difficult since the agent must remove the state as well as the handler directory.
"""
handler_state_path = os.path.join(conf.get_lib_dir(), "handler_state")
if not os.path.isdir(handler_state_path):
return
for handler_path in glob.iglob(os.path.join(handler_state_path, "*")):
handler = os.path.basename(handler_path)
handler_config_path = os.path.join(conf.get_lib_dir(), handler, "config")
if os.path.isdir(handler_config_path):
for file in ("State", "Status"): # pylint: disable=redefined-builtin
from_path = os.path.join(handler_state_path, handler, file.lower())
to_path = os.path.join(handler_config_path, "Handler" + file)
if os.path.isfile(from_path) and not os.path.isfile(to_path):
try:
shutil.move(from_path, to_path)
except Exception as e:
logger.warn(
"Exception occurred migrating {0} {1} file: {2}",
handler,
file,
str(e))
try:
shutil.rmtree(handler_state_path)
except Exception as e:
logger.warn("Exception occurred removing {0}: {1}", handler_state_path, str(e))
return
class ExtHandlerState(object):
NotInstalled = "NotInstalled"
Installed = "Installed"
Enabled = "Enabled"
FailedUpgrade = "FailedUpgrade"
class GoalStateStatus(object):
"""
This is an Enum to define the State of the GoalState as a whole. This is reported as part of the
'vmArtifactsAggregateStatus.goalStateAggregateStatus' in the status blob.
Note: not to be confused with the State of the ExtHandler which reported as part of 'handlerAggregateStatus'
"""
Success = "Success"
Failed = "Failed"
# The following field is not used now but would be needed once Status reporting is moved to a separate thread.
Initialize = "Initialize"
Transitioning = "Transitioning"
def get_exthandlers_handler(protocol):
return ExtHandlersHandler(protocol)
def list_agent_lib_directory(skip_agent_package=True, ignore_names=None):
lib_dir = conf.get_lib_dir()
for name in os.listdir(lib_dir):
path = os.path.join(lib_dir, name)
if ignore_names is not None and any(ignore_names) and name in ignore_names:
continue
if skip_agent_package and (version.is_agent_package(path) or version.is_agent_path(path)):
continue
yield name, path
class ExtHandlersHandler(object):
def __init__(self, protocol):
self.protocol = protocol
self.ext_handlers = None
# The GoalState Aggregate status needs to report the last status of the GoalState. Since we only process
# extensions on goal state change, we need to maintain its state.
# Setting the status to None here. This would be overridden as soon as the first GoalState is processed
self.__gs_aggregate_status = None
# CRP Activity ID for the goal state that is being processed. Initialized once we start processing the goal state.
self._gs_activity_id = '00000000-0000-0000-0000-000000000000'
self.report_status_error_state = ErrorState()
def __last_gs_unsupported(self):
# Return if the last GoalState was unsupported
return self.__gs_aggregate_status is not None and \
self.__gs_aggregate_status.status == GoalStateStatus.Failed and \
self.__gs_aggregate_status.code == GoalStateAggregateStatusCodes.GoalStateUnsupportedRequiredFeatures
def run(self):
try:
gs = self.protocol.get_goal_state()
egs = gs.extensions_goal_state
self._gs_activity_id = egs.activity_id
# self.ext_handlers needs to be initialized before returning, since status reporting depends on it; also
# we make a deep copy of the extensions, since changes are made to self.ext_handlers while processing the extensions
self.ext_handlers = copy.deepcopy(egs.extensions)
if self._extensions_on_hold():
return
utc_start = datetime.datetime.utcnow()
error = None
message = "ProcessExtensionsGoalState started [{0} channel: {1} source: {2} activity: {3} correlation {4} created: {5}]".format(
egs.id, egs.channel, egs.source, egs.activity_id, egs.correlation_id, egs.created_on_timestamp)
logger.info('')
logger.info(message)
add_event(op=WALAEventOperation.ExtensionProcessing, message=message)
try:
self.__process_and_handle_extensions(egs.svd_sequence_number, egs.id)
self._cleanup_outdated_handlers()
except Exception as e:
error = u"Error processing extensions:{0}".format(textutil.format_exception(e))
finally:
duration = elapsed_milliseconds(utc_start)
if error is None:
message = 'ProcessExtensionsGoalState completed [{0} {1} ms]\n'.format(egs.id, duration)
logger.info(message)
else:
message = 'ProcessExtensionsGoalState failed [{0} {1} ms]\n{2}'.format(egs.id, duration, error)
logger.error(message)
add_event(op=WALAEventOperation.ExtensionProcessing, is_success=(error is None), message=message, log_event=False, duration=duration)
except Exception as error:
msg = u"ProcessExtensionsInGoalState - Exception processing extension handlers:{0}".format(textutil.format_exception(error))
logger.error(msg)
add_event(op=WALAEventOperation.ExtensionProcessing, is_success=False, message=msg, log_event=False)
def __get_unsupported_features(self):
required_features = self.protocol.get_goal_state().extensions_goal_state.required_features
supported_features = get_agent_supported_features_list_for_crp()
return [feature for feature in required_features if feature not in supported_features]
def __process_and_handle_extensions(self, svd_sequence_number, goal_state_id):
try:
# Verify we satisfy all required features, if any. If not, report failure here itself, no need to process anything further.
unsupported_features = self.__get_unsupported_features()
if any(unsupported_features):
msg = "Failing GS {0} as Unsupported features found: {1}".format(goal_state_id, ', '.join(unsupported_features))
logger.warn(msg)
self.__gs_aggregate_status = GoalStateAggregateStatus(status=GoalStateStatus.Failed, seq_no=svd_sequence_number,
code=GoalStateAggregateStatusCodes.GoalStateUnsupportedRequiredFeatures,
message=msg)
add_event(op=WALAEventOperation.GoalStateUnsupportedFeatures,
is_success=False,
message=msg,
log_event=False)
else:
self.handle_ext_handlers(goal_state_id)
self.__gs_aggregate_status = GoalStateAggregateStatus(status=GoalStateStatus.Success, seq_no=svd_sequence_number,
code=GoalStateAggregateStatusCodes.Success,
message="GoalState executed successfully")
except Exception as error:
msg = "Unexpected error when processing goal state:{0}".format(textutil.format_exception(error))
self.__gs_aggregate_status = GoalStateAggregateStatus(status=GoalStateStatus.Failed, seq_no=svd_sequence_number,
code=GoalStateAggregateStatusCodes.GoalStateUnknownFailure,
message=msg)
logger.warn(msg)
add_event(op=WALAEventOperation.ExtensionProcessing,
is_success=False,
message=msg,
log_event=False)
@staticmethod
def get_ext_handler_instance_from_path(name, path, protocol, skip_handlers=None):
if not os.path.isdir(path) or re.match(HANDLER_NAME_PATTERN, name) is None:
return None
separator = name.rfind('-')
handler_name = name[0:separator]
if skip_handlers is not None and handler_name in skip_handlers:
# Handler in skip_handlers list, not parsing it
return None
eh = Extension(name=handler_name)
eh.version = str(FlexibleVersion(name[separator + 1:]))
return ExtHandlerInstance(eh, protocol)
def _cleanup_outdated_handlers(self):
# Skip cleanup if the previous GS was Unsupported
if self.__last_gs_unsupported():
return
handlers = []
pkgs = []
ext_handlers_in_gs = [ext_handler.name for ext_handler in self.ext_handlers]
# Build a collection of uninstalled handlers and orphaned packages
# Note:
# -- An orphaned package is one without a corresponding handler
# directory
for item, path in list_agent_lib_directory(skip_agent_package=True):
try:
handler_instance = ExtHandlersHandler.get_ext_handler_instance_from_path(name=item,
path=path,
protocol=self.protocol,
skip_handlers=ext_handlers_in_gs)
if handler_instance is not None:
# Since this handler name doesn't exist in the GS, marking it for deletion
handlers.append(handler_instance)
continue
except Exception:
continue
if os.path.isfile(path) and \
not os.path.isdir(path[0:-len(HANDLER_PKG_EXT)]):
if not re.match(_HANDLER_PKG_PATTERN, item):
continue
pkgs.append(path)
# Then, remove the orphaned packages
for pkg in pkgs:
try:
os.remove(pkg)
logger.verbose("Removed orphaned extension package {0}".format(pkg))
except OSError as e:
logger.warn("Failed to remove orphaned package {0}: {1}".format(pkg, e.strerror))
# Finally, remove the directories and packages of the orphaned handlers, i.e. Any extension directory that
# is still in the FileSystem but not in the GoalState
for handler in handlers:
handler.remove_ext_handler()
pkg = os.path.join(conf.get_lib_dir(), handler.get_full_name() + HANDLER_PKG_EXT)
if os.path.isfile(pkg):
try:
os.remove(pkg)
logger.verbose("Removed extension package {0}".format(pkg))
except OSError as e:
logger.warn("Failed to remove extension package {0}: {1}".format(pkg, e.strerror))
def _extensions_on_hold(self):
if conf.get_enable_overprovisioning():
if self.protocol.get_goal_state().extensions_goal_state.on_hold:
msg = "Extension handling is on hold"
logger.info(msg)
add_event(op=WALAEventOperation.ExtensionProcessing, message=msg)
return True
return False
@staticmethod
def __get_dependency_level(tup):
(extension, handler) = tup
if extension is not None:
return extension.dependency_level_sort_key(handler.state)
return handler.dependency_level_sort_key()
def __get_sorted_extensions_for_processing(self):
all_extensions = []
for handler in self.ext_handlers:
if any(handler.settings):
all_extensions.extend([(ext, handler) for ext in handler.settings])
else:
# We need to process the Handler even if no settings specified from CRP (legacy behavior)
logger.info("No extension/run-time settings settings found for {0}".format(handler.name))
all_extensions.append((None, handler))
all_extensions.sort(key=self.__get_dependency_level)
return all_extensions
def handle_ext_handlers(self, goal_state_id):
if not self.ext_handlers:
logger.info("No extension handlers found, not processing anything.")
return
wait_until = datetime.datetime.utcnow() + datetime.timedelta(minutes=_DEFAULT_EXT_TIMEOUT_MINUTES)
all_extensions = self.__get_sorted_extensions_for_processing()
# Since all_extensions are sorted based on sort_key, the last element would be the maximum based on the sort_key
max_dep_level = self.__get_dependency_level(all_extensions[-1]) if any(all_extensions) else 0
depends_on_err_msg = None
extensions_enabled = conf.get_extensions_enabled()
for extension, ext_handler in all_extensions:
handler_i = ExtHandlerInstance(ext_handler, self.protocol, extension=extension)
# In case of extensions disabled, we skip processing extensions. But CRP is still waiting for some status
# back for the skipped extensions. In order to propagate the status back to CRP, we will report status back
# here with an error message.
if not extensions_enabled:
agent_conf_file_path = get_osutil().agent_conf_file_path
msg = "Extension will not be processed since extension processing is disabled. To enable extension " \
"processing, set Extensions.Enabled=y in '{0}'".format(agent_conf_file_path)
ext_full_name = handler_i.get_extension_full_name(extension)
logger.info('')
logger.info("{0}: {1}".format(ext_full_name, msg))
add_event(op=WALAEventOperation.ExtensionProcessing, message="{0}: {1}".format(ext_full_name, msg))
handler_i.set_handler_status(status=ExtHandlerStatusValue.not_ready, message=msg, code=-1)
handler_i.create_status_file_if_not_exist(extension,
status=ExtensionStatusValue.error,
code=-1,
operation=handler_i.operation,
message=msg)
continue
# In case of depends-on errors, we skip processing extensions if there was an error processing dependent extensions.
# But CRP is still waiting for some status back for the skipped extensions. In order to propagate the status back to CRP,
# we will report status back here with the relevant error message for each of the dependent extension.
if depends_on_err_msg is not None:
# For MC extensions, report the HandlerStatus as is and create a new placeholder per extension if doesnt exist
if handler_i.should_perform_multi_config_op(extension):
# Ensure some handler status exists for the Handler, if not, set it here
if handler_i.get_handler_status() is None:
handler_i.set_handler_status(message=depends_on_err_msg, code=-1)
handler_i.create_status_file_if_not_exist(extension, status=ExtensionStatusValue.error, code=-1,
operation=WALAEventOperation.ExtensionProcessing,
message=depends_on_err_msg)
# For SC extensions, overwrite the HandlerStatus with the relevant message
else:
handler_i.set_handler_status(message=depends_on_err_msg, code=-1)
continue
# Process extensions and get if it was successfully executed or not
extension_success = self.handle_ext_handler(handler_i, extension, goal_state_id)
dep_level = self.__get_dependency_level((extension, ext_handler))
if 0 <= dep_level < max_dep_level:
extension_full_name = handler_i.get_extension_full_name(extension)
try:
# Do no wait for extension status if the handler failed
if not extension_success:
raise Exception("Skipping processing of extensions since execution of dependent extension {0} failed".format(
extension_full_name))
# Wait for the extension installation until it is handled.
# This is done for the install and enable. Not for the uninstallation.
# If handled successfully, proceed with the current handler.
# Otherwise, skip the rest of the extension installation.
self.wait_for_handler_completion(handler_i, wait_until, extension=extension)
except Exception as error:
logger.warn(
"Dependent extension {0} failed or timed out, will skip processing the rest of the extensions".format(
extension_full_name))
depends_on_err_msg = ustr(error)
add_event(name=extension_full_name,
version=handler_i.ext_handler.version,
op=WALAEventOperation.ExtensionProcessing,
is_success=False,
message=depends_on_err_msg)
@staticmethod
def wait_for_handler_completion(handler_i, wait_until, extension=None):
"""
Check the status of the extension being handled. Wait until it has a terminal state or times out.
:raises: Exception if it is not handled successfully.
"""
extension_name = handler_i.get_extension_full_name(extension)
# If the handler had no settings, we should not wait at all for handler to report status.
if extension is None:
logger.info("No settings found for {0}, not waiting for it's status".format(extension_name))
return
try:
ext_completed, status = False, None
# Keep polling for the extension status until it succeeds or times out
while datetime.datetime.utcnow() <= wait_until:
ext_completed, status = handler_i.is_ext_handling_complete(extension)
if ext_completed:
break
time.sleep(5)
except Exception as e:
msg = "Failed to wait for Handler completion due to unknown error. Marking the dependent extension as failed: {0}, {1}".format(
extension_name, textutil.format_exception(e))
raise Exception(msg)
# In case of timeout or terminal error state, we log it and raise
# Incase extension reported status at the last sec, we should prioritize reporting status over timeout
if not ext_completed and datetime.datetime.utcnow() > wait_until:
msg = "Dependent Extension {0} did not reach a terminal state within the allowed timeout. Last status was {1}".format(
extension_name, status)
raise Exception(msg)
if status != ExtensionStatusValue.success:
msg = "Dependent Extension {0} did not succeed. Status was {1}".format(extension_name, status)
raise Exception(msg)
def handle_ext_handler(self, ext_handler_i, extension, goal_state_id):
"""
Execute the requested command for the handler and return if success
:param ext_handler_i: The ExtHandlerInstance object to execute the command on
:param extension: The extension settings on which to run the command on
:param goal_state_id: ID of the current GoalState
:return: True if the operation was successful, False if not
"""
try:
# Ensure the extension config was valid
if ext_handler_i.ext_handler.is_invalid_setting:
raise ExtensionsGoalStateError(ext_handler_i.ext_handler.invalid_setting_reason)
handler_state = ext_handler_i.ext_handler.state
# The Guest Agent currently only supports 1 installed version per extension on the VM.
# If the extension version is unregistered and the customers wants to uninstall the extension,
# we should let it go through even if the installed version doesnt exist in Handler manifest (PIR) anymore.
# If target state is enabled and version not found in manifest, do not process the extension.
if ext_handler_i.decide_version(target_state=handler_state, extension=extension, gs_activity_id=self._gs_activity_id) is None and handler_state == ExtensionRequestedState.Enabled:
handler_version = ext_handler_i.ext_handler.version
name = ext_handler_i.ext_handler.name
err_msg = "Unable to find version {0} in manifest for extension {1}".format(handler_version, name)
ext_handler_i.set_operation(WALAEventOperation.Download)
raise ExtensionError(msg=err_msg)
# Handle everything on an extension level rather than Handler level
ext_handler_i.logger.info("Target handler state: {0} [{1}]", handler_state, goal_state_id)
if handler_state == ExtensionRequestedState.Enabled:
self.handle_enable(ext_handler_i, extension)
elif handler_state == ExtensionRequestedState.Disabled:
# The "disabled" state is now deprecated. Send telemetry if it is still being used on any VMs
event.info(WALAEventOperation.RequestedStateDisabled, 'Goal State is requesting "disabled" state on {0} [Activity ID: {1}]', ext_handler_i.ext_handler.name, self._gs_activity_id)
self.handle_disable(ext_handler_i, extension)
elif handler_state == ExtensionRequestedState.Uninstall:
self.handle_uninstall(ext_handler_i, extension=extension)
else:
message = u"Unknown ext handler state:{0}".format(handler_state)
raise ExtensionError(message)
return True
except MultiConfigExtensionEnableError as error:
ext_name = ext_handler_i.get_extension_full_name(extension)
err_msg = "Error processing MultiConfig extension {0}: {1}".format(ext_name, ustr(error))
# This error is only thrown for enable operation on MultiConfig extension.
# Since these are maintained by the extensions, the expectation here is that they would update their status files appropriately with their errors.
# The extensions should already have a placeholder status file, but incase they dont, setting one here to fail fast.
ext_handler_i.create_status_file_if_not_exist(extension, status=ExtensionStatusValue.error, code=error.code,
operation=ext_handler_i.operation, message=err_msg)
add_event(name=ext_name, version=ext_handler_i.ext_handler.version, op=ext_handler_i.operation,
is_success=False, log_event=True, message=err_msg)
except ExtensionsGoalStateError as error:
# Catch and report Invalid ExtensionConfig errors here to fail fast rather than timing out after 90 min
err_msg = "Ran into config errors: {0}. \nPlease retry again as another operation with updated settings".format(
ustr(error))
self.__handle_and_report_ext_handler_errors(ext_handler_i, error,
report_op=WALAEventOperation.InvalidExtensionConfig,
message=err_msg, extension=extension)
except ExtensionUpdateError as error:
# Not reporting the error as it has already been reported from the old version
self.__handle_and_report_ext_handler_errors(ext_handler_i, error, ext_handler_i.operation, ustr(error),
report=False, extension=extension)
except ExtensionDownloadError as error:
msg = "Failed to download artifacts: {0}".format(ustr(error))
self.__handle_and_report_ext_handler_errors(ext_handler_i, error, report_op=WALAEventOperation.Download,
message=msg, extension=extension)
except ExtensionError as error:
self.__handle_and_report_ext_handler_errors(ext_handler_i, error, ext_handler_i.operation, ustr(error),
extension=extension)
except Exception as error:
error.code = -1
self.__handle_and_report_ext_handler_errors(ext_handler_i, error, ext_handler_i.operation, ustr(error),
extension=extension)
return False
@staticmethod
def __handle_and_report_ext_handler_errors(ext_handler_i, error, report_op, message, report=True, extension=None):
# This function is only called for Handler level errors, we capture MultiConfig errors separately,
# so report only HandlerStatus here.
ext_handler_i.set_handler_status(message=message, code=error.code)
# If the handler supports multi-config, create a status file with failed status if no status file exists.
# This is for correctly reporting errors back to CRP for failed Handler level operations for MultiConfig extensions.
# In case of Handler failures, we will retry each time for each extension, so we need to create a status
# file with failure since the extensions wont be called where they can create their status files.
# This way we guarantee reporting back to CRP
if ext_handler_i.should_perform_multi_config_op(extension):
ext_handler_i.create_status_file_if_not_exist(extension, status=ExtensionStatusValue.error, code=error.code,
operation=report_op, message=message)
if report:
name = ext_handler_i.get_extension_full_name(extension)
handler_version = ext_handler_i.ext_handler.version
add_event(name=name, version=handler_version, op=report_op, is_success=False, log_event=True,
message=message)
def handle_enable(self, ext_handler_i, extension):
"""
1- Ensure the handler is installed
2- Check if extension is enabled or disabled and then process accordingly
"""
uninstall_exit_code = None
old_ext_handler_i = ext_handler_i.get_installed_ext_handler()
current_handler_state = ext_handler_i.get_handler_state()
ext_handler_i.logger.info("[Enable] current handler state is: {0}", current_handler_state.lower())
# We go through the entire process of downloading and initializing the extension if it's either a fresh
# extension or if it's a retry of a previously failed upgrade.
if current_handler_state == ExtHandlerState.NotInstalled or current_handler_state == ExtHandlerState.FailedUpgrade:
self.__setup_new_handler(ext_handler_i, extension)
if old_ext_handler_i is None:
ext_handler_i.install(extension=extension)
elif ext_handler_i.version_ne(old_ext_handler_i):
# This is a special case, we need to update the handler version here but to do that we need to also
# disable each enabled extension of this handler.
uninstall_exit_code = ExtHandlersHandler._update_extension_handler_and_return_if_failed(
old_ext_handler_i, ext_handler_i, extension)
else:
ext_handler_i.ensure_consistent_data_for_mc()
ext_handler_i.update_settings(extension)
self.__handle_extension(ext_handler_i, extension, uninstall_exit_code)
@staticmethod
def __setup_new_handler(ext_handler_i, extension):
ext_handler_i.set_handler_state(ExtHandlerState.NotInstalled)
ext_handler_i.download()
ext_handler_i.initialize()
ext_handler_i.update_settings(extension)
@staticmethod
def __handle_extension(ext_handler_i, extension, uninstall_exit_code):
# Check if extension level settings provided for the handler, if not, call enable for the handler.
# This is legacy behavior, we can have handlers with no settings.
if extension is None:
ext_handler_i.enable()
return
# MultiConfig: Handle extension level ops here
ext_handler_i.logger.info("Requested extension state: {0}", extension.state)
if extension.state == ExtensionState.Enabled:
ext_handler_i.enable(extension, uninstall_exit_code=uninstall_exit_code)
elif extension.state == ExtensionState.Disabled:
# Only disable extension if the requested state == Disabled and current state is != Disabled
if ext_handler_i.get_extension_state(extension) != ExtensionState.Disabled:
# Extensions can only be disabled for Multi Config extensions. Disable operation for extension is
# tantamount to uninstalling Handler so ignoring errors incase of Disable failure and deleting state.
ext_handler_i.disable(extension, ignore_error=True)
else:
ext_handler_i.logger.info("Extension already disabled, not doing anything")
else:
raise ExtensionsGoalStateError(
"Unknown requested state for Extension {0}: {1}".format(extension.name, extension.state))
@staticmethod
def _update_extension_handler_and_return_if_failed(old_ext_handler_i, ext_handler_i, extension=None):
def execute_old_handler_command_and_return_if_succeeds(func):
"""
Created a common wrapper to execute all commands that need to be executed from the old handler
so that it can have a common exception handling mechanism
:param func: The command to be executed on the old handler
:return: True if command execution succeeds and False if it fails
"""
continue_on_update_failure = False
exit_code = 0
try:
continue_on_update_failure = ext_handler_i.load_manifest().is_continue_on_update_failure()
func()
except ExtensionError as e:
# Reporting the event with the old handler and raising a new ExtensionUpdateError to set the
# handler status on the new version
msg = "%s; ContinueOnUpdate: %s" % (ustr(e), continue_on_update_failure)
old_ext_handler_i.report_event(message=msg, is_success=False)
if not continue_on_update_failure:
raise ExtensionUpdateError(msg)
exit_code = e.code
if isinstance(e, ExtensionOperationError):
exit_code = e.exit_code # pylint: disable=E1101
logger.info("Continue on Update failure flag is set, proceeding with update")
return exit_code
disable_exit_codes = defaultdict(lambda: NOT_RUN)
# We only want to disable the old handler if it is currently enabled; no other state makes sense.
if old_ext_handler_i.get_handler_state() == ExtHandlerState.Enabled:
# Corner case - If the old handler is a Single config Handler with no extensions at all,
# we should just disable the handler
if not old_ext_handler_i.supports_multi_config and not any(old_ext_handler_i.extensions):
disable_exit_codes[
old_ext_handler_i.ext_handler.name] = execute_old_handler_command_and_return_if_succeeds(
func=partial(old_ext_handler_i.disable, extension=None))
# Else we disable all enabled extensions of this handler
# Note: If MC is supported this will disable only enabled_extensions else it will disable all extensions
for old_ext in old_ext_handler_i.enabled_extensions:
disable_exit_codes[old_ext.name] = execute_old_handler_command_and_return_if_succeeds(
func=partial(old_ext_handler_i.disable, extension=old_ext))
ext_handler_i.copy_status_files(old_ext_handler_i)
if ext_handler_i.version_gt(old_ext_handler_i):
ext_handler_i.update(disable_exit_codes=disable_exit_codes,
updating_from_version=old_ext_handler_i.ext_handler.version,
extension=extension)
else:
updating_from_version = ext_handler_i.ext_handler.version
old_ext_handler_i.update(handler_version=updating_from_version,
disable_exit_codes=disable_exit_codes, updating_from_version=updating_from_version,
extension=extension)
uninstall_exit_code = execute_old_handler_command_and_return_if_succeeds(
func=partial(old_ext_handler_i.uninstall, extension=extension))
old_ext_handler_i.remove_ext_handler()
ext_handler_i.update_with_install(uninstall_exit_code=uninstall_exit_code, extension=extension)
return uninstall_exit_code
def handle_disable(self, ext_handler_i, extension=None):
"""
Disable is a legacy behavior, CRP doesn't support it, its only for XML based extensions.
In case we get a disable request, just disable that extension.
"""
handler_state = ext_handler_i.get_handler_state()
ext_handler_i.logger.info("[Disable] current handler state is: {0}", handler_state.lower())
if handler_state == ExtHandlerState.Enabled:
ext_handler_i.disable(extension)
def handle_uninstall(self, ext_handler_i, extension):
"""
To Uninstall the handler, first ensure all extensions are disabled
1- Disable all enabled extensions first if Handler is Enabled and then Disable the handler
(disabled extensions wont have any extensions dependent on them so we can just go
ahead and remove all of them at once if HandlerState==Uninstall.
CRP will only set the HandlerState to Uninstall if all its extensions are set to be disabled)
2- Finally uninstall the handler
"""
handler_state = ext_handler_i.get_handler_state()
ext_handler_i.logger.info("[Uninstall] current handler state is: {0}", handler_state.lower())
if handler_state != ExtHandlerState.NotInstalled:
if handler_state == ExtHandlerState.Enabled:
# Corner case - Single config Handler with no extensions at all
# If there are no extension settings for Handler, we should just disable the handler
if not ext_handler_i.supports_multi_config and not any(ext_handler_i.extensions):
ext_handler_i.disable()
# If Handler is Enabled, there should be atleast 1 enabled extension for the handler
# Note: If MC is supported this will disable only enabled_extensions else it will disable all extensions
for enabled_ext in ext_handler_i.enabled_extensions:
ext_handler_i.disable(enabled_ext)
# Try uninstalling the extension and swallow any exceptions in case of failures after logging them
try:
ext_handler_i.uninstall(extension=extension)
except ExtensionError as e:
ext_handler_i.report_event(message=ustr(e), is_success=False)
ext_handler_i.remove_ext_handler()
def __get_handlers_on_file_system(self, goal_state_changed):
handlers_to_report = []
# Ignoring the `history` and `events` directories as they're not handlers and are agent-generated
for item, path in list_agent_lib_directory(skip_agent_package=True,
ignore_names=[EVENTS_DIRECTORY, ARCHIVE_DIRECTORY_NAME]):
try:
handler_instance = ExtHandlersHandler.get_ext_handler_instance_from_path(name=item,
path=path,
protocol=self.protocol)
if handler_instance is not None:
ext_handler = handler_instance.ext_handler
# For each handler we need to add extensions to report their status.
# For Single Config, we just need to add one extension with name as Handler Name
# For Multi Config, walk the config directory and find all unique extension names
# and add them as extensions to the handler.
extensions_names = set()
# Settings for Multi Config are saved as <extName>.<seqNo>.settings.
# Use this pattern to determine if Handler supports Multi Config or not and add extensions
for settings_path in glob.iglob(os.path.join(handler_instance.get_conf_dir(), "*.*.settings")):
match = re.search("(?P<extname>\\w+)\\.\\d+\\.settings", settings_path)
if match is not None:
extensions_names.add(match.group("extname"))
ext_handler.supports_multi_config = True
# If nothing found with that pattern then its a Single Config, add an extension with Handler Name
if not any(extensions_names):
extensions_names.add(ext_handler.name)
for ext_name in extensions_names:
ext = ExtensionSettings(name=ext_name)
# Fetch the last modified sequence number
seq_no, _ = handler_instance.get_status_file_path(ext)
ext.sequenceNumber = seq_no
# Append extension to the list of extensions for the handler
ext_handler.settings.append(ext)
handlers_to_report.append(ext_handler)
except Exception as error:
# Log error once per goal state
if goal_state_changed:
logger.warn("Can't fetch ExtHandler from path: {0}; Error: {1}".format(path, ustr(error)))
return handlers_to_report
def report_ext_handlers_status(self, goal_state_changed=False, vm_agent_update_status=None,
vm_agent_supports_fast_track=False):
"""
Go through handler_state dir, collect and report status.
Returns the status it reported, or None if an error occurred.
"""
try:
vm_status = VMStatus(status="Ready", message="Guest Agent is running",
gs_aggregate_status=self.__gs_aggregate_status,
vm_agent_update_status=vm_agent_update_status)
vm_status.vmAgent.set_supports_fast_track(vm_agent_supports_fast_track)
handlers_to_report = []
# In case of Unsupported error, report the status of the handlers in the VM
if self.__last_gs_unsupported():
handlers_to_report = self.__get_handlers_on_file_system(goal_state_changed)
# If GoalState supported, report the status of extension handlers that were requested by the GoalState
elif not self.__last_gs_unsupported() and self.ext_handlers is not None:
handlers_to_report = self.ext_handlers
for ext_handler in handlers_to_report:
try:
self.report_ext_handler_status(vm_status, ext_handler, goal_state_changed)
except ExtensionError as error:
add_event(op=WALAEventOperation.ExtensionProcessing, is_success=False, message=ustr(error))
logger.verbose("Report vm agent status")
try:
self.protocol.report_vm_status(vm_status)
logger.verbose("Completed vm agent status report successfully")
self.report_status_error_state.reset()
except ProtocolNotFoundError as error:
self.report_status_error_state.incr()
message = "Failed to report vm agent status: {0}".format(error)
logger.verbose(message)
except ProtocolError as error:
self.report_status_error_state.incr()
message = "Failed to report vm agent status: {0}".format(error)
add_event(AGENT_NAME,
version=CURRENT_VERSION,
op=WALAEventOperation.ExtensionProcessing,
is_success=False,
message=message)
if self.report_status_error_state.is_triggered():
message = "Failed to report vm agent status for more than {0}" \
.format(self.report_status_error_state.min_timedelta)
add_event(AGENT_NAME,
version=CURRENT_VERSION,
op=WALAEventOperation.ReportStatusExtended,
is_success=False,
message=message)
self.report_status_error_state.reset()
return vm_status
except Exception as error:
msg = u"Failed to report status: {0}".format(textutil.format_exception(error))
logger.warn(msg)
add_event(AGENT_NAME,
version=CURRENT_VERSION,
op=WALAEventOperation.ReportStatus,
is_success=False,
message=msg)
return None
def report_ext_handler_status(self, vm_status, ext_handler, goal_state_changed):
ext_handler_i = ExtHandlerInstance(ext_handler, self.protocol)
handler_status = ext_handler_i.get_handler_status()
# If nothing available, skip reporting
if handler_status is None:
# We should always have some handler status if requested state != Uninstall irrespective of single or
# multi-config. If state is != Uninstall, report error
if ext_handler.state != ExtensionRequestedState.Uninstall:
msg = "No handler status found for {0}. Not reporting anything for it.".format(ext_handler.name)
ext_handler_i.report_error_on_incarnation_change(goal_state_changed, log_msg=msg, event_msg=msg)
return
handler_state = ext_handler_i.get_handler_state()
ext_handler_statuses = []
# For MultiConfig, we need to report status per extension even for Handler level failures.
# If we have HandlerStatus for a MultiConfig handler and GS is requesting for it, we would report status per
# extension even if HandlerState == NotInstalled (Sample scenario: ExtensionsGoalStateError, DecideVersionError, etc)
# We also need to report extension status for an uninstalled handler if extensions are disabled because CRP
# waits for extension runtime status before failing the extension operation.
if handler_state != ExtHandlerState.NotInstalled or ext_handler.supports_multi_config or not conf.get_extensions_enabled():
# Since we require reading the Manifest for reading the heartbeat, this would fail if HandlerManifest not found.
# Only try to read heartbeat if HandlerState != NotInstalled.
if handler_state != ExtHandlerState.NotInstalled:
# Heartbeat is a handler level thing only, so we dont need to modify this
try:
heartbeat = ext_handler_i.collect_heartbeat()
if heartbeat is not None:
handler_status.status = heartbeat.get('status')
if 'formattedMessage' in heartbeat:
handler_status.message = parse_formatted_message(heartbeat.get('formattedMessage'))
except ExtensionError as e:
ext_handler_i.set_handler_status(message=ustr(e), code=e.code)
ext_handler_statuses = ext_handler_i.get_extension_handler_statuses(handler_status, goal_state_changed)
# If not any extension status reported, report the Handler status
if not any(ext_handler_statuses):
ext_handler_statuses.append(handler_status)
vm_status.vmAgent.extensionHandlers.extend(ext_handler_statuses)
class ExtHandlerInstance(object):
def __init__(self, ext_handler, protocol, execution_log_max_size=(10 * 1024 * 1024), extension=None):
self.ext_handler = ext_handler
self.protocol = protocol
self.operation = None
self.pkg = None
self.pkg_file = None
self.logger = None
self.set_logger(extension=extension, execution_log_max_size=execution_log_max_size)
@property
def supports_multi_config(self):
return self.ext_handler.supports_multi_config
@property
def extensions(self):
return self.ext_handler.settings
@property
def enabled_extensions(self):
"""
In case of Single config, just return all the extensions of the handler
(expectation being that there'll only be a single extension per handler).
We will not be maintaining extension level state for Single config Handlers
"""
if self.supports_multi_config:
return [ext for ext in self.extensions if self.get_extension_state(ext) == ExtensionState.Enabled]
return self.extensions
def get_extension_full_name(self, extension=None):
"""
Get the full name of the extension <HandlerName>.<ExtensionName>.
:param extension: The requested extension
:return: <HandlerName> if MultiConfig not supported or extension == None, else <HandlerName>.<ExtensionName>
"""
if self.should_perform_multi_config_op(extension):
return "{0}.{1}".format(self.ext_handler.name, extension.name)
return self.ext_handler.name
def __set_command_execution_log(self, extension, execution_log_max_size):
try:
fileutil.mkdir(self.get_log_dir(), mode=0o755, reset_mode_and_owner=False)
except IOError as e:
self.logger.error(u"Failed to create extension log dir: {0}", e)
else:
log_file_name = "CommandExecution.log" if not self.should_perform_multi_config_op(
extension) else "CommandExecution_{0}.log".format(extension.name)
log_file = os.path.join(self.get_log_dir(), log_file_name)
self.__truncate_file_head(log_file, execution_log_max_size, self.get_extension_full_name(extension))
self.logger.add_appender(logger.AppenderType.FILE, logger.LogLevel.INFO, log_file)
@staticmethod
def __truncate_file_head(filename, max_size, extension_name):
try:
if os.stat(filename).st_size <= max_size:
return
with open(filename, "rb") as existing_file:
existing_file.seek(-1 * max_size, 2)
_ = existing_file.readline()
with open(filename + ".tmp", "wb") as tmp_file:
shutil.copyfileobj(existing_file, tmp_file)
os.rename(filename + ".tmp", filename)
except (IOError, OSError) as e:
if is_file_not_found_error(e):
# If CommandExecution.log does not exist, it's not noteworthy;
# this just means that no extension with self.ext_handler.name is
# installed.
return
logger.error(
"Exception occurred while attempting to truncate {0} for extension {1}. Exception is: {2}",
filename, extension_name, ustr(e))
for f in (filename, filename + ".tmp"):
try:
os.remove(f)
except (IOError, OSError) as cleanup_exception:
if is_file_not_found_error(cleanup_exception):
logger.info("File '{0}' does not exist.", f)
else:
logger.warn("Exception occurred while attempting to remove file '{0}': {1}", f,
cleanup_exception)
def decide_version(self, target_state, extension, gs_activity_id):
self.logger.verbose("Decide which version to use")
try:
manifest = self.protocol.get_goal_state().fetch_extension_manifest(self.ext_handler.name, self.ext_handler.manifest_uris)
pkg_list = manifest.pkg_list
except ProtocolError as e:
raise ExtensionError("Failed to get ext handler pkgs", e)
except ExtensionDownloadError:
self.set_operation(WALAEventOperation.Download)
raise
# Determine the desired and installed versions
requested_version = FlexibleVersion(str(self.ext_handler.version))
installed_version_string = self.get_installed_version()
installed_version = requested_version if installed_version_string is None else FlexibleVersion(installed_version_string)
# Divide packages
# - Find the installed package (its version must exactly match)
# - Find the internal candidate (its version must exactly match)
# - Separate the public packages
selected_pkg = None
installed_pkg = None
pkg_list.versions.sort(key=lambda p: FlexibleVersion(p.version))
for pkg in pkg_list.versions:
pkg_version = FlexibleVersion(pkg.version)
if pkg_version == installed_version:
installed_pkg = pkg
if requested_version.matches(pkg_version):
selected_pkg = pkg
# Finally, update the version only if not downgrading
# Note:
# - A downgrade, which will be bound to the same major version,
# is allowed if the installed version is no longer available
if target_state in (ExtensionRequestedState.Uninstall, ExtensionRequestedState.Disabled):
if installed_pkg is None:
msg = "Failed to find installed version: {0} of Handler: {1} in handler manifest to uninstall.".format(
installed_version, self.ext_handler.name)
self.logger.warn(msg)
self.pkg = installed_pkg
self.ext_handler.version = str(installed_version) \
if installed_version is not None else None
else:
self.pkg = selected_pkg
if self.pkg is not None:
if self.ext_handler.version != str(selected_pkg.version):
# The Agent should not change the version requested by the Goal State. Send telemetry if this happens.
event.info(
WALAEventOperation.RequestedVersionMismatch,
'Goal State requesting {0} version {1}, but Agent overriding with version {2} [Activity ID: {3}]', self.ext_handler.name, self.ext_handler.version, selected_pkg.version, gs_activity_id)
self.ext_handler.version = str(selected_pkg.version)
if self.pkg is not None:
self.logger.verbose("Use version: {0}", self.pkg.version)
# We reset the logger here incase the handler version changes
if not requested_version.matches(FlexibleVersion(self.ext_handler.version)):
self.set_logger(extension=extension)
return self.pkg
def set_logger(self, execution_log_max_size=(10 * 1024 * 1024), extension=None):
prefix = "[{0}]".format(self.get_full_name(extension))
self.logger = logger.Logger(logger.DEFAULT_LOGGER, prefix)
self.__set_command_execution_log(extension, execution_log_max_size)
def version_gt(self, other):
self_version = self.ext_handler.version
other_version = other.ext_handler.version
return FlexibleVersion(self_version) > FlexibleVersion(other_version)
def version_ne(self, other):
self_version = self.ext_handler.version
other_version = other.ext_handler.version
return FlexibleVersion(self_version) != FlexibleVersion(other_version)
def get_installed_ext_handler(self):
latest_version = self.get_installed_version()
if latest_version is None:
return None
installed_handler = copy.deepcopy(self.ext_handler)
installed_handler.version = latest_version
return ExtHandlerInstance(installed_handler, self.protocol)
def get_installed_version(self):
latest_version = None
for path in glob.iglob(os.path.join(conf.get_lib_dir(), self.ext_handler.name + "-*")):
if not os.path.isdir(path):
continue
separator = path.rfind('-')
version_from_path = FlexibleVersion(path[separator + 1:])
state_path = os.path.join(path, 'config', 'HandlerState')
if not os.path.exists(state_path) or fileutil.read_file(state_path) == ExtHandlerState.NotInstalled \
or fileutil.read_file(state_path) == ExtHandlerState.FailedUpgrade:
logger.verbose("Ignoring version of uninstalled or failed extension: {0}".format(path))
continue
if latest_version is None or latest_version < version_from_path:
latest_version = version_from_path
return str(latest_version) if latest_version is not None else None
def copy_status_files(self, old_ext_handler_i):
self.logger.info("Copy status files from old plugin to new")
old_ext_dir = old_ext_handler_i.get_base_dir()
new_ext_dir = self.get_base_dir()
old_ext_mrseq_file = os.path.join(old_ext_dir, "mrseq")
if os.path.isfile(old_ext_mrseq_file):
logger.info("Migrating {0} to {1}.", old_ext_mrseq_file, new_ext_dir)
shutil.copy2(old_ext_mrseq_file, new_ext_dir)
else:
logger.info("{0} does not exist, no migration is needed.", old_ext_mrseq_file)
old_ext_status_dir = old_ext_handler_i.get_status_dir()
new_ext_status_dir = self.get_status_dir()
if os.path.isdir(old_ext_status_dir):
for status_file in os.listdir(old_ext_status_dir):
status_file = os.path.join(old_ext_status_dir, status_file)
if os.path.isfile(status_file):
shutil.copy2(status_file, new_ext_status_dir)
def set_operation(self, op):
self.operation = op
def report_event(self, name=None, message="", is_success=True, duration=0, log_event=True):
ext_handler_version = self.ext_handler.version
name = self.ext_handler.name if name is None else name
add_event(name=name, version=ext_handler_version, message=message,
op=self.operation, is_success=is_success, duration=duration, log_event=log_event)
def _unzip_extension_package(self, source_file, target_directory):
self.logger.info("Unzipping extension package: {0}", source_file)
try:
zipfile.ZipFile(source_file).extractall(target_directory)
except Exception as exception:
logger.info("Error while unzipping extension package: {0}", ustr(exception))
os.remove(source_file)
if os.path.exists(target_directory):
shutil.rmtree(target_directory)
return False
return True
def download(self):
begin_utc = datetime.datetime.utcnow()
self.set_operation(WALAEventOperation.Download)
if self.pkg is None or self.pkg.uris is None or len(self.pkg.uris) == 0:
raise ExtensionDownloadError("No package uri found")
package_file = os.path.join(conf.get_lib_dir(), self.get_extension_package_zipfile_name())
package_exists = False
if os.path.exists(package_file):
self.logger.info("Using existing extension package: {0}", package_file)
if self._unzip_extension_package(package_file, self.get_base_dir()):
package_exists = True
else:
self.logger.info("The existing extension package is invalid, will ignore it.")
if not package_exists:
is_fast_track_goal_state = self.protocol.get_goal_state().extensions_goal_state.source == GoalStateSource.FastTrack
self.protocol.client.download_zip_package("extension package", self.pkg.uris, package_file, self.get_base_dir(), use_verify_header=is_fast_track_goal_state)
self.report_event(message="Download succeeded", duration=elapsed_milliseconds(begin_utc))
self.pkg_file = package_file
def ensure_consistent_data_for_mc(self):
# If CRP expects Handler to support MC, ensure the HandlerManifest also reflects that.
# Even though the HandlerManifest.json is not expected to change once the extension is installed,
# CRP can wrongfully request send a Multi-Config GoalState even if the Handler supports only Single Config.
# Checking this only if HandlerState == Enable. In case of Uninstall, we dont care.
if self.supports_multi_config and not self.load_manifest().supports_multiple_extensions():
raise ExtensionsGoalStateError(
"Handler {0} does not support MultiConfig but CRP expects it, failing due to inconsistent data".format(
self.ext_handler.name))
def initialize(self):
self.logger.info("Initializing extension {0}".format(self.get_full_name()))
# Add user execute permission to all files under the base dir
for file in fileutil.get_all_files(self.get_base_dir()): # pylint: disable=redefined-builtin
fileutil.chmod(file, os.stat(file).st_mode | stat.S_IXUSR)
# Save HandlerManifest.json
man_file = fileutil.search_file(self.get_base_dir(), 'HandlerManifest.json')
if man_file is None:
raise ExtensionDownloadError("HandlerManifest.json not found")
try:
man = fileutil.read_file(man_file, remove_bom=True)
fileutil.write_file(self.get_manifest_file(), man)
except IOError as e:
fileutil.clean_ioerror(e, paths=[self.get_base_dir(), self.pkg_file])
raise ExtensionDownloadError(u"Failed to save HandlerManifest.json", e)
man = self.load_manifest()
man.report_invalid_boolean_properties(ext_name=self.get_full_name())
self.ensure_consistent_data_for_mc()
# Create status and config dir
try:
status_dir = self.get_status_dir()
fileutil.mkdir(status_dir, mode=0o700)
conf_dir = self.get_conf_dir()
fileutil.mkdir(conf_dir, mode=0o700)
if get_supported_feature_by_name(SupportedFeatureNames.ExtensionTelemetryPipeline).is_supported:
fileutil.mkdir(self.get_extension_events_dir(), mode=0o700)
except IOError as e:
fileutil.clean_ioerror(e, paths=[self.get_base_dir(), self.pkg_file])
raise ExtensionDownloadError(u"Failed to initialize extension '{0}'".format(self.get_full_name()), e)
# Save HandlerEnvironment.json
self.create_handler_env()
self.set_extension_resource_limits()
def set_extension_resource_limits(self):
extension_name = self.get_full_name()
# setup the resource limits for extension operations and it's services.
man = self.load_manifest()
resource_limits = man.get_resource_limits()
CGroupConfigurator.get_instance().setup_extension_slice(
extension_name=extension_name, cpu_quota=resource_limits.get_extension_slice_cpu_quota())
CGroupConfigurator.get_instance().set_extension_services_cpu_memory_quota(resource_limits.get_service_list())
def create_status_file_if_not_exist(self, extension, status, code, operation, message):
_, status_path = self.get_status_file_path(extension)
if status_path is not None and not os.path.exists(status_path):
now = datetime.datetime.utcnow().strftime("%Y-%m-%dT%H:%M:%SZ")
status_contents = [
{
"version": 1.0,
"timestampUTC": now,
"status": {
"name": self.get_extension_full_name(extension),
"operation": operation,
"status": status,
"code": code,
"formattedMessage": {
"lang": "en-US",
"message": message
}
}
}
]
# Create status directory if not exists. This is needed in the case where the Handler fails before even
# initializing the directories (ExtensionsGoalStateError, Version deleted from PIR error, etc)
if not os.path.exists(os.path.dirname(status_path)):
fileutil.mkdir(os.path.dirname(status_path), mode=0o700)
self.logger.info("Creating a placeholder status file {0} with status: {1}".format(status_path, status))
fileutil.write_file(status_path, json.dumps(status_contents))
def enable(self, extension=None, uninstall_exit_code=None):
try:
self._enable_extension(extension, uninstall_exit_code)
except ExtensionError as error:
if self.should_perform_multi_config_op(extension):
raise MultiConfigExtensionEnableError(error)
raise
# Even if a single extension is enabled for this handler, set the Handler state as Enabled
self.set_handler_state(ExtHandlerState.Enabled)
self.set_handler_status(status=ExtHandlerStatusValue.ready, message="Plugin enabled")
def should_perform_multi_config_op(self, extension):
return self.supports_multi_config and extension is not None
def _enable_extension(self, extension, uninstall_exit_code):
uninstall_exit_code = str(uninstall_exit_code) if uninstall_exit_code is not None else NOT_RUN
env = {
ExtCommandEnvVariable.UninstallReturnCode: uninstall_exit_code
}
# This check to call the setup if extension already installed and not called setup before
self.set_extension_resource_limits()
self.set_operation(WALAEventOperation.Enable)
man = self.load_manifest()
enable_cmd = man.get_enable_command()
self.logger.info("Enable extension: [{0}]".format(enable_cmd))
self.launch_command(enable_cmd, cmd_name="enable", timeout=300,
extension_error_code=ExtensionErrorCodes.PluginEnableProcessingFailed, env=env,
extension=extension)
if self.should_perform_multi_config_op(extension):
# Only save extension state if MC supported
self.__set_extension_state(extension, ExtensionState.Enabled)
# start tracking the extension services cgroup.
resource_limits = man.get_resource_limits()
CGroupConfigurator.get_instance().start_tracking_extension_services_cgroups(
resource_limits.get_service_list())
def _disable_extension(self, extension=None):
self.set_operation(WALAEventOperation.Disable)
man = self.load_manifest()
disable_cmd = man.get_disable_command()
self.logger.info("Disable extension: [{0}]".format(disable_cmd))
self.launch_command(disable_cmd, cmd_name="disable", timeout=900,
extension_error_code=ExtensionErrorCodes.PluginDisableProcessingFailed,
extension=extension)
def disable(self, extension=None, ignore_error=False):
try:
self._disable_extension(extension)
except ExtensionError as error:
if not ignore_error:
raise
msg = "[Ignored Error] Ran into error disabling extension:{0}".format(ustr(error))
self.logger.info(msg)
self.report_event(name=self.get_extension_full_name(extension), message=msg, is_success=False,
log_event=False)
#
# In the case of multi-config handlers, we keep the state of each extension individually.
# Disable can be called when the extension is deleted (the extension state in the goal state is set to "disabled"),
# or as part of the Uninstall and Update sequences. When the extension is deleted, we need to remove its state, along
# with its status and settings files. Otherwise, we need to set the state to "disabled".
#
if self.should_perform_multi_config_op(extension):
if extension.state == ExtensionRequestedState.Disabled:
self.__remove_extension_state_files(extension)
else:
self.__set_extension_state(extension, ExtensionState.Disabled)
# For Single config, dont check enabled_extensions because no extension state is maintained.
# For MultiConfig, Set the handler state to Installed only when all extensions have been disabled
if not self.supports_multi_config or not any(self.enabled_extensions):
self.set_handler_state(ExtHandlerState.Installed)
self.set_handler_status(status=ExtHandlerStatusValue.not_ready, message="Plugin disabled")
def install(self, uninstall_exit_code=None, extension=None):
# For Handler level operations, extension just specifies the settings that initiated the install.
# This is needed to provide the sequence number and extension name in case the extension needs to report
# failure/status using status file.
uninstall_exit_code = str(uninstall_exit_code) if uninstall_exit_code is not None else NOT_RUN
env = {ExtCommandEnvVariable.UninstallReturnCode: uninstall_exit_code}
man = self.load_manifest()
install_cmd = man.get_install_command()
self.logger.info("Install extension [{0}]".format(install_cmd))
self.set_operation(WALAEventOperation.Install)
self.launch_command(install_cmd, cmd_name="install", timeout=900, extension=extension,
extension_error_code=ExtensionErrorCodes.PluginInstallProcessingFailed, env=env)
self.set_handler_state(ExtHandlerState.Installed)
self.set_handler_status(status=ExtHandlerStatusValue.not_ready, message="Plugin installed but not enabled")
def uninstall(self, extension=None):
# For Handler level operations, extension just specifies the settings that initiated the uninstall.
# This is needed to provide the sequence number and extension name in case the extension needs to report
# failure/status using status file.
self.set_operation(WALAEventOperation.UnInstall)
man = self.load_manifest()
# stop tracking extension services cgroup.
resource_limits = man.get_resource_limits()
CGroupConfigurator.get_instance().stop_tracking_extension_services_cgroups(
resource_limits.get_service_list())
CGroupConfigurator.get_instance().reset_extension_services_quota(
resource_limits.get_service_list())
uninstall_cmd = man.get_uninstall_command()
self.logger.info("Uninstall extension [{0}]".format(uninstall_cmd))
self.launch_command(uninstall_cmd, cmd_name="uninstall", extension=extension)
def remove_ext_handler(self):
try:
zip_filename = os.path.join(conf.get_lib_dir(), self.get_extension_package_zipfile_name())
if os.path.exists(zip_filename):
os.remove(zip_filename)
self.logger.verbose("Deleted the extension zip at path {0}", zip_filename)
base_dir = self.get_base_dir()
if os.path.isdir(base_dir):
self.logger.info("Remove extension handler directory: {0}", base_dir)
# some extensions uninstall asynchronously so ignore error 2 while removing them
def on_rmtree_error(_, __, exc_info):
_, exception, _ = exc_info
if not isinstance(exception, OSError) or exception.errno != 2: # [Errno 2] No such file or directory
raise exception
shutil.rmtree(base_dir, onerror=on_rmtree_error)
CGroupConfigurator.get_instance().stop_tracking_extension_cgroups(self.get_full_name())
self.logger.info("Remove the extension slice: {0}".format(self.get_full_name()))
CGroupConfigurator.get_instance().reset_extension_quota(
extension_name=self.get_full_name())
except IOError as e:
message = "Failed to remove extension handler directory: {0}".format(e)
self.report_event(message=message, is_success=False)
self.logger.warn(message)
def update(self, handler_version=None, disable_exit_codes=None, updating_from_version=None, extension=None):
# For Handler level operations, extension just specifies the settings that initiated the update.
# This is needed to provide the sequence number and extension name in case the extension needs to report
# failure/status using status file.
if handler_version is None:
handler_version = self.ext_handler.version
env = {
'VERSION': handler_version,
ExtCommandEnvVariable.UpdatingFromVersion: updating_from_version
}
if not self.supports_multi_config:
# For single config, extension.name == ext_handler.name
env[ExtCommandEnvVariable.DisableReturnCode] = ustr(disable_exit_codes.get(self.ext_handler.name))
else:
disable_codes = []
for ext in self.extensions:
disable_codes.append({
"extensionName": ext.name,
"exitCode": ustr(disable_exit_codes.get(ext.name))
})
env[ExtCommandEnvVariable.DisableReturnCodeMultipleExtensions] = json.dumps(disable_codes)
try:
self.set_operation(WALAEventOperation.Update)
man = self.load_manifest()
update_cmd = man.get_update_command()
self.logger.info("Update extension [{0}]".format(update_cmd))
self.launch_command(update_cmd, cmd_name="update",
timeout=900,
extension_error_code=ExtensionErrorCodes.PluginUpdateProcessingFailed,
env=env, extension=extension)
except ExtensionError:
# Mark the handler as Failed so we don't clean it up and can keep reporting its status
self.set_handler_state(ExtHandlerState.FailedUpgrade)
raise
def update_with_install(self, uninstall_exit_code=None, extension=None):
man = self.load_manifest()
if man.is_update_with_install():
self.install(uninstall_exit_code=uninstall_exit_code, extension=extension)
else:
self.logger.info("UpdateWithInstall not set. "
"Skip install during upgrade.")
self.set_handler_state(ExtHandlerState.Installed)
def _get_last_modified_seq_no_from_config_files(self, extension):
"""
The sequence number is not guaranteed to always be strictly increasing. To ensure we always get the latest one,
fetching the sequence number from config file that was last modified (and not necessarily the largest).
:return: Last modified Sequence number or -1 on errors
"""
seq_no = -1
if self.supports_multi_config and (extension is None or extension.name is None):
# If no extension name is provided for Multi Config, don't try to parse any sequence number from filesystem
return seq_no
try:
largest_modified_time = 0
conf_dir = self.get_conf_dir()
for item in os.listdir(conf_dir):
item_path = os.path.join(conf_dir, item)
if not os.path.isfile(item_path):
continue
try:
# Settings file for Multi Config look like - <extName>.<seqNo>.settings
# Settings file for Single Config look like - <seqNo>.settings
match = re.search("((?P<ext_name>\\w+)\\.)*(?P<seq_no>\\d+)\\.settings", item_path)
if match is not None:
ext_name = match.group('ext_name')
if self.supports_multi_config and extension.name != ext_name:
continue
curr_seq_no = int(match.group("seq_no"))
curr_modified_time = os.path.getmtime(item_path)
if curr_modified_time > largest_modified_time:
seq_no = curr_seq_no
largest_modified_time = curr_modified_time
except (ValueError, IndexError, TypeError):
self.logger.verbose("Failed to parse file name: {0}", item)
continue
except Exception as error:
logger.verbose("Error fetching sequence number from config files: {0}".format(ustr(error)))
seq_no = -1
return seq_no
def get_status_file_path(self, extension=None):
"""
We should technically only fetch the sequence number from GoalState and not rely on the filesystem at all,
But there are certain scenarios where we need to fetch the latest sequence number from the filesystem
(For example when we need to report the status for extensions of previous GS if the current GS is Unsupported).
Always prioritizing sequence number from extensions but falling back to filesystem
:param extension: Extension for which the sequence number is required
:return: Sequence number for the extension, Status file path or -1, None
"""
path = None
seq_no = None
if extension is not None and extension.sequenceNumber is not None:
try:
seq_no = int(extension.sequenceNumber)
except ValueError:
logger.error('Sequence number [{0}] does not appear to be valid'.format(extension.sequenceNumber))
if seq_no is None:
# If we're unable to fetch Sequence number from Extension for any reason,
# try fetching it from the last modified Settings file.
seq_no = self._get_last_modified_seq_no_from_config_files(extension)
if seq_no is not None and seq_no > -1:
if self.should_perform_multi_config_op(extension) and extension is not None and extension.name is not None:
path = os.path.join(self.get_status_dir(), "{0}.{1}.status".format(extension.name, seq_no))
elif not self.supports_multi_config:
path = os.path.join(self.get_status_dir(), "{0}.status").format(seq_no)
return seq_no if seq_no is not None else -1, path
def collect_ext_status(self, ext):
self.logger.verbose("Collect extension status for {0}".format(self.get_extension_full_name(ext)))
seq_no, ext_status_file = self.get_status_file_path(ext)
# We should never try to read any status file if the handler has no settings, returning None in that case
if seq_no == -1 or ext is None:
return None
data = None
data_str = None
# Extension.name contains the extension name in case of MC and Handler name in case of Single Config.
ext_status = ExtensionStatus(name=ext.name, seq_no=seq_no)
try:
data_str, data = self._read_status_file(ext_status_file)
except ExtensionStatusError as e:
msg = ""
ext_status.status = ExtensionStatusValue.error
if e.code == ExtensionStatusError.CouldNotReadStatusFile:
ext_status.code = ExtensionErrorCodes.PluginUnknownFailure
msg = u"We couldn't read any status for {0} extension, for the sequence number {1}. It failed due" \
u" to {2}".format(self.get_full_name(ext), seq_no, ustr(e))
elif e.code == ExtensionStatusError.InvalidJsonFile:
ext_status.code = ExtensionErrorCodes.PluginSettingsStatusInvalid
msg = u"The status reported by the extension {0}(Sequence number {1}), was in an " \
u"incorrect format and the agent could not parse it correctly. Failed due to {2}" \
.format(self.get_full_name(ext), seq_no, ustr(e))
elif e.code == ExtensionStatusError.FileNotExists:
msg = "This status is being reported by the Guest Agent since no status file was " \
"reported by extension {0}: {1}".format(self.get_extension_full_name(ext), ustr(e))
# Reporting a success code and transitioning status to keep in accordance with existing code that
# creates default status placeholder file
ext_status.code = ExtensionErrorCodes.PluginSuccess
ext_status.status = ExtensionStatusValue.transitioning
# This log is periodic due to the verbose nature of the status check. Please make sure that the message
# constructed above does not change very frequently and includes important info such as sequence number,
# extension name to make sure that the log reflects changes in the extension sequence for which the
# status is being sent.
logger.periodic_warn(logger.EVERY_HALF_HOUR, u"[PERIODIC] " + msg)
add_periodic(delta=logger.EVERY_HALF_HOUR, name=self.get_extension_full_name(ext),
version=self.ext_handler.version,
op=WALAEventOperation.StatusProcessing, is_success=False, message=msg,
log_event=False)
ext_status.message = msg
return ext_status
# We did not encounter InvalidJsonFile/CouldNotReadStatusFile and thus the status file was correctly written
# and has valid json.
try:
parse_ext_status(ext_status, data)
if len(data_str) > _MAX_STATUS_FILE_SIZE_IN_BYTES:
raise ExtensionStatusError(msg="For Extension Handler {0} for the sequence number {1}, the status "
"file {2} of size {3} bytes is too big. Max Limit allowed is {4} bytes"
.format(self.get_full_name(ext), seq_no,
ext_status_file, len(data_str), _MAX_STATUS_FILE_SIZE_IN_BYTES),
code=ExtensionStatusError.MaxSizeExceeded)
except ExtensionStatusError as e:
msg = u"For Extension Handler {0} for the sequence number {1}, the status file {2}. " \
u"Encountered the following error: {3}".format(self.get_full_name(ext), seq_no,
ext_status_file, ustr(e))
logger.periodic_warn(logger.EVERY_DAY, u"[PERIODIC] " + msg)
add_periodic(delta=logger.EVERY_HALF_HOUR, name=self.get_extension_full_name(ext),
version=self.ext_handler.version,
op=WALAEventOperation.StatusProcessing, is_success=False, message=msg, log_event=False)
if e.code == ExtensionStatusError.MaxSizeExceeded:
ext_status.message, field_size = self._truncate_message(ext_status.message, _MAX_STATUS_MESSAGE_LENGTH)
ext_status.substatusList = self._process_substatus_list(ext_status.substatusList, field_size)
elif e.code == ExtensionStatusError.StatusFileMalformed:
ext_status.message = "Could not get a valid status from the extension {0}. Encountered the " \
"following error: {1}".format(self.get_full_name(ext), ustr(e))
ext_status.code = ExtensionErrorCodes.PluginSettingsStatusInvalid
ext_status.status = ExtensionStatusValue.error
return ext_status
def get_ext_handling_status(self, ext):
seq_no, ext_status_file = self.get_status_file_path(ext)
# This is legacy scenario for cases when no extension settings is available
if seq_no < 0 or ext_status_file is None:
return None
# Missing status file is considered a non-terminal state here
# so that extension sequencing can wait until it becomes existing
if not os.path.exists(ext_status_file):
status = ExtensionStatusValue.warning
else:
ext_status = self.collect_ext_status(ext)
status = ext_status.status if ext_status is not None else None
return status
def is_ext_handling_complete(self, ext):
status = self.get_ext_handling_status(ext)
# when seq < 0 (i.e. no new user settings), the handling is complete and return None status
if status is None:
return True, None
# If not in terminal state, it is incomplete
if status not in _EXTENSION_TERMINAL_STATUSES:
return False, status
# Extension completed, return its status
return True, status
def report_error_on_incarnation_change(self, goal_state_changed, log_msg, event_msg, extension=None,
op=WALAEventOperation.ReportStatus):
# Since this code is called on a loop, logging as a warning only on goal state change, else logging it
# as verbose
if goal_state_changed:
logger.warn(log_msg)
add_event(name=self.get_extension_full_name(extension), version=self.ext_handler.version,
op=op, message=event_msg, is_success=False, log_event=False)
else:
logger.verbose(log_msg)
def get_extension_handler_statuses(self, handler_status, goal_state_changed):
"""
Get the list of ExtHandlerStatus objects corresponding to each extension in the Handler. Each object might have
its own status for the Extension status but the Handler status would be the same for each extension in a Handle
:return: List of ExtHandlerStatus objects for each extension in the Handler
"""
ext_handler_statuses = []
# TODO Refactor or remove this common code pattern (for each extension subordinate to an ext_handler, do X).
for ext in self.extensions:
# In MC, for disabled extensions we dont need to report status. Skip reporting if disabled and state == disabled
# Extension.state corresponds to the state requested by CRP, self.__get_extension_state() corresponds to the
# state of the extension on the VM. Skip reporting only if both are Disabled
if self.should_perform_multi_config_op(ext) and \
ext.state == ExtensionState.Disabled and self.get_extension_state(ext) == ExtensionState.Disabled:
continue
# Breaking off extension reporting in 2 parts, one which is Handler dependent and the other that is Extension dependent
try:
ext_handler_status = ExtHandlerStatus()
set_properties("ExtHandlerStatus", ext_handler_status, get_properties(handler_status))
except Exception as error:
msg = "Something went wrong when trying to get a copy of the Handler status for {0}".format(
self.get_extension_full_name())
self.report_error_on_incarnation_change(goal_state_changed, event_msg=msg,
log_msg="{0}.\nStack Trace: {1}".format(
msg, textutil.format_exception(error)))
# Since this is a Handler level error and we need to do it per extension, breaking here and logging
# error since we wont be able to report error anyways and saving it as a handler status (legacy behavior)
self.set_handler_status(message=msg, code=-1)
break
# For the extension dependent stuff, if there's some unhandled error, we will report it back to CRP as an extension error.
try:
ext_status = self.collect_ext_status(ext)
if ext_status is not None:
ext_handler_status.extension_status = ext_status
ext_handler_statuses.append(ext_handler_status)
except ExtensionError as error:
msg = "Unknown error when trying to fetch status from extension {0}".format(
self.get_extension_full_name(ext))
self.report_error_on_incarnation_change(goal_state_changed, event_msg=msg,
log_msg="{0}.\nStack Trace: {1}".format(
msg, textutil.format_exception(error)),
extension=ext)
# Unexpected error, for single config, keep the behavior as is
if not self.should_perform_multi_config_op(ext):
self.set_handler_status(message=ustr(error), code=error.code)
break
# For MultiConfig, create a custom ExtensionStatus object with the error details and attach it to the Handler.
# This way the error would be reported back to CRP and the failure would be propagated instantly as compared to CRP eventually timing it out.
ext_status = ExtensionStatus(name=ext.name, seq_no=ext.sequenceNumber,
code=ExtensionErrorCodes.PluginUnknownFailure,
status=ExtensionStatusValue.error, message=msg)
ext_handler_status.extension_status = ext_status
ext_handler_statuses.append(ext_handler_status)
return ext_handler_statuses
def collect_heartbeat(self): # pylint: disable=R1710
man = self.load_manifest()
if not man.is_report_heartbeat():
return
heartbeat_file = os.path.join(conf.get_lib_dir(),
self.get_heartbeat_file())
if not os.path.isfile(heartbeat_file):
raise ExtensionError("Failed to get heart beat file")
if not self.is_responsive(heartbeat_file):
return {
"status": "Unresponsive",
"code": -1,
"message": "Extension heartbeat is not responsive"
}
try:
heartbeat_json = fileutil.read_file(heartbeat_file)
heartbeat = json.loads(heartbeat_json)[0]['heartbeat']
except IOError as e:
raise ExtensionError("Failed to get heartbeat file:{0}".format(e))
except (ValueError, KeyError) as e:
raise ExtensionError("Malformed heartbeat file: {0}".format(e))
return heartbeat
@staticmethod
def is_responsive(heartbeat_file):
"""
Was heartbeat_file updated within the last ten (10) minutes?
:param heartbeat_file: str
:return: bool
"""
last_update = int(time.time() - os.stat(heartbeat_file).st_mtime)
return last_update <= 600
def launch_command(self, cmd, cmd_name=None, timeout=300, extension_error_code=ExtensionErrorCodes.PluginProcessingError,
env=None, extension=None):
begin_utc = datetime.datetime.utcnow()
self.logger.verbose("Launch command: [{0}]", cmd)
base_dir = self.get_base_dir()
with tempfile.TemporaryFile(dir=base_dir, mode="w+b") as stdout:
with tempfile.TemporaryFile(dir=base_dir, mode="w+b") as stderr:
if env is None:
env = {}
# Always add Extension Path and version to the current launch_command (Ask from publishers)
env.update({
ExtCommandEnvVariable.ExtensionPath: base_dir,
ExtCommandEnvVariable.ExtensionVersion: str(self.ext_handler.version),
ExtCommandEnvVariable.WireProtocolAddress: self.protocol.get_endpoint(),
# Setting sequence number to 0 incase no settings provided to keep in accordance with the empty
# 0.settings file that we create for such extensions.
ExtCommandEnvVariable.ExtensionSeqNumber: str(
extension.sequenceNumber) if extension is not None else _DEFAULT_SEQ_NO
})
if self.should_perform_multi_config_op(extension):
env[ExtCommandEnvVariable.ExtensionName] = extension.name
supported_features = []
for _, feature in get_agent_supported_features_list_for_extensions().items():
supported_features.append(
{
"Key": feature.name,
"Value": feature.version
}
)
if supported_features:
env[ExtCommandEnvVariable.ExtensionSupportedFeatures] = json.dumps(supported_features)
ext_name = self.get_extension_full_name(extension)
try:
# Some extensions erroneously begin cmd with a slash; don't interpret those
# as root-relative. (Issue #1170)
command_full_path = os.path.join(base_dir, cmd.lstrip(os.path.sep))
log_msg = "Executing command: {0} with environment variables: {1}".format(command_full_path,
json.dumps(env))
self.logger.info(log_msg)
self.report_event(name=ext_name, message=log_msg, log_event=False)
# Add the os environment variables before executing command
env.update(os.environ)
process_output = CGroupConfigurator.get_instance().start_extension_command(
extension_name=self.get_full_name(extension),
command=command_full_path,
cmd_name=cmd_name,
timeout=timeout,
shell=True,
cwd=base_dir,
env=env,
stdout=stdout,
stderr=stderr,
error_code=extension_error_code)
except OSError as e:
raise ExtensionError("Failed to launch '{0}': {1}".format(command_full_path, e.strerror),
code=extension_error_code)
duration = elapsed_milliseconds(begin_utc)
log_msg = "Command: {0}\n{1}".format(cmd, "\n".join(
[line for line in process_output.split('\n') if line != ""]))
self.logger.info(log_msg)
self.report_event(name=ext_name, message=log_msg, duration=duration, log_event=False)
return process_output
def load_manifest(self):
man_file = self.get_manifest_file()
try:
data = json.loads(fileutil.read_file(man_file))
except (IOError, OSError) as e:
raise ExtensionError('Failed to load manifest file ({0}): {1}'.format(man_file, e.strerror),
code=ExtensionErrorCodes.PluginHandlerManifestNotFound)
except ValueError:
raise ExtensionError('Malformed manifest file ({0}).'.format(man_file),
code=ExtensionErrorCodes.PluginHandlerManifestDeserializationError)
return HandlerManifest(data[0])
def update_settings_file(self, settings_file, settings):
settings_file = os.path.join(self.get_conf_dir(), settings_file)
try:
fileutil.write_file(settings_file, settings)
except IOError as e:
fileutil.clean_ioerror(e,
paths=[settings_file])
raise ExtensionError(u"Failed to update settings file", e)
def update_settings(self, extension):
if self.extensions is None or len(self.extensions) == 0 or extension is None:
# This is the behavior of waagent 2.0.x
# The new agent has to be consistent with the old one.
self.logger.info("Extension has no settings, write empty 0.settings")
self.update_settings_file("{0}.settings".format(_DEFAULT_SEQ_NO), "")
return
settings = {
'publicSettings': extension.publicSettings,
'protectedSettings': extension.protectedSettings,
'protectedSettingsCertThumbprint': extension.certificateThumbprint
}
ext_settings = {
"runtimeSettings": [{
"handlerSettings": settings
}]
}
# MultiConfig: change the name to <extName>.<seqNo>.settings for MC and <seqNo>.settings for SC
settings_file = "{0}.{1}.settings".format(extension.name, extension.sequenceNumber) if \
self.should_perform_multi_config_op(extension) else "{0}.settings".format(extension.sequenceNumber)
self.logger.info("Update settings file: {0}", settings_file)
self.update_settings_file(settings_file, json.dumps(ext_settings))
def create_handler_env(self):
handler_env = {
HandlerEnvironment.logFolder: self.get_log_dir(),
HandlerEnvironment.configFolder: self.get_conf_dir(),
HandlerEnvironment.statusFolder: self.get_status_dir(),
HandlerEnvironment.heartbeatFile: self.get_heartbeat_file()
}
if get_supported_feature_by_name(SupportedFeatureNames.ExtensionTelemetryPipeline).is_supported:
handler_env[HandlerEnvironment.eventsFolder] = self.get_extension_events_dir()
# For now, keep the preview key to not break extensions that were using the preview.
handler_env[HandlerEnvironment.eventsFolder_preview] = self.get_extension_events_dir()
env = [{
HandlerEnvironment.name: self.ext_handler.name,
HandlerEnvironment.version: HandlerEnvironment.schemaVersion,
HandlerEnvironment.handlerEnvironment: handler_env
}]
try:
fileutil.write_file(self.get_env_file(), json.dumps(env))
except IOError as e:
fileutil.clean_ioerror(e,
paths=[self.get_base_dir(), self.pkg_file])
raise ExtensionDownloadError(u"Failed to save handler environment", e)
def __get_handler_state_file_name(self, extension=None):
if self.should_perform_multi_config_op(extension):
return "{0}.HandlerState".format(extension.name)
return "HandlerState"
def set_handler_state(self, handler_state):
self.__set_state(name=self.__get_handler_state_file_name(), value=handler_state)
def get_handler_state(self):
return self.__get_state(name=self.__get_handler_state_file_name(), default=ExtHandlerState.NotInstalled)
def __set_extension_state(self, extension, extension_state):
self.__set_state(name=self.__get_handler_state_file_name(extension), value=extension_state)
def get_extension_state(self, extension=None):
return self.__get_state(name=self.__get_handler_state_file_name(extension), default=ExtensionState.Disabled)
def __set_state(self, name, value):
state_dir = self.get_conf_dir()
state_file = os.path.join(state_dir, name)
try:
if not os.path.exists(state_dir):
fileutil.mkdir(state_dir, mode=0o700)
fileutil.write_file(state_file, value)
except IOError as e:
fileutil.clean_ioerror(e, paths=[state_file])
self.logger.error("Failed to set state: {0}", e)
def __get_state(self, name, default=None):
state_dir = self.get_conf_dir()
state_file = os.path.join(state_dir, name)
if not os.path.isfile(state_file):
return default
try:
return fileutil.read_file(state_file)
except IOError as e:
self.logger.error("Failed to get state: {0}", e)
return default
def __remove_extension_state_files(self, extension):
self.logger.info("Removing states files for disabled extension: {0}".format(extension.name))
try:
# MultiConfig: Remove all config/<extName>.*.settings, status/<extName>.*.status and config/<extName>.HandlerState files
files_to_delete = [
os.path.join(self.get_conf_dir(), "{0}.*.settings".format(extension.name)),
os.path.join(self.get_status_dir(), "{0}.*.status".format(extension.name)),
os.path.join(self.get_conf_dir(), self.__get_handler_state_file_name(extension))
]
fileutil.rm_files(*files_to_delete)
except Exception as error:
extension_name = self.get_extension_full_name(extension)
message = "Failed to remove extension state files for {0}: {1}".format(extension_name, ustr(error))
self.report_event(name=extension_name, message=message, is_success=False, log_event=False)
self.logger.warn(message)
def set_handler_status(self, status=ExtHandlerStatusValue.not_ready, message="", code=0):
state_dir = self.get_conf_dir()
handler_status = ExtHandlerStatus()
handler_status.name = self.ext_handler.name
handler_status.version = str(self.ext_handler.version)
handler_status.message = message
handler_status.code = code
handler_status.status = status
handler_status.supports_multi_config = self.ext_handler.supports_multi_config
status_file = os.path.join(state_dir, "HandlerStatus")
try:
handler_status_json = json.dumps(get_properties(handler_status))
if handler_status_json is not None:
if not os.path.exists(state_dir):
fileutil.mkdir(state_dir, mode=0o700)
fileutil.write_file(status_file, handler_status_json)
else:
self.logger.error("Failed to create JSON document of handler status for {0} version {1}".format(
self.ext_handler.name, self.ext_handler.version))
except (IOError, ValueError, ProtocolError) as error:
fileutil.clean_ioerror(error, paths=[status_file])
self.logger.error("Failed to save handler status: {0}", textutil.format_exception(error))
def get_handler_status(self):
state_dir = self.get_conf_dir()
status_file = os.path.join(state_dir, "HandlerStatus")
if not os.path.isfile(status_file):
return None
handler_status_contents = ""
try:
handler_status_contents = fileutil.read_file(status_file)
data = json.loads(handler_status_contents)
handler_status = ExtHandlerStatus()
set_properties("ExtHandlerStatus", handler_status, data)
return handler_status
except (IOError, ValueError) as error:
self.logger.error("Failed to get handler status: {0}", error)
except Exception as error:
error_msg = "Failed to get handler status message: {0}.\n Contents of file: {1}".format(
ustr(error), handler_status_contents).replace('"', '\'')
add_periodic(
delta=logger.EVERY_HOUR,
name=AGENT_NAME,
version=CURRENT_VERSION,
op=WALAEventOperation.ExtensionProcessing,
is_success=False,
message=error_msg)
raise
return None
def get_extension_package_zipfile_name(self):
return "{0}__{1}{2}".format(self.ext_handler.name,
self.ext_handler.version,
HANDLER_PKG_EXT)
def get_full_name(self, extension=None):
"""
:return: <HandlerName>-<HandlerVersion> if extension is None or Handler does not support Multi Config,
else then return - <HandlerName>.<ExtensionName>-<HandlerVersion>
"""
return "{0}-{1}".format(self.get_extension_full_name(extension), self.ext_handler.version)
def get_base_dir(self):
return os.path.join(conf.get_lib_dir(), self.get_full_name())
def get_status_dir(self):
return os.path.join(self.get_base_dir(), "status")
def get_conf_dir(self):
return os.path.join(self.get_base_dir(), 'config')
def get_extension_events_dir(self):
return os.path.join(self.get_log_dir(), EVENTS_DIRECTORY)
def get_heartbeat_file(self):
return os.path.join(self.get_base_dir(), 'heartbeat.log')
def get_manifest_file(self):
return os.path.join(self.get_base_dir(), 'HandlerManifest.json')
def get_env_file(self):
return os.path.join(self.get_base_dir(), HandlerEnvironment.fileName)
def get_log_dir(self):
return os.path.join(conf.get_ext_log_dir(), self.ext_handler.name)
@staticmethod
def _read_status_file(ext_status_file):
err_count = 0
while True:
try:
return ExtHandlerInstance._read_and_parse_json_status_file(ext_status_file)
except Exception:
err_count += 1
if err_count >= _NUM_OF_STATUS_FILE_RETRIES:
raise
time.sleep(_STATUS_FILE_RETRY_DELAY)
@staticmethod
def _read_and_parse_json_status_file(ext_status_file):
if not os.path.exists(ext_status_file):
raise ExtensionStatusError(msg="Status file {0} does not exist".format(ext_status_file),
code=ExtensionStatusError.FileNotExists)
try:
data_str = fileutil.read_file(ext_status_file)
except IOError as e:
raise ExtensionStatusError(msg=ustr(e), inner=e,
code=ExtensionStatusError.CouldNotReadStatusFile)
try:
data = json.loads(data_str)
except (ValueError, TypeError) as e:
raise ExtensionStatusError(msg="{0} \n First 2000 Bytes of status file:\n {1}".format(ustr(e), ustr(data_str)[:2000]),
inner=e,
code=ExtensionStatusError.InvalidJsonFile)
return data_str, data
def _process_substatus_list(self, substatus_list, current_status_size=0):
processed_substatus = []
# Truncating the substatus to reduce the size, and preserve other fields of the text
for substatus in substatus_list:
substatus.name, field_size = self._truncate_message(substatus.name, _MAX_SUBSTATUS_FIELD_LENGTH)
current_status_size += field_size
substatus.message, field_size = self._truncate_message(substatus.message, _MAX_SUBSTATUS_FIELD_LENGTH)
current_status_size += field_size
if current_status_size <= _MAX_STATUS_FILE_SIZE_IN_BYTES:
processed_substatus.append(substatus)
else:
break
return processed_substatus
@staticmethod
def _truncate_message(field, truncate_size=_MAX_SUBSTATUS_FIELD_LENGTH): # pylint: disable=R1710
if field is None: # pylint: disable=R1705
return
else:
truncated_field = field if len(field) < truncate_size else field[:truncate_size] + _TRUNCATED_SUFFIX
return truncated_field, len(truncated_field)
class HandlerEnvironment(object):
# HandlerEnvironment.json schema version
schemaVersion = 1.0
fileName = "HandlerEnvironment.json"
handlerEnvironment = "handlerEnvironment"
logFolder = "logFolder"
configFolder = "configFolder"
statusFolder = "statusFolder"
heartbeatFile = "heartbeatFile"
eventsFolder_preview = "eventsFolder_preview"
eventsFolder = "eventsFolder"
name = "name"
version = "version"
class HandlerManifest(object):
def __init__(self, data):
if data is None or data['handlerManifest'] is None:
raise ExtensionError('Malformed manifest file.')
self.data = data
def get_name(self):
return self.data["name"]
def get_version(self):
return self.data["version"]
def get_install_command(self):
return self.data['handlerManifest']["installCommand"]
def get_uninstall_command(self):
return self.data['handlerManifest']["uninstallCommand"]
def get_update_command(self):
return self.data['handlerManifest']["updateCommand"]
def get_enable_command(self):
return self.data['handlerManifest']["enableCommand"]
def get_disable_command(self):
return self.data['handlerManifest']["disableCommand"]
def is_report_heartbeat(self):
value = self.data['handlerManifest'].get('reportHeartbeat', False)
return self._parse_boolean_value(value, default_val=False)
def is_update_with_install(self):
update_mode = self.data['handlerManifest'].get('updateMode')
if update_mode is None:
return True
return update_mode.lower() == "updatewithinstall"
def is_continue_on_update_failure(self):
value = self.data['handlerManifest'].get('continueOnUpdateFailure', False)
return self._parse_boolean_value(value, default_val=False)
def supports_multiple_extensions(self):
value = self.data['handlerManifest'].get('supportsMultipleExtensions', False)
return self._parse_boolean_value(value, default_val=False)
def get_resource_limits(self):
return ResourceLimits(self.data.get('resourceLimits', None))
def report_invalid_boolean_properties(self, ext_name):
"""
Check that the specified keys in the handler manifest has boolean values.
"""
for key in ['reportHeartbeat', 'continueOnUpdateFailure', 'supportsMultipleExtensions']:
value = self.data['handlerManifest'].get(key)
if value is not None and not isinstance(value, bool):
msg = "In the handler manifest: '{0}' has a non-boolean value [{1}] for boolean type. Please change it to a boolean value.".format(key, value)
logger.info(msg)
add_event(name=ext_name, message=msg, op=WALAEventOperation.ExtensionHandlerManifest, log_event=False)
@staticmethod
def _parse_boolean_value(value, default_val):
"""
Expects boolean value but
for backward compatibility, 'true' (case-insensitive) is accepted, and other values default to False
Note: Json module returns unicode on py2. In py3, unicode removed
ustr is a unicode object for Py2 and a str object for Py3.
"""
if not isinstance(value, bool):
return True if isinstance(value, ustr) and value.lower() == "true" else default_val
return value
class ResourceLimits(object):
def __init__(self, data):
self.data = data
def get_extension_slice_cpu_quota(self):
if self.data is not None:
return self.data.get('cpuQuotaPercentage', None)
return None
def get_extension_slice_memory_quota(self):
if self.data is not None:
return self.data.get('memoryQuotaInMB', None)
return None
def get_service_list(self):
if self.data is not None:
return self.data.get('services', None)
return None
class ExtensionStatusError(ExtensionError):
"""
When extension failed to provide a valid status file
"""
CouldNotReadStatusFile = 1
InvalidJsonFile = 2
StatusFileMalformed = 3
MaxSizeExceeded = 4
FileNotExists = 5
def __init__(self, msg=None, inner=None, code=-1): # pylint: disable=W0235
super(ExtensionStatusError, self).__init__(msg, inner, code)