marketplace/deployer_util/expand_config.py (188 lines of code) (raw):
#!/usr/bin/env python3
#
# Copyright 2018 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import base64
import json
import os
from argparse import ArgumentParser
import yaml
import config_helper
import property_generator
import schema_values_common
_PROG_HELP = """
Modifies the configuration parameter files in a directory
according to their schema.
"""
_IMAGE_REPO_PREFIX_PROPERTY_NAME = '__image_repo_prefix__'
class InvalidProperty(Exception):
pass
class MissingRequiredProperty(Exception):
pass
class MissingRequiredValue(Exception):
pass
def main():
parser = ArgumentParser(description=_PROG_HELP)
schema_values_common.add_to_argument_parser(parser)
parser.add_argument(
'--final_values_file',
help='Where the final value file should be written to',
default='/data/final_values.yaml')
parser.add_argument(
'--app_uid',
help='The application UID for populating into APPLICATION_UID properties',
default='')
args = parser.parse_args()
schema = schema_values_common.load_schema(args)
values = schema_values_common.load_values(args)
values = expand(values, schema, app_uid=args.app_uid)
write_values(values, args.final_values_file)
def expand(values_dict, schema, app_uid=''):
"""Returns the expanded values according to schema."""
schema.validate()
valid_property_names = set(schema.properties.keys())
valid_property_names.add(_IMAGE_REPO_PREFIX_PROPERTY_NAME)
for k in values_dict:
if k not in valid_property_names:
raise InvalidProperty('No such property defined in schema: {}'.format(k))
# Captures the final property name-value mappings.
# This has both properties directly specified under schema's `properties` and
# generated properties. See below for details about generated properties.
result = {}
# Captures only the generated properties. These are not directly specified in
# the schema under `properties`. Rather, their name are specified in special
# `generatedProperties` fields under each property's `x-google-marketplace`.
# Note that properties with generated values are NOT generated properties.
generated = {}
if schema.is_v2():
# Handles the images section of the schema.
generate_v2_image_properties(schema, values_dict, generated)
# Copy explicitly specified values and generate values into result.
for k, prop in schema.properties.items():
v = values_dict.get(k, None)
# The value is not explicitly specified and
# thus is eligible for auto-generation.
if v is None:
if prop.password:
v = property_generator.generate_password(prop.password)
elif prop.application_uid:
v = app_uid or ''
elif prop.tls_certificate:
v = property_generator.generate_tls_certificate()
elif prop.xtype == config_helper.XTYPE_ISTIO_ENABLED:
# For backward compatibility.
v = False
elif prop.xtype == config_helper.XTYPE_INGRESS_AVAILABLE:
# For backward compatibility.
v = True
elif prop.xtype == config_helper.XTYPE_DEPLOYER_IMAGE:
v = maybe_derive_deployer_image(schema, values_dict)
elif prop.default is not None:
v = prop.default
# Generate additional properties from this property.
if v is not None:
if prop.image:
if not isinstance(v, str):
raise InvalidProperty(
'Invalid value for IMAGE property {}: {}'.format(k, v))
generate_v1_properties_for_image(prop, v, generated)
elif prop.string:
if not isinstance(v, str):
raise InvalidProperty(
'Invalid value for STRING property {}: {}'.format(k, v))
generate_properties_for_string(prop, v, generated)
elif prop.tls_certificate:
if not isinstance(v, str):
raise InvalidProperty(
'Invalid value for TLS_CERTIFICATE property {}: {}'.format(k, v))
generate_properties_for_tls_certificate(prop, v, generated)
elif prop.application_uid:
generate_properties_for_appuid(prop, v, generated)
if v is not None:
result[k] = v
validate_value_types(result, schema)
validate_required_props(result, schema)
# Copy generated properties into result, validating no collisions.
for k, v in generated.items():
if k in result:
raise InvalidProperty(
'The property is to be generated, but already has a value: {}'.format(
k))
result[k] = v
return result
def validate_required_props(values, schema):
for k in schema.required:
if k not in values:
raise MissingRequiredProperty(
'No value for required property: {}'.format(k))
def validate_value_types(values, schema):
for k, v in values.items():
prop = schema.properties[k]
if not isinstance(v, prop.type):
raise InvalidProperty(
'Property {} is expected to be of type {}, but has value: {}'.format(
k, prop.type, v))
def maybe_derive_deployer_image(schema, values_dict):
if not schema.is_v2():
return None
repo_prefix = values_dict.get(_IMAGE_REPO_PREFIX_PROPERTY_NAME, None)
if not repo_prefix:
raise MissingRequiredValue('A valid value for __image_repo_prefix__ '
'must be specified in values.yaml')
tag = schema.x_google_marketplace.published_version
return '{}/{}:{}'.format(repo_prefix, 'deployer', tag)
def generate_properties_for_appuid(prop, value, result):
if prop.application_uid.application_create:
result[prop.application_uid.application_create] = False if value else True
def generate_v1_properties_for_image(prop, value, result):
if prop.image.split_by_colon:
before_name, after_name = prop.image.split_by_colon
parts = value.split(':', 1)
if len(parts) != 2:
raise InvalidProperty(
'Property {} has value that does not contain a colon: {}'.format(
prop.name, value))
before_value, after_value = parts
result[before_name] = before_value
result[after_name] = after_value
if prop.image.split_to_registry_repo_tag:
reg_name, repo_name, tag_name = prop.image.split_to_registry_repo_tag
parts = value.split(':', 1)
if len(parts) != 2:
raise InvalidProperty(
'Property {} has value that does not contain a tag: {}'.format(
prop.name, value))
nontag_value, tag_value = parts
parts = nontag_value.split('/', 1)
if len(parts) != 2:
raise InvalidProperty(
'Property {} has value that does not include a registry: {}'.format(
prop.name, value))
reg_value, repo_value = parts
result[reg_name] = reg_value
result[repo_name] = repo_value
result[tag_name] = tag_value
def generate_v2_image_properties(schema, values_dict, result):
repo_prefix = values_dict.get(_IMAGE_REPO_PREFIX_PROPERTY_NAME, None)
if not repo_prefix:
raise MissingRequiredValue('A valid value for __image_repo_prefix__ '
'must be specified in values.yaml')
tag = schema.x_google_marketplace.published_version
for img in schema.x_google_marketplace.images.values():
if img.name:
# Allows an empty image name for legacy reason.
registry_repo = '{}/{}'.format(repo_prefix, img.name)
else:
registry_repo = repo_prefix
registry, repo = registry_repo.split('/', 1)
full = '{}:{}'.format(registry_repo, tag)
for prop in img.properties.values():
if prop.part_type == config_helper.IMAGE_PROJECTION_TYPE_FULL:
result[prop.name] = full
elif prop.part_type == config_helper.IMAGE_PROJECTION_TYPE_REGISTRY:
result[prop.name] = registry
elif prop.part_type == config_helper.IMAGE_PROJECTION_TYPE_REGISTRY_REPO:
result[prop.name] = registry_repo
elif prop.part_type == config_helper.IMAGE_PROJECTION_TYPE_REPO:
result[prop.name] = repo
elif prop.part_type == config_helper.IMAGE_PROJECTION_TYPE_TAG:
result[prop.name] = tag
else:
raise InvalidProperty(
'Invalid type for images.properties.type: {}'.format(
prop.part_type))
def generate_properties_for_string(prop, value, result):
if prop.string.base64_encoded:
result[prop.string.base64_encoded] = base64.b64encode(value.encode('ascii'))
def generate_properties_for_tls_certificate(prop, value, result):
certificate = json.loads(value)
if prop.tls_certificate.base64_encoded_private_key:
result[prop.tls_certificate.base64_encoded_private_key] = base64.b64encode(
certificate['private_key'].encode('ascii')).decode('ascii')
if prop.tls_certificate.base64_encoded_certificate:
result[prop.tls_certificate.base64_encoded_certificate] = base64.b64encode(
certificate['certificate'].encode('ascii')).decode('ascii')
def write_values(values, values_file):
if not os.path.exists(os.path.dirname(values_file)):
os.makedirs(os.path.dirname(values_file))
with open(values_file, 'w', encoding='utf-8') as f:
data = yaml.safe_dump(values, default_flow_style=False, indent=2)
f.write(data)
if __name__ == "__main__":
main()