cli/node_config_fetcher.py (127 lines of code) (raw):

# Copyright 2024 Google LLC # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. """For a given node, fetches the current configuration settings.""" import asyncio import subprocess from typing import Callable import click import config as configcheck_config import dependency_version_parser class NodeConfigFetcher: """Fetches the current configuration settings for a given node.""" def __init__( self, name: str, project: str, dependency_parsers: list[ dependency_version_parser.DependencyVersionParser ], zone: str, run_async: bool = False, sudo: bool = False, verbose: bool = False, ): """Creates a Config Fetcher for a given node. Args: name: The name of the node. project: The project of the node. dependency_parsers: A list of dependency parsers for the node. zone: The zone of the node. run_async: If true, run the fetcher in an async mode. sudo: Whether to run remote commands with sudo. Defaults to False. verbose: Whether to enable verbose logging. Defaults to False. """ self.name = name self.zone = zone self.project = project self.dependency_parsers = dependency_parsers self.run_async = run_async self.sudo = sudo self.verbose = verbose def _get_remote_exec_cmd( self, cmd: list[str], sudo: bool = False ) -> list[str]: """Fetches the command result from the remote node.""" remote_cmd = [ 'gcloud', 'compute', 'ssh', '--zone', f'{self.zone}', '--project', f'{self.project}', f'{self.name}', '--tunnel-through-iap', '--', ] if sudo: remote_cmd.append('sudo') remote_cmd.extend(cmd) return remote_cmd def _run_cmd_async(self, cmd: str): """Runs a bash command and returns the output.""" process = subprocess.Popen( cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE, shell=True, ) stdout, stderr = process.communicate() if process.returncode != 0: raise subprocess.CalledProcessError( process.returncode, cmd, stderr, stdout ) return stdout.decode('utf-8') def _run_cmd(self, cmd: str) -> str: """Runs a bash command and returns the output.""" result = subprocess.run( cmd, shell=True, check=True, capture_output=True, text=True, ) if result.returncode != 0: raise subprocess.CalledProcessError( result.returncode, cmd, result.stderr, result.stdout ) return result.stdout async def _fetch_config_internal( self, cmd_runner: Callable[[str], str], ) -> configcheck_config.NodeConfig: """Internal impl to fetch a given node configuration. Args: cmd_runner: A function to run a bash command and return the output. Returns: The node configuration. """ node_config = configcheck_config.NodeConfig(name=self.name) for dependency_fetcher in self.dependency_parsers: if dependency_fetcher.local_exec: cmd = dependency_fetcher.cmd else: cmd = self._get_remote_exec_cmd( cmd=dependency_fetcher.cmd, sudo=self.sudo ) cmd = ' '.join(cmd) if self.verbose: click.echo( _format_cmd_for_logging( dependency_name=dependency_fetcher.name, cmd=cmd ) ) try: cmd_output = await asyncio.to_thread(lambda cmd=cmd: cmd_runner(cmd)) dependency_config = dependency_fetcher.parse_version(cmd_output) except subprocess.CalledProcessError: dependency_config = configcheck_config.DependencyConfig( name=dependency_fetcher.name, version='Error Fetching Dependency', ) node_config.dependencies[dependency_fetcher.name] = dependency_config return node_config async def fetch_config_async(self) -> configcheck_config.NodeConfig: """Asynchronously fetches the current configuration settings for a given node.""" return await self._fetch_config_internal(self._run_cmd_async) def fetch_config(self) -> configcheck_config.NodeConfig: """Fetches the current configuration settings for a given node.""" return asyncio.run(self._fetch_config_internal(self._run_cmd)) def _format_cmd_for_logging( dependency_name: str, cmd: str, color: str = 'yellow' ) -> str: """Formats a command for logging.""" return ( click.style( 'Fetching dependency ', fg=color, ) + click.style( dependency_name, fg=color, bold=True, ) + click.style( ' with command: ', fg=color, ) + click.style( cmd, fg=color, bold=True, ) )