azdev/operations/secret.py (274 lines of code) (raw):

# -----------------------------------------------------------------------------
# Copyright (c) Microsoft Corporation. All rights reserved.
# Licensed under the MIT License. See License.txt in the project root for
# license information.
# -----------------------------------------------------------------------------

import os
import json
from json.decoder import JSONDecodeError
from knack.log import get_logger
from microsoft_security_utilities_secret_masker import (load_regex_patterns_from_json_file,
                                                        load_regex_pattern_from_json,
                                                        SecretMasker)

logger = get_logger(__name__)


def _validate_data_path(file_path=None, directory_path=None, include_pattern=None, exclude_pattern=None, data=None):
    """Validate the combination of input sources for a scan/mask request.

    Exactly one of ``file_path``, ``directory_path`` or ``data`` must be given.
    ``include_pattern`` / ``exclude_pattern`` are only accepted together with
    ``directory_path`` and are mutually exclusive.

    :raises ValueError: if the combination is invalid, or if the given file or
        directory path does not exist.
    """
    if file_path and directory_path:
        raise ValueError('Can not specify file path and directory path at the same time')
    if file_path and data:
        raise ValueError('Can not specify file path and raw string at the same time')
    if directory_path and data:
        raise ValueError('Can not specify directory path and raw string at the same time')
    if not file_path and not directory_path and not data:
        raise ValueError('No file path or directory path or raw string provided')

    if directory_path and not os.path.isdir(directory_path):
        raise ValueError(f'invalid directory path:{directory_path}')
    if file_path and not os.path.isfile(file_path):
        raise ValueError(f'invalid file path:{file_path}')

    if not directory_path and include_pattern:
        raise ValueError('--include-pattern need to be used together with --directory-path')
    if not directory_path and exclude_pattern:
        raise ValueError('--exclude-pattern need to be used together with --directory-path')
    if include_pattern and exclude_pattern:
        raise ValueError('--include-pattern and --exclude-pattern are mutually exclusive')


def _is_file_name_in_patterns(filename, patterns):
    """Return True if ``filename`` matches any of the fnmatch-style ``patterns``.

    An empty/None ``filename`` or ``patterns`` yields False.
    """
    if not filename or not patterns:
        return False
    import fnmatch
    return any(fnmatch.fnmatch(filename, pattern) for pattern in patterns)


def _check_file_include_and_exclude_pattern(filename, include_pattern=None, exclude_pattern=None):
    """Return True if ``filename`` passes the include and exclude filters.

    A file is rejected when include patterns are given and none match, or when
    exclude patterns are given and at least one matches.
    """
    if include_pattern and not _is_file_name_in_patterns(filename, include_pattern):
        return False
    if exclude_pattern and _is_file_name_in_patterns(filename, exclude_pattern):
        return False
    return True


def _get_files_from_directory(directory_path, recursive=None, include_pattern=None, exclude_pattern=None):
    """Collect the files under ``directory_path`` that pass the name filters.

    With ``recursive`` truthy the whole tree is walked; otherwise only regular
    files directly inside ``directory_path`` are returned. Patterns are matched
    against the bare file name, not the joined path.

    :return: list of paths built by joining the directory with each file name.
    """
    target_files = []
    if recursive:
        for root, _, files in os.walk(directory_path):
            for file in files:
                if _check_file_include_and_exclude_pattern(file, include_pattern=include_pattern,
                                                           exclude_pattern=exclude_pattern):
                    target_files.append(os.path.join(root, file))
    else:
        for file in os.listdir(directory_path):
            if _check_file_include_and_exclude_pattern(file, include_pattern=include_pattern,
                                                       exclude_pattern=exclude_pattern):
                file = os.path.join(directory_path, file)
                # os.listdir also returns sub-directories; keep regular files only
                if os.path.isfile(file):
                    target_files.append(file)
    return target_files


def _load_built_in_regex_patterns(confidence_level=None):
    """Load the built-in regex patterns for ``confidence_level``.

    Levels are cumulative: 'HIGH' (the default when nothing is given) loads the
    high-confidence file, 'MEDIUM' adds the medium-confidence file and 'LOW'
    adds the low-confidence file as well. Any other value yields an empty set.

    :return: set of loaded patterns.
    """
    if not confidence_level:
        confidence_level = 'HIGH'

    patterns = set()
    if confidence_level in ['HIGH', 'MEDIUM', 'LOW']:
        patterns.update(load_regex_patterns_from_json_file('HighConfidenceSecurityModels.json'))
    if confidence_level in ['MEDIUM', 'LOW']:
        patterns.update(load_regex_patterns_from_json_file('MediumConfidenceSecurityModels.json'))
    if confidence_level == 'LOW':
        patterns.update(load_regex_patterns_from_json_file('LowConfidenceSecurityModels.json'))
    return patterns


def _load_regex_patterns(confidence_level=None, custom_pattern=None):
    """Combine the built-in patterns with an optional custom pattern definition.

    ``custom_pattern`` is either a path to a JSON file or a JSON string. Its
    ``Include`` entries (each needs a ``Pattern`` property) are added; its
    ``Exclude`` entries (each needs an ``Id`` property) remove built-in
    patterns with a matching id.

    :return: the built-in pattern set when no custom pattern is given,
        otherwise a list of patterns.
    :raises ValueError: if the custom pattern is not valid JSON or an entry
        lacks its required property.
    """
    built_in_regex_patterns = _load_built_in_regex_patterns(confidence_level)
    if not custom_pattern:
        return built_in_regex_patterns

    try:
        if os.path.isfile(custom_pattern):
            with open(custom_pattern, 'r', encoding='utf8') as f:
                custom_pattern = json.load(f)
        else:
            custom_pattern = json.loads(custom_pattern)
    except JSONDecodeError as err:
        # chain the decoder error so the failing position stays in the traceback
        raise ValueError(f'Custom pattern should be in valid json format, err:{err.msg}') from err

    regex_patterns = []
    if 'Include' in custom_pattern:
        for pattern in custom_pattern['Include']:
            if not pattern.get('Pattern', None):
                raise ValueError(f'Invalid Custom Pattern: {pattern}, '
                                 f'"Pattern" property is required for Include patterns')
            regex_patterns.append(load_regex_pattern_from_json(pattern))

    if "Exclude" in custom_pattern:
        exclude_pattern_ids = []
        for pattern in custom_pattern['Exclude']:
            if not pattern.get('Id', None):
                raise ValueError(f'Invalid Custom Pattern: {pattern}, "Id" property is required for Exclude patterns')
            exclude_pattern_ids.append(pattern['Id'])
        for pattern in built_in_regex_patterns:
            if pattern.id in exclude_pattern_ids:
                continue
            regex_patterns.append(pattern)
    else:
        regex_patterns.extend(built_in_regex_patterns)
    return regex_patterns


def _scan_secrets_for_string(data, confidence_level=None, custom_pattern=None):
    """Detect secrets in the string ``data``.

    :return: None when ``data`` is empty; otherwise a list with one dict per
        detection holding ``secret_name``, ``secret_value``, ``secret_index``
        (``[start, end]`` offsets into ``data``) and ``redaction_token``.
    """
    if not data:
        return None

    regex_patterns = _load_regex_patterns(confidence_level, custom_pattern)
    secret_masker = SecretMasker(regex_patterns)
    detected_secrets = secret_masker.detect_secrets(data)
    return [
        {
            'secret_name': secret.name,
            'secret_value': data[secret.start:secret.end],
            'secret_index': [secret.start, secret.end],
            'redaction_token': secret.redaction_token,
        }
        for secret in detected_secrets
    ]


def scan_secrets(file_path=None, directory_path=None, recursive=False,
                 include_pattern=None, exclude_pattern=None, data=None,
                 save_scan_result=None, scan_result_path=None,
                 confidence_level=None, custom_pattern=None,
                 continue_on_failure=None):
    """Scan a file, a directory or a raw string for secrets.

    Results are keyed by absolute file path, or by ``'raw_data'`` when a raw
    string was scanned. Giving ``scan_result_path`` implies
    ``save_scan_result``.

    :return: ``{'secrets_detected': bool, 'scan_results': dict}`` when results
        are not saved; otherwise ``{'secrets_detected': bool,
        'scan_result_path': str or None}`` where the path is None if nothing
        was detected (no file is written in that case).
    :raises ValueError: if the input combination is invalid.
    :raises Exception: any error hit while scanning a file, unless
        ``continue_on_failure`` is truthy, in which case it is logged as a
        warning and the file is skipped.
    """
    _validate_data_path(file_path=file_path, directory_path=directory_path,
                        include_pattern=include_pattern, exclude_pattern=exclude_pattern, data=data)
    target_files = []
    scan_results = {}
    if directory_path:
        directory_path = os.path.abspath(directory_path)
        target_files = _get_files_from_directory(directory_path, recursive=recursive,
                                                 include_pattern=include_pattern, exclude_pattern=exclude_pattern)
    if file_path:
        file_path = os.path.abspath(file_path)
        target_files.append(file_path)

    if data:
        secrets = _scan_secrets_for_string(data, confidence_level, custom_pattern)
        if secrets:
            scan_results['raw_data'] = secrets
    elif target_files:
        for target_file in target_files:
            try:
                logger.debug('start scanning secrets for %s', target_file)
                with open(target_file, encoding='utf8') as f:
                    content = f.read()
                if not content:
                    continue
                secrets = _scan_secrets_for_string(content, confidence_level, custom_pattern)
                logger.debug('%d secrets found for %s', len(secrets), target_file)
                if secrets:
                    scan_results[target_file] = secrets
            except Exception as ex:  # pylint: disable=broad-exception-caught
                if continue_on_failure:
                    logger.warning("Error handling file %s, exception %s", target_file, str(ex))
                else:
                    # bare raise keeps the original traceback intact
                    raise

    if scan_result_path:
        save_scan_result = True
    if not save_scan_result:
        return {
            'secrets_detected': bool(scan_results),
            'scan_results': scan_results
        }

    if not scan_results:
        return {'secrets_detected': False, 'scan_result_path': None}

    if not scan_result_path:
        from azdev.utilities.config import get_azdev_config_dir
        from datetime import datetime
        file_folder = os.path.join(get_azdev_config_dir(), 'scan_results')
        # makedirs with exist_ok avoids the exists()/mkdir() race and also
        # works when the parent config directory has not been created yet
        os.makedirs(file_folder, 0o755, exist_ok=True)
        result_file_name = 'scan_result_' + datetime.now().strftime('%Y%m%d%H%M%S') + '.json'
        scan_result_path = os.path.join(file_folder, result_file_name)

    with open(scan_result_path, 'w', encoding='utf8') as f:
        json.dump(scan_results, f)
    logger.debug('store scanning results in %s', scan_result_path)
    return {'secrets_detected': True, 'scan_result_path': os.path.abspath(scan_result_path)}


def _get_scan_results_from_saved_file(saved_scan_result_path,
                                      file_path=None, directory_path=None, recursive=False,
                                      include_pattern=None, exclude_pattern=None, data=None):
    """Load a saved scan result and keep only the entries for the given input.

    :return: dict keyed by absolute file path (or ``'raw_data'``) containing
        the saved detections that belong to the requested file, directory
        or raw string; empty when the saved file has no matching entry.
    :raises ValueError: if ``saved_scan_result_path`` is not a file or the
        input combination is invalid.
    """
    scan_results = {}
    if not os.path.isfile(saved_scan_result_path):
        raise ValueError(f'invalid saved scan result path:{saved_scan_result_path}')
    with open(saved_scan_result_path, encoding='utf8') as f:
        saved_scan_results = json.load(f)

    # filter saved scan results to keep those related with specified file(s)
    _validate_data_path(file_path=file_path, directory_path=directory_path,
                        include_pattern=include_pattern, exclude_pattern=exclude_pattern, data=data)
    if file_path:
        file_path = os.path.abspath(file_path)
        if file_path in saved_scan_results:
            scan_results[file_path] = saved_scan_results[file_path]
    elif directory_path:
        directory_path = os.path.abspath(directory_path)
        target_files = _get_files_from_directory(directory_path, recursive=recursive,
                                                 include_pattern=include_pattern, exclude_pattern=exclude_pattern)
        for target_file in target_files:
            if target_file in saved_scan_results:
                scan_results[target_file] = saved_scan_results[target_file]
    elif 'raw_data' in saved_scan_results:
        # a saved result produced from files has no 'raw_data' entry; treat
        # that as "nothing to mask" instead of failing with KeyError
        scan_results['raw_data'] = saved_scan_results['raw_data']

    return scan_results


def _mask_secret_for_string(data, secret, redaction_type=None):
    """Replace every occurrence of a detected secret in ``data``.

    The replacement depends on ``redaction_type``: 'FIXED_VALUE' uses ``***``,
    'FIXED_LENGTH' uses as many ``*`` as the secret is long, 'SECRET_NAME' uses
    the secret's name, and anything else uses the secret's redaction token.

    :return: the masked string.
    """
    secret_value = secret['secret_value']
    if not secret_value:
        # str.replace('', x) would insert x between every character
        return data

    if redaction_type == 'FIXED_VALUE':
        replacement = '***'
    elif redaction_type == 'FIXED_LENGTH':
        replacement = '*' * len(secret_value)
    elif redaction_type == 'SECRET_NAME':
        replacement = secret['secret_name']
    else:
        replacement = secret['redaction_token']
    return data.replace(secret_value, replacement)


def mask_secrets(file_path=None, directory_path=None, recursive=False,
                 include_pattern=None, exclude_pattern=None, data=None,
                 save_scan_result=None, scan_result_path=None,
                 confidence_level=None, custom_pattern=None,
                 continue_on_failure=None,
                 saved_scan_result_path=None, redaction_type='FIXED_VALUE', yes=None):
    """Mask secrets in a file, a directory or a raw string.

    Detections come from ``saved_scan_result_path`` when given, otherwise from
    a fresh :func:`scan_secrets` run. Unless ``yes`` is truthy the user is
    asked for confirmation before anything is changed. Files are rewritten in
    place; for a raw string the masked text is returned in the result.

    :return: dict with ``mask`` (whether masking was performed), ``data``,
        ``file_path``, ``directory_path`` and ``recursive``.
    :raises Exception: any error hit while rewriting a file, unless
        ``continue_on_failure`` is truthy, in which case it is logged as a
        warning and the file is skipped.
    """
    scan_results = {}
    if saved_scan_result_path:
        scan_results = _get_scan_results_from_saved_file(saved_scan_result_path, file_path=file_path,
                                                         directory_path=directory_path, recursive=recursive,
                                                         include_pattern=include_pattern,
                                                         exclude_pattern=exclude_pattern,
                                                         data=data)
    else:
        scan_response = scan_secrets(file_path=file_path, directory_path=directory_path, recursive=recursive,
                                     include_pattern=include_pattern, exclude_pattern=exclude_pattern, data=data,
                                     save_scan_result=save_scan_result, scan_result_path=scan_result_path,
                                     confidence_level=confidence_level, custom_pattern=custom_pattern,
                                     continue_on_failure=continue_on_failure)
        # scan_secrets returns either in-memory results or the path of the
        # saved result file. It saves whenever scan_result_path is given, even
        # if save_scan_result is falsy, so branch on the response shape rather
        # than on the flag.
        if 'scan_results' in scan_response:
            scan_results = scan_response['scan_results']
        elif scan_response.get('scan_result_path'):
            with open(scan_response['scan_result_path'], encoding='utf8') as f:
                scan_results = json.load(f)

    mask_result = {
        'mask': False,
        'data': data,
        'file_path': file_path,
        'directory_path': directory_path,
        'recursive': recursive
    }
    if not scan_results:
        logger.warning('No secrets detected, finish directly.')
        return mask_result

    for scan_file_path, secrets in scan_results.items():
        logger.warning('Will mask %d secrets for %s', len(secrets), scan_file_path)
    if not yes:
        from knack.prompting import prompt_y_n
        if not prompt_y_n(f'Do you want to continue with redaction type {redaction_type}?'):
            return mask_result

    if 'raw_data' in scan_results:
        for secret in scan_results['raw_data']:
            data = _mask_secret_for_string(data, secret, redaction_type)
        mask_result['mask'] = True
        mask_result['data'] = data
        return mask_result

    for scan_file_path, secrets in scan_results.items():
        try:
            with open(scan_file_path, 'r', encoding='utf8') as f:
                content = f.read()
            if not content:
                continue
            for secret in secrets:
                content = _mask_secret_for_string(content, secret, redaction_type)
            with open(scan_file_path, 'w', encoding='utf8') as f:
                f.write(content)
        except Exception as ex:  # pylint: disable=broad-exception-caught
            if continue_on_failure:
                logger.warning("Error handling file %s, exception %s", scan_file_path, str(ex))
            else:
                # bare raise keeps the original traceback intact
                raise
    mask_result['mask'] = True
    return mask_result