in services/jenkins-master/scripts/jenkins_config_templating.py [0:0]
def execute_config_templating(varfile, secretsdir, jenkinsdir, mode, update_secrets):
    """
    Execute config templating that inserts or removes secrets from a jenkins configuration directory.

    The work is done in three steps:

    1. Every file below ``secretsdir`` is copied into ``jenkinsdir`` (mode 'insert') or compared against
       and removed from ``jenkinsdir`` (mode 'remove').
    2. In mode 'remove', files left below ``<jenkinsdir>/secrets`` are treated as new secrets; afterwards
       that directory is deleted.
    3. For every secret entry read from ``varfile``, the text of the XML element addressed by the entry's
       xpath is swapped between placeholder and secret.

    :param varfile: File containing the actual variables that should be used during replacement
    :param secretsdir: Directory containing all files that should be just copied or removed as secrets
    :param jenkinsdir: Jenkins configuration directory
    :param mode: 'insert' replaces placeholders with actual values. 'remove' removes these values and
                 inserts placeholders
    :param update_secrets: Update secrets if mode == 'remove' and actual secrets differ from the stored ones
    :raises ValueError: If the mode is unknown, if a secret was deleted, changed or added while
                        update_secrets is false, or if an XML element is missing, ambiguous, empty or holds
                        an unexpected value
    :raises FileNotFoundError: If a file referenced by a secret entry does not exist in jenkinsdir
    :return: None
    """
    # Reject unsupported modes before anything is read or modified on disk.
    if mode not in ('insert', 'remove'):
        raise ValueError('Mode {} unknown'.format(mode))

    secret_entries = read_secret_entires(varfile)
    logging.debug('Found {} secret entries to be replaced'.format(len(secret_entries)))

    # Step 1: synchronise the plain secret files between the secrets dir and the jenkins dir.
    for root, _dirs, files in os.walk(secretsdir, topdown=False):
        for name in files:
            original_path = os.path.join(root, name)
            rel_path = os.path.relpath(original_path, secretsdir)
            temp_path = os.path.join(jenkinsdir, rel_path)

            if mode == 'insert':
                pathlib.Path(os.path.dirname(temp_path)).mkdir(parents=True, exist_ok=True)
                shutil.copyfile(original_path, temp_path)
                continue

            # mode == 'remove' from here on.
            # Check if secret does not exist anymore
            if not os.path.isfile(temp_path):
                if not update_secrets:
                    raise ValueError('Secret {} has been deleted'.format(rel_path))
                logging.info('Deleting secret {} because it has been removed on target'.format(rel_path))
                os.remove(original_path)
                # Neither copy exists any longer, so there is nothing left to compare or clean up.
                continue

            # Check if secrets are the same or have to be updated
            if not filecmp.cmp(temp_path, original_path):
                if not update_secrets:
                    raise ValueError('Secret {} contains changed content'.format(rel_path))
                logging.info('Replacing secret {} due to changed content'.format(rel_path))
                shutil.copyfile(temp_path, original_path)
            os.remove(temp_path)

    # Step 2: check if any files in the secrets-dir are left that didn't exist in the previous config.
    # Unfortunately, we can't verify if secrets outside the secrets-dir have been added.
    if mode == 'remove':
        temp_secrets_dir = os.path.join(jenkinsdir, 'secrets')
        for root, _dirs, files in os.walk(temp_secrets_dir, topdown=False):
            for name in files:
                temp_path = os.path.join(root, name)
                rel_path = os.path.relpath(temp_path, jenkinsdir)
                original_path = os.path.join(secretsdir, rel_path)
                if not update_secrets:
                    raise ValueError('New secret at {}'.format(rel_path))
                logging.info('Adding new secret at {}'.format(rel_path))
                # A new secret may live in a sub-directory that is not yet part of the secrets dir.
                pathlib.Path(os.path.dirname(original_path)).mkdir(parents=True, exist_ok=True)
                shutil.copyfile(temp_path, original_path)
        shutil.rmtree(temp_secrets_dir)

    # Step 3: swap placeholder and secret inside the XML configuration files.
    # XML parsers do not allow in-place replacements. Instead, the current value is read under the
    # specified xpath and then replaced in the file by search-and-replace via _replace_values.
    # NOTE(review): the original comment states the replacement is aborted if the value occurs multiple
    # times within the same file - presumably enforced inside _replace_values; confirm there.
    for secret_entry in secret_entries:
        temp_file_path = os.path.join(jenkinsdir, secret_entry.filepath)
        if not os.path.isfile(temp_file_path):
            raise FileNotFoundError('Could not find file {}'.format(temp_file_path))

        element = etree.parse(temp_file_path).xpath(secret_entry.xpath)

        # Check if xpath delivers multiple results. The xpath should only match once
        match_count = len(element)
        if match_count == 0:
            raise ValueError('Element at {}:{} not found'.format(temp_file_path, secret_entry.xpath))
        if match_count != 1:
            raise ValueError('1 Element expected, {} found at {}:{}'.
                             format(match_count, temp_file_path, secret_entry.xpath))

        current_value = element[0].text
        # The text of an empty element is None, which has to be rejected just like blank text.
        if current_value is None or not current_value.strip():
            raise ValueError('Element {} at {}:{} is not a text field'.
                             format(current_value, temp_file_path, secret_entry.xpath))

        if mode == 'insert':
            expected_value = secret_entry.placeholder
            target_value = secret_entry.secret
        else:
            expected_value = secret_entry.secret
            target_value = secret_entry.placeholder

        # NOTE(review): the messages below contain the actual secret values; consider masking them.
        if current_value == expected_value:
            logging.debug(
                'Replacing {} with {} at {}:{}'.format(current_value, target_value, secret_entry.xpath,
                                                       temp_file_path))
            _replace_values(current_value, target_value, temp_file_path)
        elif current_value == target_value:
            logging.info('Target value {} already present. Skipping {}:{}'.
                         format(target_value, secret_entry.xpath, temp_file_path))
        else:
            # All four placeholders need an argument; otherwise str.format raises an IndexError
            # that hides this error.
            raise ValueError('Current value "{}" does not match expected value "{}" in {}:{}'.format(
                current_value, expected_value, temp_file_path, secret_entry.xpath))