in gcpdiag/lint/command.py [0:0]
def _parse_args_run_repo(
argv: Optional[List[str]] = None,
credentials: Optional[str] = None) -> lint.LintRuleRepository:
"""Parse the sys.argv command line arguments and execute the lint rules.
Args: argv: [str] argument list sys.argv
credentials: str json repr of ADC credentials
Returns: lint.LintRuleRepository with repo results
"""
# Initialize argument parser
parser = _init_args_parser()
args = parser.parse_args(args=argv)
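  # Credentials may be injected by the caller (e.g. when invoked through the
  # API interface) rather than discovered from the environment.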
if credentials:
apis.set_credentials(credentials)
if args.interface == 'cli':
    # Allow a hook function to change the argument defaults.
hooks.set_lint_args_hook(args)
# Initialize configuration
config.init(vars(args), terminal_output.is_cloud_shell())
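  # Later steps read these values back through config.get().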
try:
    # Users may pass either a project number or a project ID;
    # fetch the project details to resolve it.
project = crm.get_project(args.project)
  except utils.GcpApiError:
    raise
else:
    # Store the project ID in config and context, since the
    # remaining code mainly uses the project ID.
config.set_project_id(project.id)
# Initialize Context.
context = models.Context(project_id=project.id,
locations=args.location,
resources=args.name,
labels=args.label)
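  # The context narrows rule execution to resources matching the given
  # locations, names and labels.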
  # Rule name patterns to include or exclude.
include_patterns = _parse_rule_patterns(config.get('include'))
exclude_patterns = _parse_rule_patterns(config.get('exclude'))
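  # Patterns are matched against rule identifiers, e.g. 'gke/*' or '*BP*'.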
# Initialize Repository, and Tests.
repo = lint.LintRuleRepository(
load_extended=config.get('include_extended'),
run_async=config.get('experimental_enable_async_rules'),
exclude=exclude_patterns,
include=include_patterns)
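  # Discover the rule modules in the per-product rules directories and
  # register them with the repository.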
_load_repository_rules(repo)
  # ^^^ If you add a rules directory, also update
  # pyinstaller/hook-gcpdiag.lint.py and bin/precommit-required-files.
  # Initialize the output formatter.
output_order = sorted(str(r) for r in repo.rules_to_run)
output = _initialize_output(output_order=output_order)
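  # Forward each rule result to the formatter as soon as it is reported.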
repo.result.add_result_handler(output.result_handler)
# Logging setup.
logging_handler = output.get_logging_handler()
logger = logging.getLogger()
  # Make sure we only use our own logging handler.
logger.handlers = []
logger.addHandler(logging_handler)
if config.get('verbose') >= 2:
logger.setLevel(logging.DEBUG)
else:
logger.setLevel(logging.INFO)
  # Disable logging from the Python API client unless verbose output is on.
if config.get('verbose') == 0:
gac_http_logger = logging.getLogger('googleapiclient.http')
gac_http_logger.setLevel(logging.ERROR)
# Deprecation warning
if config.get('auth_oauth'):
    logger.error(
        'OAuth authentication has been deprecated and no longer works.'
        ' Consider using another authentication method.')
raise ValueError('oauth authentication is no longer supported')
# Start the reporting
if args.interface == 'cli':
output.display_banner()
output.display_header(context)
# Verify that we have access and that the CRM API is enabled
try:
apis.verify_access(context.project_id)
except (utils.GcpApiError, exceptions.GoogleAuthError, ValueError) as err:
if args.interface == 'api':
      logger.error('Access verification failed for API interface: %s', err)
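      # Surface the failure as a skipped placeholder rule so API callers
      # still receive a structured report instead of an exception.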
result = repo.result.create_rule_report(
lint.LintRule(
product='',
rule_class=lint.LintRuleClass.ERR,
rule_id='',
short_desc='Access verification failed',
long_desc='Access verification failed',
keywords=[],
))
result.add_skipped(None, f'API Error: {err}', None)
result.finish()
return repo
else:
raise err
  # Warn the end user to fall back on the serial log buffer if the project
  # isn't storing serial output in Cloud Logging.
if args.interface == 'cli':
if not gce.is_project_serial_port_logging_enabled(context.project_id) and \
not config.get('enable_gce_serial_buffer'):
# Only print the warning if GCE is enabled in the first place
if apis.is_enabled(context.project_id, 'compute'):
        logger.warning(
            '''Serial output to Cloud Logging may be disabled for certain GCE instances.
Fall back on serial output buffers by using the flag --enable-gce-serial-buffer.\n''')
  # Run the selected lint rules against the context.
repo.run_rules(context)
if args.interface == 'cli':
output.display_footer(repo.result)
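  # Let a post-lint hook act on the aggregated rule statuses
  # (e.g. to adjust the process exit status).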
hooks.post_lint_hook(repo.result.get_rule_statuses())
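  # Reset the injected credentials so they do not leak into later runs.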
if credentials:
apis.set_credentials(None)
# Clean up the kubeconfig file generated for gcpdiag
kubectl.clean_up()
return repo