image_test/configuration/linux/generic_distro.py (167 lines of code) (raw):
#!/usr/bin/env python3
# Copyright 2018 Google Inc. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import abc
import time
import utils
def RemoveCommentAndStrip(string):
token = string.find('#')
return string[:token].strip() if token >= 0 else string.strip()
class GenericDistroTests(object):
"""
Tests that uses common linux environment commands and are not specific to any
distribution in particular.
The abstract methods were defined to force distribution-specific tests.
"""
__metaclass__ = abc.ABCMeta
@abc.abstractmethod
def TestPackageInstallation(self):
"""
Ensure a package can be installed from distro archives (`make` or any other
generic package).
"""
pass
@abc.abstractmethod
def IsPackageInstalled(self, package_name):
"""
Returns True if @package_name is installed on system, otherwise False.
"""
pass
def TestNoIrqbalanceInstalled(self):
"""
Ensure that `irqbalance` is not installed or running.
"""
if self.IsPackageInstalled('irqbalance'):
raise Exception('irqbalance should not be found')
def GetCmdlineConfigs(self):
"""
Return command line configurations to be checked.
"""
return {
'console': ['ttyS0', '38400n8'],
}
def GetCmdlineLocation(self):
"""
Return the path for kernel arguments given by the bootloader
"""
return '/proc/cmdline'
def TestKernelCmdargs(self):
"""
Ensure boot loader configuration for console logging is correct.
Ensure boot loader kernel command line args (per distro).
"""
def ReadCmdline():
cmdline = open(self.GetCmdlineLocation()).read()
# Store values as: { # e.g: "console=ttyS0,38400n8 ro"
# 'console': ['ttyS0', '38400n8'],
# 'ro': [],
# }
configs = {}
args = []
for line in cmdline.split('\n'):
if line:
args.extend(line.split(' '))
for arg in args:
v = arg.split('=')
if len(v) > 1:
configs[v[0]] = [i.replace('"', '').strip() for i in v[1].split(',')]
else:
configs[v[0]] = []
return configs
desired_configs = self.GetCmdlineConfigs()
cur_configs = ReadCmdline()
try:
for desired_config, desired_values in desired_configs.items():
for desired_value in desired_values:
cur_value = cur_configs[desired_config]
if desired_value:
if desired_value not in cur_value:
e = 'Desired cmdline arg "%s" with value "%s" not found in "%s"'
raise Exception(e % (desired_config, desired_value, cur_value))
else:
# empty list
if cur_value:
e = 'Desired cmdline arg "%s" should not be defined as "%s"'
raise Exception(e % (desired_config, cur_value))
except KeyError as e:
raise Exception('Desired cmdline arg "%s" not found' % e.args[0])
def TestHostname(self, expected_hostname):
"""
Ensure hostname gets set to the instance name.
"""
import socket
actual_hostname = socket.gethostname()
if expected_hostname != actual_hostname:
raise Exception('Hostname "%s" differs from expected "%s"' % (
actual_hostname, expected_hostname))
def TestRsyslogConfig(self):
"""
Ensure that rsyslog is installed and configured and that the hostname is
properly set in the logs on boot.
"""
# test if kernel and daemon messages are being logged to console. The
# hostname output will be checked by the step "rsyslog-hostname-test"
info = [
['kern.info', 'RsyslogKernelConsoleTest'],
['daemon.info', 'RsyslogDaemonConsoleTest'],
]
# Avoid log output overload on centos-6
time.sleep(0.1)
for facility in info:
utils.Execute(['logger', '-p'] + facility)
time.sleep(0.1)
def TestRootPasswordDisabled(self):
"""
Ensure root password is disabled (/etc/passwd)
"""
# as 'man shadow' described:
# If the password field contains some string that is not a valid result of
# crypt(3), for instance ! or *, the user will not be able to use a unix
# password to log in
#
# Below, not the most pythonic thing to do... but it's the easiest one
utils.Execute(['grep', '^root:[\\!*]', '/etc/shadow'])
def GetSshdConfig(self):
"""
Return desired sshd config to be checked.
"""
return {
'PermitRootLogin': 'no',
'PasswordAuthentication': 'no',
}
def TestSshdConfig(self):
"""
Ensure sshd config has sane default settings
"""
def ParseSshdConfig(path):
configs = {}
with open(path) as f:
# Avoid log output overload on centos-6
time.sleep(0.1)
for line in filter(RemoveCommentAndStrip, f.read().split('\n')):
if line:
# use line separator for key and # values
entry = line.split(' ')
# strip dictionary value
configs[entry[0]] = ' '.join(entry[1:]).strip()
return configs
actual_sshd_configs = ParseSshdConfig('/etc/ssh/sshd_config')
for desired_key, desired_value in self.GetSshdConfig().items():
if actual_sshd_configs[desired_key] != desired_value:
raise Exception('Sshd key "%s" should be "%s" and not "%s"' % (
desired_key, desired_value, actual_sshd_configs[desired_key]))
@abc.abstractmethod
def TestPackageManagerConfig(self):
"""
Ensure apt/yum repos are setup for GCE repos.
"""
pass
def TestNetworkInterfaceMTU(self):
"""
Ensure that the network interface MTU is set to 1460.
"""
from os import listdir
for interface in listdir('/sys/class/net/'):
if interface == 'lo':
# Loopback is not subject to this restriction
continue
cur_mtu = int(open('/sys/class/net/%s/mtu' % interface).read())
desired_mtu = 1460
if cur_mtu != desired_mtu:
raise Exception('Network MTU is %d but expected %d on %s interface' % (
cur_mtu, desired_mtu, interface))
def TestNTPConfig(self):
"""
Ensure that the NTP server is set to metadata.google.internal.
"""
def CheckNtpRun(cmd):
"""
Run @cmd and check, if successful, whether google server is found on
output.
Args:
cmd: list of strings. Command to be passed to utils.Exceute
Return value:
bool. True if client exists and google server is found. False
otherwise.
"""
try:
rc, out = utils.Execute(cmd, raise_errors=False, capture_output=True)
if rc == 0:
# ntp client found on system
if out.find('metadata.google') >= 0:
# Google server found
return True
except OSError:
# just consider it as a regular error as below
pass
# Command didn't run successfully
return False
# Try ntp
if CheckNtpRun(['ntpq', '-p']):
return
# Try chrony
if CheckNtpRun(['chronyc', 'sources']):
return
raise Exception("No NTP client found that uses Google's NTP server")
@abc.abstractmethod
def TestAutomaticSecurityUpdates(self):
"""
Ensure automatic security updates are enabled per distro specs.
"""
pass
def GetSysctlConfigs(self):
"""
Return linux parameters for sysctl checks.
"""
return {
'net.ipv4.ip_forward': 0,
'net.ipv4.tcp_syncookies': 1,
'net.ipv4.conf.all.accept_source_route': 0,
'net.ipv4.conf.default.accept_source_route': 0,
'net.ipv4.conf.all.accept_redirects': 0,
'net.ipv4.conf.default.accept_redirects': 0,
'net.ipv4.conf.all.secure_redirects': 1,
'net.ipv4.conf.default.secure_redirects': 1,
'net.ipv4.conf.all.send_redirects': 0,
'net.ipv4.conf.default.send_redirects': 0,
'net.ipv4.conf.all.rp_filter': 1,
'net.ipv4.conf.default.rp_filter': 1,
'net.ipv4.icmp_echo_ignore_broadcasts': 1,
'net.ipv4.icmp_ignore_bogus_error_responses': 1,
'net.ipv4.conf.all.log_martians': 1,
'net.ipv4.conf.default.log_martians': 1,
'net.ipv4.tcp_rfc1337': 1,
'kernel.randomize_va_space': 2,
}
def TestSysctlSecurityParams(self):
"""
Ensure sysctl security parameters are set.
"""
def CheckSecurityParameter(key, desired_value):
rc, output = utils.Execute(['sysctl', '-e', key], capture_output=True)
actual_value = int(output.split("=")[1])
if actual_value != desired_value:
raise Exception('Security Parameter %s is %d but expected %d' % (
key, actual_value, desired_value))
sysctl_configs = self.GetSysctlConfigs()
for config in sysctl_configs:
# Avoid log output overload on centos-6
time.sleep(0.1)
CheckSecurityParameter(config, sysctl_configs[config])
def TestGcloudUpToDate(self):
"""
Test for gcloud/gsutil (some distros won't have this) and validate that
versions are up to date.
https://github.com/GoogleCloudPlatform/compute-image-tools/issues/400
"""
# firstly check if gcloud and gsutil are available
try:
rc_gcloud, output = utils.Execute(['gcloud'], raise_errors=False)
rc_gsutil, output = utils.Execute(['gsutil'], raise_errors=False)
except OSError as e:
if e.errno == 2: # No such file or directory
# command is not available, skip this test
return
raise e
# Avoid log output overload on centos-6
time.sleep(1)
# now test if their API are still valid
utils.Execute(['gcloud', 'compute', 'images', 'list'])
# Avoid log output overload on centos-6
time.sleep(1)
utils.Execute(['gsutil', 'ls'])
time.sleep(1)