parts/linux/cloud-init/artifacts/cse_redact_cloud_config.py (46 lines of code) (raw):
import yaml
import argparse
# String value used to replace secret data
REDACTED = 'REDACTED'
# Redact functions
def redact_bootstrap_kubeconfig_tls_token(bootstrap_kubeconfig_write_file):
content_yaml = yaml.safe_load(bootstrap_kubeconfig_write_file['content'])
content_yaml['users'][0]['user']['token'] = REDACTED
bootstrap_kubeconfig_write_file['content'] = yaml.dump(content_yaml)
def redact_service_principal_secret(sp_secret_write_file):
sp_secret_write_file['content'] = REDACTED
# Maps write_file's path to the corresponding function used to redact it within cloud-config.txt
# This script will always redact these write_files if they exist within the specified cloud-config.txt
PATH_TO_REDACT_FUNC = {
'/var/lib/kubelet/bootstrap-kubeconfig': redact_bootstrap_kubeconfig_tls_token,
'/etc/kubernetes/sp.txt': redact_service_principal_secret
}
def redact_cloud_config(cloud_config_path, output_path):
target_paths = set(PATH_TO_REDACT_FUNC.keys())
with open(cloud_config_path, 'r') as f:
cloud_config_data = f.read()
cloud_config = yaml.safe_load(cloud_config_data)
for write_file in cloud_config['write_files']:
if write_file['path'] in target_paths:
target_path = write_file['path']
target_paths.remove(target_path)
print('Redacting secrets from write_file: ' + target_path)
PATH_TO_REDACT_FUNC[target_path](write_file)
if len(target_paths) == 0:
break
print('Dumping redacted cloud-config to: ' + output_path)
with open(output_path, 'w+') as output_file:
output_file.write(yaml.dump(cloud_config))
if __name__ == '__main__':
parser = argparse.ArgumentParser(
description='Command line utility used to redact secrets from write_file definitions for ' +
str([", ".join(PATH_TO_REDACT_FUNC)]) + ' within a specified cloud-config.txt. \
These secrets must be redacted before cloud-config.txt can be collected for logging.')
parser.add_argument(
"--cloud-config-path",
required=True,
type=str,
help='Path to cloud-config.txt to redact')
parser.add_argument(
"--output-path",
required=True,
type=str,
help='Path to the newly generated cloud-config.txt with redacted secrets')
args = parser.parse_args()
redact_cloud_config(args.cloud_config_path, args.output_path)