processors/shellscript.py (89 lines of code) (raw):

# Copyright 2022 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from .base import Processor, NotConfiguredException
import json
import yaml
import os
import subprocess
from io import StringIO
import csv


class CommandFailedException(Exception):
    """Raised when the command exits with a return code that is not allowed."""


class ShellscriptProcessor(Processor):
    """
    Runs any shellscript as a command and exposes the output in Jinja.

    Args:
        command (str): Command to execute.
        args (list, optional): List of arguments.
        environment (dict, optional): Additional environment variables to set.
        stdin (str, optional): Contents to pass via stdin to the process.
        json (bool, optional): Interpret the output as JSON.
        jsonMultiline (bool, optional): Interpret the output as multiline JSON.
        yaml (bool, optional): Interpret the output as YAML.
        csv (bool, optional): Interpret the output as CSV.
        tsv (bool, optional): Interpret the output as TSV.
        exitcodes (list, optional): List of allowed exit codes that are
            interpreted as successful run.
        output (str, optional): Name of the output variable; overrides the
            `output_var` argument of `process`.
    """

    @staticmethod
    def get_default_config_key():
        """Return the configuration key under which this processor is set up."""
        return 'shellscript'

    @staticmethod
    def _is_enabled(shell_config, key):
        """Return True when `key` is present in the config with a truthy value."""
        return key in shell_config and bool(shell_config[key])

    def _parse_stdout(self, shell_config, stdout):
        """
        Parse the captured stdout according to the configured output format.

        The formats are checked in a fixed order (jsonMultiline, json, yaml,
        csv/tsv) and the first enabled one wins.

        Returns:
            The parsed data, or None when no output format is enabled.

        Raises:
            Exception: Whatever the JSON parser raised; the offending text is
                logged before the exception is re-raised.
        """
        if self._is_enabled(shell_config, 'jsonMultiline'):
            data = []
            # Split on "\n" only: str.splitlines() would also break on
            # characters such as U+2028 that may appear inside JSON strings.
            for result_line in stdout.split("\n"):
                # Skip blank and whitespace-only lines instead of failing
                # to parse them as JSON documents.
                if not result_line.strip():
                    continue
                try:
                    line_data = json.loads(result_line)
                except Exception:
                    self.logger.error(
                        'Error parsing multiline JSON from stdout: %s' %
                        (result_line))
                    raise
                data.append(line_data)
            return data

        if self._is_enabled(shell_config, 'json'):
            try:
                return json.loads(stdout)
            except Exception:
                self.logger.error('Error parsing JSON from stdout: %s' %
                                  (stdout))
                raise

        if self._is_enabled(shell_config, 'yaml'):
            return yaml.load(stdout, Loader=yaml.SafeLoader)

        csv_enabled = self._is_enabled(shell_config, 'csv')
        if csv_enabled or self._is_enabled(shell_config, 'tsv'):
            # CSV takes precedence when both csv and tsv are enabled.
            delimiter = ',' if csv_enabled else "\t"
            return list(csv.reader(StringIO(stdout), delimiter=delimiter))

        return None

    def process(self, output_var='shellscript'):
        """
        Run the configured command and return its captured results.

        Args:
            output_var (str): Key under which the results are returned. It is
                overridden by the `output` configuration key when present.

        Returns:
            dict: A single-entry dict mapping the output variable name to a
                dict with the keys `parsed` (parsed stdout, or None when no
                format is enabled), `stdout`, `stderr` and `returncode`.

        Raises:
            NotConfiguredException: If no `command` is configured.
            CommandFailedException: If the return code is not in `exitcodes`
                (or is non-zero when `exitcodes` is not configured).
        """
        shell_config = self.config
        if 'command' not in shell_config:
            raise NotConfiguredException('No executable command found!')

        if 'output' in shell_config:
            output_var = self._jinja_expand_string(shell_config['output'],
                                                   'output')

        command = self._jinja_expand_string(shell_config['command'], 'command')
        full_args = [command]
        if 'args' in shell_config:
            full_args.extend(
                self._jinja_var_to_list_all(shell_config['args'], 'args'))

        environment = {}
        if 'environment' in shell_config:
            environment = self._jinja_expand_dict(shell_config['environment'],
                                                  'env')

        stdin = None
        if 'stdin' in shell_config:
            stdin = self._jinja_expand_string(shell_config['stdin'], 'stdin')

        self.logger.info('Running shell script: %s' % (command),
                         extra={'arguments': full_args[1:]})

        # The argument list is passed without a shell; configured environment
        # variables are layered on top of the current process environment.
        result = subprocess.run(
            full_args,
            capture_output=True,
            input=stdin,
            text=True,
            env={
                **os.environ,
                **environment
            },
        )

        # Without an explicit list of exit codes only 0 counts as success.
        allowed_exitcodes = (shell_config['exitcodes']
                             if 'exitcodes' in shell_config else [0])
        if result.returncode not in allowed_exitcodes:
            raise CommandFailedException(
                'Command %s failed with return code: %d, stderr=%s' %
                (command, result.returncode, result.stderr))

        return {
            output_var: {
                'parsed': self._parse_stdout(shell_config, result.stdout),
                'stdout': result.stdout,
                'stderr': result.stderr,
                'returncode': result.returncode,
            }
        }