# -*- encoding: utf-8 -*-
# Copyright 2018 Microsoft Corporation
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
# Requires Python 2.6+ and Openssl 1.0+
import glob
import json
import os
import re
import subprocess
import threading
from azurelinuxagent.common import conf
from azurelinuxagent.common import logger
from azurelinuxagent.ga.cgroupcontroller import AGENT_NAME_TELEMETRY, MetricsCounter
from azurelinuxagent.ga.cgroupapi import SystemdRunError, EXTENSION_SLICE_PREFIX, CGroupUtil, SystemdCgroupApiv2, \
log_cgroup_info, log_cgroup_warning, create_cgroup_api, InvalidCgroupMountpointException
from azurelinuxagent.ga.cgroupstelemetry import CGroupsTelemetry
from azurelinuxagent.ga.cpucontroller import _CpuController
from azurelinuxagent.ga.memorycontroller import _MemoryController
from azurelinuxagent.common.exception import ExtensionErrorCodes, CGroupsException, AgentMemoryExceededException
from azurelinuxagent.common.future import ustr
from azurelinuxagent.common.osutil import systemd
from azurelinuxagent.common.version import get_distro
from azurelinuxagent.common.utils import shellutil, fileutil
from azurelinuxagent.ga.extensionprocessutil import handle_process_completion
from azurelinuxagent.common.event import add_event, WALAEventOperation
AZURE_SLICE = "azure.slice"
_AZURE_SLICE_CONTENTS = """
[Unit]
Description=Slice for Azure VM Agent and Extensions
DefaultDependencies=no
Before=slices.target
"""
_VMEXTENSIONS_SLICE = EXTENSION_SLICE_PREFIX + ".slice"
_AZURE_VMEXTENSIONS_SLICE = AZURE_SLICE + "/" + _VMEXTENSIONS_SLICE
_VMEXTENSIONS_SLICE_CONTENTS = """
[Unit]
Description=Slice for Azure VM Extensions
DefaultDependencies=no
Before=slices.target
[Slice]
CPUAccounting=yes
MemoryAccounting=yes
"""
_EXTENSION_SLICE_CONTENTS = """
[Unit]
Description=Slice for Azure VM extension {extension_name}
DefaultDependencies=no
Before=slices.target
[Slice]
CPUAccounting=yes
CPUQuota={cpu_quota}
MemoryAccounting=yes
"""
LOGCOLLECTOR_SLICE = "azure-walinuxagent-logcollector.slice"
# More info on resource limits properties in systemd here:
# https://access.redhat.com/documentation/en-us/red_hat_enterprise_linux/7/html/resource_management_guide/sec-modifying_control_groups
LOGCOLLECTOR_CPU_QUOTA_FOR_V1_AND_V2 = "5%"
LOGCOLLECTOR_MEMORY_THROTTLE_LIMIT_FOR_V2 = "170M"
LOGCOLLECTOR_MAX_THROTTLED_EVENTS_FOR_V2 = 10
LOGCOLLECTOR_ANON_MEMORY_LIMIT_FOR_V1_AND_V2 = 25 * 1024 ** 2 # 25 MB
LOGCOLLECTOR_CACHE_MEMORY_LIMIT_FOR_V1_AND_V2 = 155 * 1024 ** 2 # 155 MB
_AGENT_DROP_IN_FILE_SLICE = "10-Slice.conf"
_AGENT_DROP_IN_FILE_SLICE_CONTENTS = """
# This drop-in unit file was created by the Azure VM Agent.
# Do not edit.
[Service]
Slice=azure.slice
"""
_DROP_IN_FILE_CPU_ACCOUNTING = "11-CPUAccounting.conf"
_DROP_IN_FILE_CPU_ACCOUNTING_CONTENTS = """
# This drop-in unit file was created by the Azure VM Agent.
# Do not edit.
[Service]
CPUAccounting=yes
"""
_DROP_IN_FILE_CPU_QUOTA = "12-CPUQuota.conf"
_DROP_IN_FILE_CPU_QUOTA_CONTENTS_FORMAT = """
# This drop-in unit file was created by the Azure VM Agent.
# Do not edit.
[Service]
CPUQuota={0}
"""
_DROP_IN_FILE_MEMORY_ACCOUNTING = "13-MemoryAccounting.conf"
_DROP_IN_FILE_MEMORY_ACCOUNTING_CONTENTS = """
# This drop-in unit file was created by the Azure VM Agent.
# Do not edit.
[Service]
MemoryAccounting=yes
"""
class DisableCgroups(object):
ALL = "all"
AGENT = "agent"
EXTENSIONS = "extensions"
class CGroupConfigurator(object):
"""
This class implements the high-level operations on CGroups (e.g. initialization, creation, etc.)
NOTE: with the exception of start_extension_command, none of the methods in this class
raise exceptions (cgroup operations should not block extensions)
"""
class _Impl(object):
def __init__(self):
self._initialized = False
self._cgroups_supported = False
self._agent_cgroups_enabled = False
self._extensions_cgroups_enabled = False
self._cgroups_api = None
self._agent_cgroup = None
self._agent_memory_metrics = None
self._check_cgroups_lock = threading.RLock() # Protects check_cgroups, which is called from the Monitor thread and the main loop.
self._unexpected_processes = {}
def initialize(self):
try:
if self._initialized:
return
# check whether cgroup monitoring is supported on the current distro
self._cgroups_supported = self._check_cgroups_supported()
if not self._cgroups_supported:
# If a distro is not supported, attempt to clean up any existing drop-in files in case it was
# previously supported. It is necessary to clean up in this scenario in case the OS hits any
# kernel bugs related to cgroups.
if not self.using_cgroup_v2():
log_cgroup_info("Agent will reset the quotas in case cgroup usage went from enabled to disabled")
self._reset_agent_cgroup_setup()
return
# We check the agent unit 'Slice' property before setting up azure.slice. This check is done first
# because the agent's Slice unit property will be 'azure.slice' if the slice drop-in file exists, even
# though systemd has not moved the agent to azure.slice yet. Systemd will only move the agent to
# azure.slice after a VM restart.
agent_unit_name = systemd.get_agent_unit_name()
agent_slice = systemd.get_unit_property(agent_unit_name, "Slice")
if agent_slice not in (AZURE_SLICE, "system.slice"):
log_cgroup_warning("The agent is within an unexpected slice: {0}".format(agent_slice))
return
# Before agent setup, clean up the old agent setup (drop-in files), since the new agent uses a different approach (systemctl) to set up cgroups.
self._cleanup_old_agent_setup()
# Notes about slice setup:
# For machines where the daemon version did not already create azure.slice, the agent creates
# azure.slice and the agent unit's Slice drop-in file (without daemon-reload), but systemd does not
# move the agent unit to azure.slice until a VM restart. It is ok to enable cgroup usage in this
# case if the agent is running in system.slice.
self._setup_azure_slice()
# Log mount points/root paths for cgroup controllers
self._cgroups_api.log_root_paths()
# Get agent cgroup
self._agent_cgroup = self._cgroups_api.get_unit_cgroup(unit_name=agent_unit_name, cgroup_name=AGENT_NAME_TELEMETRY)
if conf.get_cgroup_disable_on_process_check_failure() and self._check_fails_if_processes_found_in_agent_cgroup_before_enable(agent_slice):
reason = "Found unexpected processes in the agent cgroup before agent enable cgroups."
self.disable(reason, DisableCgroups.ALL)
return
# Get controllers to track
agent_controllers = self._agent_cgroup.get_controllers(expected_relative_path=os.path.join(agent_slice, agent_unit_name))
if len(agent_controllers) > 0:
self.enable()
self._enable_accounting(agent_unit_name)
for controller in agent_controllers:
for prop in controller.get_unit_properties():
log_cgroup_info('Agent {0} unit property value: {1}'.format(prop, systemd.get_unit_property(systemd.get_agent_unit_name(), prop)))
if isinstance(controller, _CpuController):
self._set_cpu_quota(agent_unit_name, conf.get_agent_cpu_quota())
elif isinstance(controller, _MemoryController):
self._agent_memory_metrics = controller
CGroupsTelemetry.track_cgroup_controller(controller)
except Exception as exception:
log_cgroup_warning("Error initializing cgroups: {0}".format(ustr(exception)))
finally:
log_cgroup_info('Agent cgroups enabled: {0}'.format(self._agent_cgroups_enabled))
self._initialized = True
def _check_cgroups_supported(self):
distro_supported = CGroupUtil.distro_supported()
if not distro_supported:
log_cgroup_info("Cgroups is not currently supported on {0}".format(get_distro()), send_event=True)
return False
if not systemd.is_systemd():
log_cgroup_warning("systemd was not detected on {0}".format(get_distro()), send_event=True)
log_cgroup_info("Cgroups won't be supported on non-systemd systems", send_event=True)
return False
if not self._check_no_legacy_cgroups():
log_cgroup_warning("The daemon's PID was added to a legacy cgroup; will not enable cgroups.", send_event=True)
return False
try:
self._cgroups_api = create_cgroup_api()
log_cgroup_info("Using cgroup {0} for resource enforcement and monitoring".format(self._cgroups_api.get_cgroup_version()))
except InvalidCgroupMountpointException as e:
# Systemd mounts the cgroup file system at '/sys/fs/cgroup'. Previously, the agent supported cgroup
# usage if a user mounted the cgroup filesystem elsewhere. The agent no longer supports that
# scenario. Cleanup any existing drop in files in case the agent previously supported cgroups on
# this machine.
log_cgroup_warning(
"The agent does not support cgroups if the default systemd mountpoint is not being used: {0}".format(
ustr(e)), send_event=True)
return False
except CGroupsException as e:
log_cgroup_warning("Unable to determine which cgroup version to use: {0}".format(ustr(e)),
send_event=True)
return False
if self.using_cgroup_v2():
log_cgroup_info("Agent and extensions resource enforcement and monitoring is not currently supported on cgroup v2", send_event=True)
return False
return True
@staticmethod
def _check_no_legacy_cgroups():
"""
Older versions of the daemon (2.2.31-2.2.40) wrote their PID to /sys/fs/cgroup/{cpu,memory}/WALinuxAgent/WALinuxAgent. When running
under systemd this could produce invalid resource usage data. Cgroups should not be enabled under this condition.
"""
legacy_cgroups = CGroupUtil.cleanup_legacy_cgroups()
if legacy_cgroups > 0:
return False
return True
@staticmethod
def _cleanup_old_agent_setup():
"""
The new agent uses systemctl commands instead of drop-in files for the desired configuration, so clean up the old drop-in files.
We will keep this cleanup code for a few agent versions, until we determine that all VMs have moved to a new agent version.
"""
# Older agents used to create this slice, but it was never used. Clean up the file.
CGroupConfigurator._Impl._cleanup_unit_file("/etc/systemd/system/system-walinuxagent.extensions.slice")
unit_file_install_path = systemd.get_unit_file_install_path()
logcollector_slice = os.path.join(unit_file_install_path, LOGCOLLECTOR_SLICE)
agent_drop_in_path = systemd.get_agent_drop_in_path()
agent_drop_in_file_cpu_accounting = os.path.join(agent_drop_in_path, _DROP_IN_FILE_CPU_ACCOUNTING)
agent_drop_in_file_memory_accounting = os.path.join(agent_drop_in_path, _DROP_IN_FILE_MEMORY_ACCOUNTING)
agent_drop_in_file_cpu_quota = os.path.join(agent_drop_in_path, _DROP_IN_FILE_CPU_QUOTA)
# The new agent sets up limits on the scope instead of the slice, so remove the existing log collector slice.
CGroupConfigurator._Impl._cleanup_unit_file(logcollector_slice)
# Clean up the old drop-in files; the new agent uses systemctl set-property to enable accounting and limits
CGroupConfigurator._Impl._cleanup_unit_file(agent_drop_in_file_cpu_accounting)
CGroupConfigurator._Impl._cleanup_unit_file(agent_drop_in_file_memory_accounting)
CGroupConfigurator._Impl._cleanup_unit_file(agent_drop_in_file_cpu_quota)
@staticmethod
def _setup_azure_slice():
"""
The agent creates "azure.slice" for use by extensions and the agent. The agent runs under "azure.slice" directly and each
extension runs under its own slice ("Microsoft.CPlat.Extension.slice" in the example below). All the slices for
extensions are grouped under "vmextensions.slice".
Example: -.slice
├─user.slice
├─system.slice
└─azure.slice
├─walinuxagent.service
│ ├─5759 /usr/bin/python3 -u /usr/sbin/waagent -daemon
│ └─5764 python3 -u bin/WALinuxAgent-2.2.53-py2.7.egg -run-exthandlers
└─azure-vmextensions.slice
└─Microsoft.CPlat.Extension.slice
└─5894 /usr/bin/python3 /var/lib/waagent/Microsoft.CPlat.Extension-1.0.0.0/enable.py
This method ensures that the "azure" and "vmextensions" slices are created. Setup should create those slices
under /lib/systemd/system; but if they do not exist, this method will create them.
"""
unit_file_install_path = systemd.get_unit_file_install_path()
azure_slice = os.path.join(unit_file_install_path, AZURE_SLICE)
vmextensions_slice = os.path.join(unit_file_install_path, _VMEXTENSIONS_SLICE)
agent_unit_file = systemd.get_agent_unit_file()
agent_drop_in_path = systemd.get_agent_drop_in_path()
agent_drop_in_file_slice = os.path.join(agent_drop_in_path, _AGENT_DROP_IN_FILE_SLICE)
files_to_create = []
if not os.path.exists(azure_slice):
files_to_create.append((azure_slice, _AZURE_SLICE_CONTENTS))
if not os.path.exists(vmextensions_slice):
files_to_create.append((vmextensions_slice, _VMEXTENSIONS_SLICE_CONTENTS))
if fileutil.findre_in_file(agent_unit_file, r"Slice=") is not None:
CGroupConfigurator._Impl._cleanup_unit_file(agent_drop_in_file_slice)
else:
if not os.path.exists(agent_drop_in_file_slice):
files_to_create.append((agent_drop_in_file_slice, _AGENT_DROP_IN_FILE_SLICE_CONTENTS))
if len(files_to_create) > 0:
# create the unit files, but if one fails, remove all of them and return
try:
for path, contents in files_to_create:
CGroupConfigurator._Impl._create_unit_file(path, contents)
except Exception as exception:
log_cgroup_warning("Failed to create unit files for the azure slice: {0}".format(ustr(exception)))
for unit_file, _ in files_to_create:
CGroupConfigurator._Impl._cleanup_unit_file(unit_file)
return
def _reset_agent_cgroup_setup(self):
try:
agent_drop_in_path = systemd.get_agent_drop_in_path()
if os.path.exists(agent_drop_in_path) and os.path.isdir(agent_drop_in_path):
files_to_cleanup = []
agent_drop_in_file_slice = os.path.join(agent_drop_in_path, _AGENT_DROP_IN_FILE_SLICE)
files_to_cleanup.append(agent_drop_in_file_slice)
agent_drop_in_file_cpu_accounting = os.path.join(agent_drop_in_path,
_DROP_IN_FILE_CPU_ACCOUNTING)
files_to_cleanup.append(agent_drop_in_file_cpu_accounting)
agent_drop_in_file_memory_accounting = os.path.join(agent_drop_in_path,
_DROP_IN_FILE_MEMORY_ACCOUNTING)
files_to_cleanup.append(agent_drop_in_file_memory_accounting)
agent_drop_in_file_cpu_quota = os.path.join(agent_drop_in_path, _DROP_IN_FILE_CPU_QUOTA)
files_to_cleanup.append(agent_drop_in_file_cpu_quota)
if len(files_to_cleanup) > 0:
log_cgroup_info("Found drop-in files; attempting agent cgroup setup cleanup", send_event=False)
self._cleanup_all_files(files_to_cleanup)
self._reset_cpu_quota(systemd.get_agent_unit_name())
except Exception as err:
logger.warn("Error while resetting the quotas: {0}".format(err))
@staticmethod
def _enable_accounting(unit_name):
"""
Enable CPU and Memory accounting for the unit
"""
try:
# We no longer use daemon-reload and drop-in files for accounting, so enable it with systemctl set-property
accounting_properties = ("CPUAccounting", "MemoryAccounting")
values = ("yes", "yes")
log_cgroup_info("Enabling accounting properties for the agent: {0}".format(accounting_properties))
systemd.set_unit_run_time_properties(unit_name, accounting_properties, values)
except Exception as exception:
log_cgroup_warning("Failed to set accounting properties for the agent: {0}".format(ustr(exception)))
# W0238: Unused private member `_Impl._create_unit_file(path, contents)` (unused-private-member)
@staticmethod
def _create_unit_file(path, contents): # pylint: disable=unused-private-member
parent, _ = os.path.split(path)
if not os.path.exists(parent):
fileutil.mkdir(parent, mode=0o755)
exists = os.path.exists(path)
fileutil.write_file(path, contents)
log_cgroup_info("{0} {1}".format("Updated" if exists else "Created", path))
# W0238: Unused private member `_Impl._cleanup_unit_file(path)` (unused-private-member)
@staticmethod
def _cleanup_unit_file(path): # pylint: disable=unused-private-member
if os.path.exists(path):
try:
os.remove(path)
log_cgroup_info("Removed {0}".format(path))
except Exception as exception:
log_cgroup_warning("Failed to remove {0}: {1}".format(path, ustr(exception)))
@staticmethod
def _cleanup_all_files(files_to_cleanup):
for path in files_to_cleanup:
if os.path.exists(path):
try:
os.remove(path)
log_cgroup_info("Removed {0}".format(path))
except Exception as exception:
log_cgroup_warning("Failed to remove {0}: {1}".format(path, ustr(exception)))
@staticmethod
def _create_all_files(files_to_create):
# create the unit files, but if one fails, remove all of them and return
try:
for path, contents in files_to_create:
CGroupConfigurator._Impl._create_unit_file(path, contents)
except Exception as exception:
log_cgroup_warning("Failed to create unit files : {0}".format(ustr(exception)))
for unit_file in files_to_create:
CGroupConfigurator._Impl._cleanup_unit_file(unit_file)
return
@staticmethod
def _get_current_cpu_quota(unit_name):
"""
Calculates the CPU quota percentage for the given unit from its CPUQuotaPerSecUSec property.
Params:
unit_name (str): The systemd unit to query. Its CPUQuotaPerSecUSec value has a format such as "1s", "500ms", "500us", or "infinity".
Returns:
str: CPU percentage, or 'infinity' or 'unknown' if we can't determine the value.
"""
try:
cpu_quota_per_sec_usec = systemd.get_unit_property(unit_name, "CPUQuotaPerSecUSec").strip().lower()
if cpu_quota_per_sec_usec == "infinity":
return cpu_quota_per_sec_usec # No limit on CPU usage
# Parse the value based on the suffix
elif cpu_quota_per_sec_usec.endswith("us"):
# Directly use the microseconds value
cpu_quota_us = float(cpu_quota_per_sec_usec[:-2])
elif cpu_quota_per_sec_usec.endswith("ms"):
# Convert milliseconds to microseconds
cpu_quota_us = float(cpu_quota_per_sec_usec[:-2]) * 1000
elif cpu_quota_per_sec_usec.endswith("s"):
# Convert seconds to microseconds
cpu_quota_us = float(cpu_quota_per_sec_usec[:-1]) * 1000000
else:
raise ValueError("Invalid format. Expected 's', 'ms', 'us', or 'infinity'.")
# Calculate CPU percentage
cpu_percentage = (cpu_quota_us / 1000000) * 100
return "{0:g}%".format(cpu_percentage) # :g Removes trailing zeros after decimal point
except Exception as e:
log_cgroup_warning("Error parsing current CPUQuotaPerSecUSec: {0}".format(ustr(e)))
return "unknown"
def supported(self):
return self._cgroups_supported
def enabled(self):
return self._agent_cgroups_enabled or self._extensions_cgroups_enabled
def agent_enabled(self):
return self._agent_cgroups_enabled
def extensions_enabled(self):
return self._extensions_cgroups_enabled
def using_cgroup_v2(self):
return isinstance(self._cgroups_api, SystemdCgroupApiv2)
def enable(self):
if not self.supported():
raise CGroupsException(
"Attempted to enable cgroups, but they are not supported on the current platform")
self._agent_cgroups_enabled = True
self._extensions_cgroups_enabled = True
def disable(self, reason, disable_cgroups):
"""
TODO: This method needs a refactor. We should not disable the cgroups if we fail to reset the agent's cgroup quota.
Today we disable the cgroups even if we fail to reset the agent cgroup quota; as a result, extensions may run with the agent's limits, which is not good.
On the other hand, if we don't disable the cgroups, we end up calling the reset until systemd recovers from the error. If the systemd error is a connection timeout, that just adds
significant delay to the extension execution.
"""
try:
if disable_cgroups == DisableCgroups.ALL: # disable all
# Reset quotas
self._reset_cpu_quota(systemd.get_agent_unit_name())
extension_services = self.get_extension_services_list()
for extension in extension_services:
log_cgroup_info("Resetting extension : {0} and it's services: {1} Quota".format(extension, extension_services[extension]), send_event=False)
self.reset_extension_quota(extension_name=extension)
self.reset_extension_services_quota(extension_services[extension])
CGroupsTelemetry.reset()
self._agent_cgroups_enabled = False
self._extensions_cgroups_enabled = False
elif disable_cgroups == DisableCgroups.AGENT: # disable agent
self._reset_cpu_quota(systemd.get_agent_unit_name())
agent_controllers = self._agent_cgroup.get_controllers()
for controller in agent_controllers:
if isinstance(controller, _CpuController):
CGroupsTelemetry.stop_tracking(controller)
break
self._agent_cgroups_enabled = False
log_cgroup_warning("Disabling resource usage monitoring. Reason: {0}".format(reason), op=WALAEventOperation.CGroupsDisabled)
except Exception as exception:
log_cgroup_warning("Error disabling cgroups: {0}".format(ustr(exception)))
@staticmethod
def _set_cpu_quota(unit_name, quota):
"""
Sets CPU quota to the given percentage (100% == 1 CPU)
NOTE: This is done using systemctl set-property --runtime; any local overrides in the /etc folder on the VM will take precedence
over this setting.
"""
quota_percentage = "{0}%".format(quota)
log_cgroup_info("Setting {0}'s CPUQuota to {1}".format(unit_name, quota_percentage))
CGroupConfigurator._Impl._try_set_cpu_quota(unit_name, quota_percentage)
@staticmethod
def _reset_cpu_quota(unit_name):
"""
Removes any CPUQuota on the given unit
NOTE: This resets the quota on the unit's default drop-in file; any local overrides on the VM will take precedence
over this setting.
"""
log_cgroup_info("Resetting {0}'s CPUQuota".format(unit_name), send_event=False)
if CGroupConfigurator._Impl._try_set_cpu_quota(unit_name, "infinity"): # systemd expresses no-quota as infinity, following the same convention
try:
log_cgroup_info('Current CPUQuota: {0}'.format(systemd.get_unit_property(unit_name, "CPUQuotaPerSecUSec")))
except Exception as e:
log_cgroup_warning('Failed to get current CPUQuotaPerSecUSec after reset: {0}'.format(ustr(e)))
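# For example (illustrative unit name), _set_cpu_quota("walinuxagent.service", 75) is roughly
# equivalent to: systemctl set-property --runtime walinuxagent.service CPUQuota=75%
# while _reset_cpu_quota passes an empty value (CPUQuota=), which systemd reports back as "infinity".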
# W0238: Unused private member `_Impl._try_set_cpu_quota(unit_name, quota)` (unused-private-member)
@staticmethod
def _try_set_cpu_quota(unit_name, quota): # pylint: disable=unused-private-member
try:
current_cpu_quota = CGroupConfigurator._Impl._get_current_cpu_quota(unit_name)
if current_cpu_quota == quota:
return True
quota = quota if quota != "infinity" else "" # no-quota expressed as empty string while setting property
systemd.set_unit_run_time_property(unit_name, "CPUQuota", quota)
except Exception as exception:
log_cgroup_warning('Failed to set CPUQuota: {0}'.format(ustr(exception)))
return False
return True
def _check_fails_if_processes_found_in_agent_cgroup_before_enable(self, agent_slice):
"""
This check ensures that, before we enable the agent's cgroups, there are no unexpected processes already in the agent's cgroup.
The issue we observed is that long-running extension processes may end up in the agent's cgroup if the agent goes through the cycle enabled(1)->disabled(2)->enabled(3):
1. Agent cgroups enabled in some version
2. Agent cgroups disabled due to the regular check_cgroups check. Once we disable the cgroups we no longer run the extensions in their own slice, so they end up in the agent's cgroup.
3. When the extension handler restarts and enables the cgroups again, the processes still running from step 2 remain in the agent's cgroup. This may cause the extensions to run with the agent's limits.
"""
if agent_slice not in (AZURE_SLICE, "system.slice"):
return False
try:
log_cgroup_info("Checking for unexpected processes in the agent's cgroup before enabling cgroups")
self._check_processes_in_agent_cgroup(True)
except CGroupsException as exception:
log_cgroup_warning(ustr(exception))
return True
return False
def check_cgroups(self, cgroup_metrics):
self._check_cgroups_lock.acquire()
try:
if not self.enabled():
return
errors = []
process_check_success = False
try:
self._check_processes_in_agent_cgroup(False)
process_check_success = True
except CGroupsException as exception:
errors.append(exception)
quota_check_success = False
try:
if cgroup_metrics:
self._check_agent_throttled_time(cgroup_metrics)
quota_check_success = True
except CGroupsException as exception:
errors.append(exception)
reason = "Check on cgroups failed:\n{0}".format("\n".join([ustr(e) for e in errors]))
if not process_check_success and conf.get_cgroup_disable_on_process_check_failure():
self.disable(reason, DisableCgroups.ALL)
if not quota_check_success and conf.get_cgroup_disable_on_quota_check_failure():
self.disable(reason, DisableCgroups.AGENT)
finally:
self._check_cgroups_lock.release()
def _check_processes_in_agent_cgroup(self, report_immediately):
"""
Verifies that the agent's cgroup includes only the current process, its parent, commands started using shellutil and instances of systemd-run
(those processes correspond, respectively, to the extension handler, the daemon, commands started by the extension handler, and the systemd-run
commands used to start extensions on their own cgroup).
Other processes started by the agent (e.g. extensions) and processes not started by the agent (e.g. services installed by extensions) are reported
as unexpected, since they should belong to their own cgroup.
Raises a CGroupsException only when an unexpected process was also seen during the previous check.
report_immediately - flag to switch to the old behavior and report immediately if any unexpected process is found.
Note: The process check was added as a conservative approach before the cgroups feature was stable. It is now producing noise due to race conditions; some of those are an extra process seen before systemd moves it to its new cgroup, or a process that is about to die.
So we are changing the behavior to raise an issue only when we see the same unexpected process on the last check. Later we will remove the check if no issues are reported.
"""
current_unexpected = {}
agent_cgroup_proc_names = []
report = []
try:
daemon = os.getppid()
extension_handler = os.getpid()
agent_commands = set()
agent_commands.update(shellutil.get_running_commands())
systemd_run_commands = set()
systemd_run_commands.update(self._cgroups_api.get_systemd_run_commands())
agent_cgroup_processes = self._agent_cgroup.get_processes()
# get the running commands again in case new commands started or completed while we were fetching the processes in the cgroup;
agent_commands.update(shellutil.get_running_commands())
systemd_run_commands.update(self._cgroups_api.get_systemd_run_commands())
for process in agent_cgroup_processes:
agent_cgroup_proc_names.append(self._format_process(process))
# Note that the agent uses systemd-run to start extensions; systemd-run belongs to the agent cgroup, though the extensions don't.
if process in (daemon, extension_handler) or process in systemd_run_commands:
continue
# check shell systemd_run process if above process check didn't catch it
if self._check_systemd_run_process(process):
continue
# systemd_run_commands contains the shell that started systemd-run, so we also need to check for the parent
if self._get_parent(process) in systemd_run_commands and self._get_command(
process) == 'systemd-run':
continue
# check if the process is a command started by the agent or a descendant of one of those commands
current = process
while current != 0 and current not in agent_commands:
current = self._get_parent(current)
# Verify whether the process was started by the agent, based on the marker we set in the process environment, or whether the process is in zombie state.
# If so, consider it a valid process in the agent cgroup.
if current == 0 and not (self._is_process_descendant_of_the_agent(process) or self._is_zombie_process(process)):
current_unexpected[process] = self._format_process(process)
if report_immediately:
report = current_unexpected.values()
else:
for process in current_unexpected:
if process in self._unexpected_processes:
report.append(current_unexpected[process])
if len(report) >= 5: # collect just a small sample
break
self._unexpected_processes = current_unexpected
except Exception as exception:
log_cgroup_warning("Error checking the processes in the agent's cgroup: {0}".format(ustr(exception)))
if len(report) > 0:
self._report_agent_cgroups_procs(agent_cgroup_proc_names, report)
raise CGroupsException("The agent's cgroup includes unexpected processes: {0}".format(report))
def get_logcollector_unit_properties(self):
"""
Returns the systemd unit properties for the log collector process.
Each property should be explicitly set (even if already included in the log collector slice) for the log
collector process to run in the transient scope directory with the expected accounting and limits.
"""
logcollector_properties = ["--property=CPUAccounting=yes", "--property=MemoryAccounting=yes", "--property=CPUQuota={0}".format(LOGCOLLECTOR_CPU_QUOTA_FOR_V1_AND_V2)]
if not self.using_cgroup_v2():
return logcollector_properties
# Memory throttling limit is used when running log collector on v2 machines using the 'MemoryHigh' property.
# We do not use a systemd property to enforce memory on V1 because it invokes the OOM killer if the limit
# is exceeded.
logcollector_properties.append("--property=MemoryHigh={0}".format(LOGCOLLECTOR_MEMORY_THROTTLE_LIMIT_FOR_V2))
return logcollector_properties
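# As a sketch (the actual invocation lives in the cgroups API, and the unit name here is
# hypothetical), the scope created with these properties resembles:
#
#     systemd-run --unit=collect-logs --scope --slice=azure-walinuxagent-logcollector.slice \
#         --property=CPUAccounting=yes --property=MemoryAccounting=yes --property=CPUQuota=5% \
#         --property=MemoryHigh=170M   (MemoryHigh only on cgroup v2)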
@staticmethod
def _get_command(pid):
try:
with open('/proc/{0}/comm'.format(pid), "r") as file_:
comm = file_.read()
if comm and comm[-1] == '\x00': # if null-terminated, remove the null
comm = comm[:-1]
return comm.rstrip()
except Exception:
return "UNKNOWN"
@staticmethod
def _format_process(pid):
"""
Formats the given PID as a string containing the PID and the corresponding command line truncated to 64 chars
"""
try:
cmdline = '/proc/{0}/cmdline'.format(pid)
if os.path.exists(cmdline):
with open(cmdline, "r") as cmdline_file:
return "[PID: {0}] {1:64.64}".format(pid, cmdline_file.read())
except Exception:
pass
return "[PID: {0}] UNKNOWN".format(pid)
@staticmethod
def _is_process_descendant_of_the_agent(pid):
"""
Returns True if the process is a descendant of the agent, determined by looking at the environment marker (AZURE_GUEST_AGENT_PARENT_PROCESS_NAME)
that we set when the process starts; otherwise False.
"""
try:
env = '/proc/{0}/environ'.format(pid)
if os.path.exists(env):
with open(env, "r") as env_file:
environ = env_file.read()
if environ and environ[-1] == '\x00':
environ = environ[:-1]
return "{0}={1}".format(shellutil.PARENT_PROCESS_NAME, shellutil.AZURE_GUEST_AGENT) in environ
except Exception:
pass
return False
@staticmethod
def _is_zombie_process(pid):
"""
Returns True if the process is in zombie state, otherwise False.
Ex: cat /proc/18171/stat
18171 (python3) S 18103 18103 18103 0 -1 4194624 57736 64902 0 3
"""
try:
stat = '/proc/{0}/stat'.format(pid)
if os.path.exists(stat):
with open(stat, "r") as stat_file:
return stat_file.read().split()[2] == 'Z'
except Exception:
pass
return False
@staticmethod
def _check_systemd_run_process(process):
"""
Returns True if the process is a shell systemd-run process started by the agent, otherwise False.
Ex: sh,7345 -c systemd-run --unit=enable_7c5cab19-eb79-4661-95d9-9e5091bd5ae0 --scope --slice=azure-vmextensions-Microsoft.OSTCExtensions.VMAccessForLinux_1.5.11.slice /var/lib/waagent/Microsoft.OSTCExtensions.VMAccessForLinux-1.5.11/processes.sh
"""
try:
process_name = "UNKNOWN"
cmdline = '/proc/{0}/cmdline'.format(process)
if os.path.exists(cmdline):
with open(cmdline, "r") as cmdline_file:
process_name = "{0}".format(cmdline_file.read())
match = re.search(r'systemd-run.*--unit=.*--scope.*--slice=azure-vmextensions.*', process_name)
if match is not None:
return True
except Exception:
pass
return False
@staticmethod
def _report_agent_cgroups_procs(agent_cgroup_proc_names, unexpected):
for proc_name in unexpected:
if 'UNKNOWN' in proc_name:
msg = "Agent includes following processes when UNKNOWN process found: {0}".format("\n".join([ustr(proc) for proc in agent_cgroup_proc_names]))
add_event(op=WALAEventOperation.CGroupsInfo, message=msg)
@staticmethod
def _check_agent_throttled_time(cgroup_metrics):
for metric in cgroup_metrics:
if metric.instance == AGENT_NAME_TELEMETRY and metric.counter == MetricsCounter.THROTTLED_TIME:
if metric.value > conf.get_agent_cpu_throttled_time_threshold():
raise CGroupsException("The agent has been throttled for {0} seconds".format(metric.value))
def check_agent_memory_usage(self):
if self.enabled() and self._agent_memory_metrics is not None:
metrics = self._agent_memory_metrics.get_tracked_metrics()
current_usage = 0
for metric in metrics:
if metric.counter == MetricsCounter.TOTAL_MEM_USAGE:
current_usage += metric.value
elif metric.counter == MetricsCounter.SWAP_MEM_USAGE:
current_usage += metric.value
if current_usage > conf.get_agent_memory_quota():
raise AgentMemoryExceededException("The agent memory limit of {0} bytes was exceeded. The current reported usage is {1} bytes.".format(conf.get_agent_memory_quota(), current_usage))
@staticmethod
def _get_parent(pid):
"""
Returns the parent of the given process. If the parent cannot be determined, returns 0 (which is the PID for the scheduler)
"""
try:
stat = '/proc/{0}/stat'.format(pid)
if os.path.exists(stat):
with open(stat, "r") as stat_file:
return int(stat_file.read().split()[3])
except Exception:
pass
return 0
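# Worked example (using the sample stat line from _is_zombie_process): for
# "18171 (python3) S 18103 18103 18103 ...", split()[3] is "18103", so _get_parent(18171)
# returns 18103. Note this simple split assumes the process name contains no spaces.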
def start_tracking_unit_cgroups(self, unit_name):
if self.enabled():
try:
cgroup = self._cgroups_api.get_unit_cgroup(unit_name, unit_name)
controllers = cgroup.get_controllers()
for controller in controllers:
CGroupsTelemetry.track_cgroup_controller(controller)
except Exception as exception:
log_cgroup_info("Failed to start tracking resource usage for the extension: {0}".format(ustr(exception)), send_event=False)
def stop_tracking_unit_cgroups(self, unit_name):
if self.enabled():
try:
cgroup = self._cgroups_api.get_unit_cgroup(unit_name, unit_name)
controllers = cgroup.get_controllers()
for controller in controllers:
CGroupsTelemetry.stop_tracking(controller)
except Exception as exception:
log_cgroup_info("Failed to stop tracking resource usage for the extension service: {0}".format(ustr(exception)), send_event=False)
def stop_tracking_extension_cgroups(self, extension_name):
if self.enabled():
try:
extension_slice_name = CGroupUtil.get_extension_slice_name(extension_name)
cgroup_relative_path = os.path.join(_AZURE_VMEXTENSIONS_SLICE, extension_slice_name)
cgroup = self._cgroups_api.get_cgroup_from_relative_path(relative_path=cgroup_relative_path,
cgroup_name=extension_name)
controllers = cgroup.get_controllers()
for controller in controllers:
CGroupsTelemetry.stop_tracking(controller)
except Exception as exception:
log_cgroup_info("Failed to stop tracking resource usage for the extension service: {0}".format(ustr(exception)), send_event=False)
def start_extension_command(self, extension_name, command, cmd_name, timeout, shell, cwd, env, stdout, stderr,
error_code=ExtensionErrorCodes.PluginUnknownFailure):
"""
Starts a command (install/enable/etc) for an extension and adds the command's PID to the extension's cgroup
:param extension_name: The extension executing the command
:param command: The command to invoke
:param cmd_name: The type of the command (enable, install, etc.)
:param timeout: Number of seconds to wait for command completion
:param cwd: The working directory for the command
:param env: The environment to pass to the command's process
:param stdout: File object to redirect stdout to
:param stderr: File object to redirect stderr to
:param error_code: Extension error code to raise in case of error
"""
if self.enabled():
try:
return self._cgroups_api.start_extension_command(extension_name, command, cmd_name, timeout,
shell=shell, cwd=cwd, env=env, stdout=stdout,
stderr=stderr, error_code=error_code)
except SystemdRunError as exception:
reason = 'Failed to start {0} using systemd-run, will try invoking the extension directly. Error: {1}'.format(
extension_name, ustr(exception))
self.disable(reason, DisableCgroups.ALL)
# fall-through and re-invoke the extension
# subprocess-popen-preexec-fn<W1509> Disabled: code is not multi-threaded
process = subprocess.Popen(command, shell=shell, cwd=cwd, env=env, stdout=stdout, stderr=stderr, preexec_fn=os.setsid) # pylint: disable=W1509
return handle_process_completion(process=process, command=command, timeout=timeout, stdout=stdout, stderr=stderr, error_code=error_code)
@staticmethod
def _get_unit_properties_requiring_update(unit_name, cpu_quota=""):
"""
Check whether cgroup setup is complete for the unit and return the properties that need an update.
"""
properties_to_update = ()
properties_values = ()
cpu_accounting = systemd.get_unit_property(unit_name, "CPUAccounting")
if cpu_accounting != "yes":
properties_to_update += ("CPUAccounting",)
properties_values += ("yes",)
memory_accounting = systemd.get_unit_property(unit_name, "MemoryAccounting")
if memory_accounting != "yes":
properties_to_update += ("MemoryAccounting",)
properties_values += ("yes",)
current_cpu_quota = CGroupConfigurator._Impl._get_current_cpu_quota(unit_name)
if current_cpu_quota != cpu_quota:
properties_to_update += ("CPUQuota",)
# no-quota expressed as empty string while setting property
cpu_quota = cpu_quota if cpu_quota != "infinity" else ""
properties_values += (cpu_quota,)
return properties_to_update, properties_values
def setup_extension_slice(self, extension_name, cpu_quota):
"""
Each extension runs under its own slice (e.g. "Microsoft.CPlat.Extension.slice"). All the slices for
extensions are grouped under "azure-vmextensions.slice".
This method ensures that the desired configuration is created for the extension slice using systemctl set-property.
TODO: set memory quotas
"""
if self.enabled():
extension_slice = CGroupUtil.get_extension_slice_name(extension_name)
try:
# clean up the old slice from the disk; the new agent uses systemctl set-property
unit_file_install_path = systemd.get_unit_file_install_path()
extension_slice_path = os.path.join(unit_file_install_path, extension_slice)
CGroupConfigurator._Impl._cleanup_unit_file(extension_slice_path)
# clean up the older slice format (whose name included the extension version) from the disk
old_extension_slice_path = os.path.join(unit_file_install_path,
CGroupUtil.get_extension_slice_name(extension_name,
old_slice=True))
if os.path.exists(old_extension_slice_path):
CGroupConfigurator._Impl._cleanup_unit_file(old_extension_slice_path)
cpu_quota = "{0}%".format(
cpu_quota) if cpu_quota is not None else "infinity" # following systemd convention for no-quota (infinity)
properties_to_update, properties_values = self._get_unit_properties_requiring_update(extension_slice, cpu_quota)
if len(properties_to_update) > 0:
if cpu_quota == "infinity":
log_cgroup_info("CPUQuota not set for {0}".format(extension_name))
else:
log_cgroup_info("Setting {0}'s CPUQuota to {1}".format(extension_name, cpu_quota))
log_cgroup_info("Setting up the resource properties: {0} for {1}".format(properties_to_update, extension_slice))
systemd.set_unit_run_time_properties(extension_slice, properties_to_update, properties_values)
except Exception as exception:
log_cgroup_warning("Failed to set the extension {0} slice and quotas: {1}".format(extension_slice,
ustr(exception)))
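# For example (illustrative extension name and quota), setting up the slice for
# "Microsoft.CPlat.Extension" with cpu_quota=5 is roughly equivalent to:
#
#     systemctl set-property --runtime azure-vmextensions-Microsoft.CPlat.Extension.slice \
#         CPUAccounting=yes MemoryAccounting=yes CPUQuota=5%
#
# (only the properties that differ from the unit's current values are actually passed).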
def reset_extension_quota(self, extension_name):
"""
Removes any CPUQuota on the extension
NOTE: This resets the quota on the extension's slice; any local overrides on the VM will take precedence
over this setting.
TODO: reset memory quotas
"""
if self.enabled():
try:
self._reset_cpu_quota(CGroupUtil.get_extension_slice_name(extension_name))
except Exception as exception:
log_cgroup_warning('Failed to reset for {0}: {1}'.format(extension_name, ustr(exception)))
def set_extension_services_cpu_memory_quota(self, services_list):
"""
Each extension service has a name, a systemd path and its quotas.
This method ensures the limits are set with systemctl at runtime.
TODO: set memory quotas
"""
if self.enabled() and services_list is not None:
for service in services_list:
service_name = service.get('name', None)
unit_file_path = systemd.get_unit_file_install_path()
if service_name is not None and unit_file_path is not None:
# remove drop-in files from disk; the new agent uses systemctl set-property
files_to_remove = []
drop_in_path = os.path.join(unit_file_path, "{0}.d".format(service_name))
drop_in_file_cpu_accounting = os.path.join(drop_in_path,
_DROP_IN_FILE_CPU_ACCOUNTING)
files_to_remove.append(drop_in_file_cpu_accounting)
drop_in_file_memory_accounting = os.path.join(drop_in_path,
_DROP_IN_FILE_MEMORY_ACCOUNTING)
files_to_remove.append(drop_in_file_memory_accounting)
drop_in_file_cpu_quota = os.path.join(drop_in_path, _DROP_IN_FILE_CPU_QUOTA)
files_to_remove.append(drop_in_file_cpu_quota)
self._cleanup_all_files(files_to_remove)
cpu_quota = service.get('cpuQuotaPercentage')
cpu_quota = "{0}%".format(cpu_quota) if cpu_quota is not None else "infinity" # following systemd convention for no-quota (infinity)
try:
properties_to_update, properties_values = self._get_unit_properties_requiring_update(service_name, cpu_quota)
except Exception as exception:
log_cgroup_warning("Failed to get the properties to update for {0}: {1}".format(service_name, ustr(exception)))
# when we fail to get the properties to update, we will skip the set-property and continue with the next service
continue
# If systemd is unaware of an extension service (i.e., the unit is not loaded in the system yet), we get an error while setting quotas. Hence the unit-loaded check.
if systemd.is_unit_loaded(service_name) and len(properties_to_update) > 0:
if cpu_quota != "infinity":
log_cgroup_info("Setting {0}'s CPUQuota to {1}".format(service_name, cpu_quota))
else:
log_cgroup_info("CPUQuota not set for {0}".format(service_name))
log_cgroup_info("Setting up resource properties: {0} for {1}" .format(properties_to_update, service_name))
try:
systemd.set_unit_run_time_properties(service_name, properties_to_update, properties_values)
except Exception as exception:
log_cgroup_warning("Failed to set the quotas for {0}: {1}".format(service_name, ustr(exception)))
def reset_extension_services_quota(self, services_list):
"""
Removes any CPUQuota on the extension service
NOTE: This resets the quota on the extension service's default configuration; any local overrides on the VM will take precedence
over this setting.
TODO: reset memory quotas
"""
if self.enabled() and services_list is not None:
service_name = None
try:
for service in services_list:
service_name = service.get('name', None)
if service_name is not None and systemd.is_unit_loaded(service_name):
self._reset_cpu_quota(service_name)
except Exception as exception:
log_cgroup_warning('Failed to reset for {0}: {1}'.format(service_name, ustr(exception)))
def stop_tracking_extension_services_cgroups(self, services_list):
"""
Remove the cgroup entry from the tracked groups to stop tracking.
"""
if self.enabled() and services_list is not None:
for service in services_list:
service_name = service.get('name', None)
if service_name is not None:
self.stop_tracking_unit_cgroups(service_name)
def start_tracking_extension_services_cgroups(self, services_list):
"""
Add the cgroup entry to start tracking the services cgroups.
"""
if self.enabled() and services_list is not None:
for service in services_list:
service_name = service.get('name', None)
if service_name is not None:
self.start_tracking_unit_cgroups(service_name)
@staticmethod
def get_extension_services_list():
"""
ResourceLimits for extensions come from the <extName>/HandlerManifest.json file.
Use this pattern to determine all the installed extension HandlerManifest files and
read the extension services if ResourceLimits are present.
"""
extensions_services = {}
for manifest_path in glob.iglob(os.path.join(conf.get_lib_dir(), "*/HandlerManifest.json")):
match = re.search("(?P<extname>[\\w+\\.-]+).HandlerManifest\\.json", manifest_path)
if match is not None:
extensions_name = match.group('extname')
if not extensions_name.startswith('WALinuxAgent'):
try:
data = json.loads(fileutil.read_file(manifest_path))
resource_limits = data[0].get('resourceLimits', None)
services = resource_limits.get('services') if resource_limits else None
extensions_services[extensions_name] = services
except (IOError, OSError) as e:
log_cgroup_warning(
'Failed to load manifest file ({0}): {1}'.format(manifest_path, e.strerror))
except ValueError:
log_cgroup_warning('Malformed manifest file ({0}).'.format(manifest_path))
return extensions_services
# unique instance for the singleton
_instance = None
@staticmethod
def get_instance():
if CGroupConfigurator._instance is None:
CGroupConfigurator._instance = CGroupConfigurator._Impl()
return CGroupConfigurator._instance
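# Typical usage (a sketch; in the agent the update handler drives these calls during startup
# and its main loop):
#
#     configurator = CGroupConfigurator.get_instance()
#     configurator.initialize()
#     if configurator.enabled():
#         configurator.check_agent_memory_usage()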