services/jenkins-master/scripts/jenkins_config_templating.py (163 lines of code) (raw):
#!/usr/bin/env python3
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
# -*- coding: utf-8 -*-
# This script serves the purpose to replace sensitive parts in the jenkins configuration with placeholders in order
# to allow the configuration to be published to a public repository
import argparse
import filecmp
import glob
import json
import logging
import os
import pathlib
import shutil
from collections import namedtuple
from lxml import etree
SECRET_ENTRY_KEYS = ['filepath', 'xpath', 'secret', 'placeholder']
SecretEntry = namedtuple('SecretEntry', SECRET_ENTRY_KEYS)
SYMLINK_ENTRY_KEYS = ['filepath', 'is_dir']
SymlinkEntry = namedtuple('SymlinkEntry', SYMLINK_ENTRY_KEYS)
def main():
parser = argparse.ArgumentParser()
parser.add_argument('-vf', '--varfile',
help='Location of the variable file',
type=str)
parser.add_argument('-sf', '--symlinkfile',
help='Location of the symlink file',
type=str)
parser.add_argument('-sd', '--secretsdir',
help='Location of the directory containing secrets',
type=str)
parser.add_argument('-jd', '--jenkinsdir',
help='Location of the jenkins directory',
type=str)
parser.add_argument('-m', '--mode',
help='"remove" or "insert" credentials',
default='insert',
type=str)
args = parser.parse_args()
execute_config_templating(args.varfile, args.secretsdir, args.jenkinsdir, args.mode, update_secrets=False)
# TODO: Add symlink list creation
def execute_config_templating(varfile, secretsdir, jenkinsdir, mode, update_secrets):
"""
Execute config templating that inserts or removes secrets from a jenkins configuration directory
:param varfile: File containing the actual variables that should be used during replacement
:param secretsdir: Directory containing all files that should be just copied or removed as secrets
:param jenkinsdir: Jenkins configuration directory
:param mode: 'insert' replaces placeholders with actual values. 'remove' removes these values and
inserts placeholders
:param update_secrets: Update secrets if mode == 'remove' and actual secrets differ from the stored ones
:return:
"""
secret_entries = read_secret_entires(varfile)
logging.debug('Found {} secret entries to be replaced'.format(len(secret_entries)))
# Prepare by finding all unique identifiers. This is required because XML parsers do not allow in-place
# replacements. Instead, we're reading a unique identifier under the specified xpath and then try to
# replace it in-place by using search-and-replace. This method will be aborted if the current value
# has been found multiple times within the same file.
for root, dirs, files in os.walk(secretsdir, topdown=False):
for name in files:
original_path = os.path.join(root, name)
rel_path = os.path.relpath(original_path, secretsdir)
temp_path = os.path.join(jenkinsdir, rel_path)
if mode == 'insert':
pathlib.Path(os.path.dirname(temp_path)).mkdir(parents=True, exist_ok=True)
shutil.copyfile(original_path, temp_path)
elif mode == 'remove':
# Check if secret does not exist anymore
if not os.path.isfile(temp_path):
if update_secrets:
logging.info('Deleting secret {} because it has been removed on target'.format(rel_path))
os.remove(original_path)
else:
raise ValueError('Secret {} has been deleted'.format(rel_path))
# Check if secrets are the same or have to be updated
if not filecmp.cmp(temp_path, original_path):
if update_secrets:
logging.info('Replacing secret {} due to changed content'.format(rel_path))
shutil.copyfile(temp_path, original_path)
else:
raise ValueError('Secret {} contains changed content'.format(rel_path))
os.remove(temp_path)
else:
raise ValueError('Mode {} unknown'.format(mode)) # TODO check this previously
# Check if any files in the secrets-dir are left that didn't exist in the previous config. Unfortunately,
# we can't verify if secrets outside the secrets-dir have been added.
if mode == 'remove':
temp_secrets_dir = os.path.join(jenkinsdir, 'secrets')
for root, dirs, files in os.walk(temp_secrets_dir, topdown=False):
for name in files:
temp_path = os.path.join(root, name)
rel_path = os.path.relpath(temp_path, jenkinsdir)
original_path = os.path.join(secretsdir, rel_path)
if update_secrets:
logging.info('Adding new secret at {}'.format(rel_path))
shutil.copyfile(temp_path, original_path)
else:
raise ValueError('New secret at {}'.format(rel_path))
shutil.rmtree(temp_secrets_dir)
for secret_entry in secret_entries:
temp_file_path = os.path.join(jenkinsdir, secret_entry.filepath)
if os.path.isfile(temp_file_path):
element = etree.parse(temp_file_path).xpath(secret_entry.xpath)
# Check if xpath delivers multiple results. The xpath should only match once
if len(element) == 1:
current_value = element[0].text
if not current_value.strip():
raise ValueError('Element {} at {}:{} is not a text field'.
format(current_value, temp_file_path, secret_entry.xpath))
if mode == 'insert':
expected_value = secret_entry.placeholder
target_value = secret_entry.secret
elif mode == 'remove':
expected_value = secret_entry.secret
target_value = secret_entry.placeholder
else:
raise ValueError('Mode {} unknown'.format(mode))
if current_value == expected_value:
logging.debug(
'Replacing {} with {} at {}:{}'.format(current_value, target_value, secret_entry.xpath,
temp_file_path))
_replace_values(current_value, target_value, temp_file_path)
elif current_value == target_value:
logging.info('Target value {} already present. Skipping {}:{}'.
format(target_value, secret_entry.xpath, temp_file_path))
continue
else:
raise ValueError('Current value "{}" does not match expected value "{}" in {}:{}'.format(
current_value, expected_value, secret_entry.xpath))
elif len(element) == 0:
raise ValueError('Element at {}:{} not found'.format(temp_file_path, secret_entry.xpath))
else:
raise ValueError('1 Element expected, {} found at {}:{}'.
format(len(element), temp_file_path, secret_entry.xpath))
else:
raise FileNotFoundError('Could not find file {}'.format(temp_file_path))
def assemble_symlink_list(symlink_file, jenkinsdir):
"""
Assemble a list of files that should be symlinked during startup, providing support for state files on EBS
:param symlink_file: File containing path expressions to describe the symlinked files and dirs
:param jenkinsdir: Jenkins configuration directory
:return: Array of SymlinkEntry
"""
symlink_config = read_symlink_entries(symlink_file)
logging.debug('Found {} symlink entries'.format(len(symlink_config)))
symlinks = []
for symlink_entry in symlink_config:
input_path_expression = os.path.join(jenkinsdir, symlink_entry.filepath)
result_paths = []
if '*' in symlink_entry.filepath:
# If path contains a wildcard, search for all results
wildcard_path_split = input_path_expression.split('*')
if len(wildcard_path_split) == 1:
result_paths = glob.glob(input_path_expression)
elif len(wildcard_path_split) == 2:
# If wildcard is in the middle and target directories don't exist, result would be empty.
# Instead, iterate manually
for partial_path in glob.glob(os.path.join(wildcard_path_split[0], '*')):
result_paths.append(os.path.join(partial_path, wildcard_path_split[1].lstrip('/')))
else:
raise ValueError('Symlink expression may only contain one wildcard')
logging.debug('Resolving {} to {}'.format(symlink_entry.filepath, result_paths))
else:
result_paths = [input_path_expression]
for abs_path in result_paths:
rel_path = os.path.relpath(abs_path, jenkinsdir)
symlinks.append(SymlinkEntry(rel_path, symlink_entry.is_dir))
return symlinks
def read_secret_entires(varfile):
"""
Read SecretEntry from varfile
:param varfile: File containing SecretEntries as JSON
:return: Array of SecretEntry
"""
with open(varfile, 'r') as fp:
secret_dict_raw = json.load(fp)
secrets = []
for secret_entry_dict in secret_dict_raw:
# Import values in the same order as expected in SecretEntry. Reason being that the values are expected
# to be inserted in the same order as defined previously.
secrets.append(SecretEntry(*[secret_entry_dict[k] for k in SECRET_ENTRY_KEYS]))
return secrets
def read_symlink_entries(varfile):
"""
Read SymlinkEntry from varfile
:param varfile: File containing SymlinkEntries as JSON
:return: Array of SymlinkEntries
"""
with open(varfile, 'r') as fp:
symlink_dict_raw = json.load(fp)
symlinks = []
for symlink_entry_dict in symlink_dict_raw:
# Import values in the same order as expected in SecretEntry. Reason being that the values are expected
# to be inserted in the same order as defined previously.
symlinks.append(SymlinkEntry(*[symlink_entry_dict[k] for k in SYMLINK_ENTRY_KEYS]))
return symlinks
def _replace_values(current_value, target_value, temp_file_path):
# Only execute changes on the temp file
with open(temp_file_path, 'r+') as temp_fh:
temp_file_content = temp_fh.read()
# Check if current value is unique within the temp file to prevent injection attacks
nb_occurrences_temp = temp_file_content.count(current_value)
if nb_occurrences_temp != 1:
raise ValueError(
'1 occurrence of current value {} in file {} expected, {} found'.format(
current_value,
temp_file_path,
nb_occurrences_temp))
temp_file_content_replaced = temp_file_content.replace(current_value, target_value)
# Write replaced content back to the temp file
temp_fh.seek(0)
temp_fh.write(temp_file_content_replaced)
temp_fh.truncate()
if __name__ == '__main__':
main()