gcpdiag/runbook/gce/guestos_bootup.py (161 lines of code) (raw):
# Copyright 2025 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Guest OS boot-up runbook."""
import mimetypes
import googleapiclient.errors
from gcpdiag import runbook
from gcpdiag.queries import crm, gce
from gcpdiag.runbook import op
from gcpdiag.runbook.gce import constants as gce_const
from gcpdiag.runbook.gce import flags
from gcpdiag.runbook.gce import generalized_steps as gce_gs
class GuestosBootup(runbook.DiagnosticTree):
  """ Google Compute Engine VM Guest OS boot-up runbook.

    This runbook is designed to investigate the various boot-up stages of a Linux or Windows Guest
    OS running on Google Compute Engine. It is intended to help you identify and troubleshoot issues
    that may arise during the boot process. The runbook provides a structured approach to resolve
    issues.

    Key Investigation Areas:

    Boot Issues:
        - Check for Boot issues happening due to Kernel panics
        - Check for GRUB related issues.
        - Check if system failed to find boot disk.
        - Check if Filesystem corruption is causing issues with system boot.
        - Check if "/" Filesystem consumption is causing issues with system boot.

    Cloud-init checks:
        - Check if cloud-init has initialised or started.
        - Check if NIC has received the IP.

    Network related issues:
        - Check if metadata server became unreachable since last boot.
        - Check if there are any time sync related errors.

    Google Guest Agent checks:
        - Check if there are logs related to successful startup of Google Guest Agent.
    """

  # Parameters accepted by this runbook, keyed by flag name. Entries marked
  # 'required' must be supplied; the instance id and the serial console file
  # list carry no 'required' marker.
  parameters = {
      flags.PROJECT_ID: {
          'type': str,
          'help': 'The Project ID associated with the VM',
          'required': True
      },
      flags.INSTANCE_NAME: {
          'type': str,
          'help': 'The name of the VM',
          'required': True
      },
      flags.INSTANCE_ID: {
          'type': str,
          'help': 'The instance-id of the VM'
      },
      flags.ZONE: {
          'type': str,
          'help': 'The Google Cloud zone where the VM is located.',
          'required': True
      },
      flags.SERIAL_CONSOLE_FILE: {
          'type': str,
          'ignorecase': True,
          'help': 'Absolute path of files contailing the Serial console logs,'
                  ' in case if gcpdiag is not able to reach the VM Serial logs.'
                  ' i.e -p serial_console_file="filepath1,filepath2" ',
      }
  }

  def build_tree(self):
    """Construct the diagnostic tree with appropriate steps."""
    root = GuestosBootupStart()
    self.add_start(root)

    def attach_serial_log_check(template, **patterns):
      # Build one serial-log check aimed at the VM named by the runbook
      # parameters, apply only the pattern attributes the caller supplied
      # (so unspecified ones keep the step's own defaults), and hang it
      # directly under the start step.
      check = gce_gs.VmSerialLogsCheck()
      check.project_id = op.get(flags.PROJECT_ID)
      check.zone = op.get(flags.ZONE)
      check.instance_name = op.get(flags.INSTANCE_NAME)
      check.serial_console_file = op.get(flags.SERIAL_CONSOLE_FILE)
      check.template = template
      for attribute, value in patterns.items():
        setattr(check, attribute, value)
      self.add_step(parent=root, child=check)

    # consider leverage LLM to perform anomaly detection
    # or other advanced analysis on serial logs within the VM

    # Boot problems: kernel panic messages in the serial output.
    attach_serial_log_check('vm_serial_log::kernel_panic',
                            negative_pattern=gce_const.KERNEL_PANIC_LOGS)

    # Filesystem corruption messages.
    attach_serial_log_check('vm_serial_log::linux_fs_corruption',
                            negative_pattern=gce_const.FS_CORRUPTION_MSG)

    # Cloud-init checks live in their own composite step.
    self.add_step(parent=root, child=CloudInitChecks())

    # Network related errors.
    attach_serial_log_check('vm_serial_log::network_errors',
                            negative_pattern=gce_const.NETWORK_ERRORS)

    # Guest agent: matched against both a success and a failure pattern.
    attach_serial_log_check(
        'vm_serial_log::guest_agent',
        positive_pattern=gce_const.GUEST_AGENT_STATUS_MSG,
        negative_pattern=gce_const.GUEST_AGENT_FAILED_MSG)

    self.add_end(runbook.EndStep())
class GuestosBootupStart(runbook.StartStep):
  """Fetches VM details and validates the instance state.

  This step retrieves the VM instance details based on the provided project ID,
  zone, and instance name. It checks if the VM is running and updates the
  instance ID or name if missing. Additionally, it performs sanity checks on
  the provided serial console log files to ensure they are valid plain text files.
  """

  template = 'vm_attributes::running'

  def execute(self):
    """Fetching VM details"""
    project = crm.get_project(op.get(flags.PROJECT_ID))
    # Bound before the lookup so the file checks further down always have a
    # defined value to inspect, even when the lookup raises.
    vm = None
    try:
      vm = gce.get_instance(project_id=op.get(flags.PROJECT_ID),
                            zone=op.get(flags.ZONE),
                            instance_name=op.get(flags.INSTANCE_NAME))
    except googleapiclient.errors.HttpError:
      op.add_skipped(
          project,
          reason=('Instance {} does not exist in zone {} or project {}').format(
              op.get(flags.INSTANCE_NAME), op.get(flags.ZONE),
              op.get(flags.PROJECT_ID)))
    else:
      if vm and vm.is_running:
        # Fill in whichever of instance id / instance name was not supplied.
        if not op.get(flags.INSTANCE_ID):
          op.put(flags.INSTANCE_ID, vm.id)
        elif not op.get(flags.INSTANCE_NAME):
          op.put(flags.INSTANCE_NAME, vm.name)
      else:
        # NOTE(review): this branch dereferences vm, so it assumes
        # gce.get_instance never returns a falsy value - confirm.
        op.add_failed(vm,
                      reason=op.prep_msg(op.FAILURE_REASON,
                                         full_resource_path=vm.full_path,
                                         status=vm.status),
                      remediation=op.prep_msg(op.FAILURE_REMEDIATION,
                                              full_resource_path=vm.full_path,
                                              status=vm.status))

    # file sanity checks
    serial_console_files = op.get(flags.SERIAL_CONSOLE_FILE)
    if not serial_console_files:
      return

    # When the instance lookup failed there is no VM object; report file
    # problems against the project instead of referencing an unbound name.
    report_target = vm if vm is not None else project
    for file_path in serial_console_files.split(','):
      try:
        issue = _serial_console_file_issue(file_path)
      except FileNotFoundError:
        op.add_skipped(
            report_target,
            reason=('The file {} does not exists. Please verify if '
                    'you have provided the correct absolute file path'
                   ).format(file_path))
      else:
        if issue:
          op.add_skipped(report_target, reason=issue)


def _serial_console_file_issue(file_path):
  """Explain why a serial console file is not usable as plain text.

  Reads the first 1024 bytes of the file and applies two checks:
  a compression magic-number check (always), and a UTF-8 decode check
  (only when the file name maps to a known non-text MIME type).

  Args:
    file_path: Path of the file to inspect, as given by the user.

  Returns:
    A skip-reason string when a problem is detected, otherwise None.

  Raises:
    FileNotFoundError: If the file does not exist. Any other OSError raised
      by open() propagates unchanged as well.
  """
  import codecs  # local import: only needed by this helper

  sniff_size = 1024
  with open(file_path, 'rb') as f:
    content_start = f.read(sniff_size)

  # Compression is detected from content rather than from the file name:
  # mimetypes reports '.gz'/'.xz' in the *encoding* slot of guess_type(), so
  # gating this check on the guessed type would miss 'serial.log.gz'.
  # gzip streams start with 1f 8b; xz streams start with fd 37 7a 58 5a 00.
  if content_start.startswith((b'\x1f\x8b', b'\xfd7zXZ\x00')):
    return 'File {} appears to be compressed, not plain text.'.format(
        file_path)

  # Names with an unknown or text/* type are accepted without decoding.
  guessed_type = mimetypes.guess_type(file_path)[0]
  if not guessed_type or guessed_type.startswith('text/'):
    return None

  # An incremental decoder tolerates a multi-byte character that was cut in
  # half by the fixed-size read; a short read means the whole file was seen,
  # so the data is then decoded as final.
  decoder = codecs.getincrementaldecoder('utf-8')()
  try:
    decoder.decode(content_start, final=len(content_start) < sniff_size)
  except UnicodeDecodeError:
    return 'File {} does not appear to be plain text.'.format(file_path)
  return None
class CloudInitChecks(runbook.CompositeStep):
  """Cloud init related checks"""

  def execute(self):
    """Cloud init related checks"""
    # Licenses published by the two Ubuntu image projects; the VM must carry
    # one of them for the cloud-init checks to be added.
    licenses = (gce.get_gce_public_licences('ubuntu-os-cloud') +
                gce.get_gce_public_licences('ubuntu-os-pro-cloud'))
    vm = gce.get_instance(project_id=op.get(flags.PROJECT_ID),
                          zone=op.get(flags.ZONE),
                          instance_name=op.get(flags.INSTANCE_NAME))

    if not vm.check_license(licenses):
      op.add_skipped(
          vm, reason='This VM is not Ubuntu or it does not uses cloud-init')
      return

    def new_serial_log_check(template):
      # Serial-log check pre-configured for the VM named by the runbook
      # parameters; the caller sets the patterns afterwards.
      check = gce_gs.VmSerialLogsCheck()
      check.project_id = op.get(flags.PROJECT_ID)
      check.zone = op.get(flags.ZONE)
      check.instance_name = op.get(flags.INSTANCE_NAME)
      check.serial_console_file = op.get(flags.SERIAL_CONSOLE_FILE)
      check.template = template
      return check

    # Cloud-init startup log: only a positive pattern is configured.
    startup_check = new_serial_log_check(
        'vm_serial_log::cloud_init_startup_check')
    startup_check.positive_pattern = gce_const.CLOUD_INIT_STARTUP_PATTERN
    self.add_child(startup_check)

    # NIC received an IP: matched against both failure and success patterns.
    nic_ip_check = new_serial_log_check('vm_serial_log::cloud_init')
    nic_ip_check.negative_pattern = gce_const.CLOUD_INIT_NEGATIVE_PATTERN
    nic_ip_check.positive_pattern = gce_const.CLOUD_INIT_POSITIVE_PATTERN
    self.add_child(nic_ip_check)