azurelinuxagent/common/osutil/systemd.py (51 lines of code) (raw):
#
# Copyright 2018 Microsoft Corporation
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
# Requires Python 2.6+ and Openssl 1.0+
#
import os
import re
from azurelinuxagent.common.osutil import get_osutil
from azurelinuxagent.common.utils import shellutil
def _get_os_util():
if _get_os_util.value is None:
_get_os_util.value = get_osutil()
return _get_os_util.value
_get_os_util.value = None
def is_systemd():
"""
Determine if systemd is managing system services; the implementation follows the same strategy as, for example,
sd_booted() in libsystemd, or /usr/sbin/service
"""
return os.path.exists("/run/systemd/system/")
def get_version():
# the output is similar to
# $ systemctl --version
# systemd 245 (245.4-4ubuntu3)
# +PAM +AUDIT +SELINUX +IMA +APPARMOR +SMACK +SYSVINIT +UTMP etc
#
# return fist line systemd 245 (245.4-4ubuntu3)
try:
output = shellutil.run_command(['systemctl', '--version'])
version = output.split('\n')[0]
return version
except Exception:
return "unknown"
def get_unit_file_install_path():
"""
e.g. /lib/systemd/system
"""
return _get_os_util().get_systemd_unit_file_install_path()
def get_agent_unit_name():
"""
e.g. walinuxagent.service
"""
return _get_os_util().get_service_name() + ".service"
def get_agent_unit_file():
"""
e.g. /lib/systemd/system/walinuxagent.service
"""
return os.path.join(get_unit_file_install_path(), get_agent_unit_name())
def get_agent_drop_in_path():
"""
e.g. /lib/systemd/system/walinuxagent.service.d
"""
return os.path.join(get_unit_file_install_path(), "{0}.d".format(get_agent_unit_name()))
def get_unit_property(unit_name, property_name):
output = shellutil.run_command(["systemctl", "show", unit_name, "--property", property_name])
# Output is similar to
# # systemctl show walinuxagent.service --property CPUQuotaPerSecUSec
# CPUQuotaPerSecUSec=50ms
match = re.match("[^=]+=(?P<value>.+)", output)
if match is None:
raise ValueError("Can't find property {0} of {1}".format(property_name, unit_name))
return match.group('value')
def set_unit_run_time_property(unit_name, property_name, value):
"""
Set a property of a unit at runtime
Note: --runtime settings only apply until the next reboot
"""
try:
# Ex: systemctl set-property foobar.service CPUWeight=200 --runtime
shellutil.run_command(["systemctl", "set-property", unit_name, "{0}={1}".format(property_name, value), "--runtime"])
except shellutil.CommandError as e:
raise ValueError("Can't set property {0} of {1}: {2}".format(property_name, unit_name, e))
def set_unit_run_time_properties(unit_name, property_names, values):
"""
Set multiple properties of a unit at runtime
Note: --runtime settings only apply until the next reboot
"""
if len(property_names) != len(values):
raise ValueError("The number of property names:{0} and values:{1} must be the same".format(property_names, values))
properties = ["{0}={1}".format(name, value) for name, value in zip(property_names, values)]
try:
# Ex: systemctl set-property foobar.service CPUWeight=200 MemoryMax=2G IPAccounting=yes --runtime
shellutil.run_command(["systemctl", "set-property", unit_name] + properties + ["--runtime"])
except shellutil.CommandError as e:
raise ValueError("Can't set properties {0} of {1}: {2}".format(properties, unit_name, e))
def is_unit_loaded(unit_name):
"""
Determine if a unit is loaded
"""
try:
value = get_unit_property(unit_name, "LoadState")
return value.lower() == "loaded"
except shellutil.CommandError:
return False