marketplace/deployer_util/config_helper.py (838 lines of code) (raw):
# Copyright 2018 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import collections
import io
import os
import re
import sys
import yaml
NAME_RE = re.compile(r'[a-zA-z0-9_\.\-]+$')
# Suggested from https://semver.org
SEMVER_RE = re.compile(r'^(0|[1-9]\d*)\.(0|[1-9]\d*)\.(0|[1-9]\d*)'
'(?:-((?:0|[1-9]\d*|\d*[a-zA-Z-][0-9a-zA-Z-]*)'
'(?:\.(?:0|[1-9]\d*|\d*[a-zA-Z-][0-9a-zA-Z-]*))*))?'
'(?:\+([0-9a-zA-Z-]+(?:\.[0-9a-zA-Z-]+)*))?$')
XGOOGLE = 'x-google-marketplace'
XTYPE_NAME = 'NAME'
XTYPE_NAMESPACE = 'NAMESPACE'
XTYPE_IMAGE = 'IMAGE'
XTYPE_DEPLOYER_IMAGE = 'DEPLOYER_IMAGE'
XTYPE_PASSWORD = 'GENERATED_PASSWORD'
XTYPE_REPORTING_SECRET = 'REPORTING_SECRET'
XTYPE_SERVICE_ACCOUNT = 'SERVICE_ACCOUNT'
XTYPE_STORAGE_CLASS = 'STORAGE_CLASS'
XTYPE_STRING = 'STRING'
XTYPE_APPLICATION_UID = 'APPLICATION_UID'
XTYPE_ISTIO_ENABLED = 'ISTIO_ENABLED'
XTYPE_INGRESS_AVAILABLE = 'INGRESS_AVAILABLE'
XTYPE_TLS_CERTIFICATE = 'TLS_CERTIFICATE'
XTYPE_MASKED_FIELD = 'MASKED_FIELD'
WIDGET_TYPES = ['help']
_OAUTH_SCOPE_PREFIX = 'https://www.googleapis.com/auth/'
class InvalidName(Exception):
pass
class InvalidValue(Exception):
pass
class InvalidSchema(Exception):
pass
def load_values(values_file, values_dir, schema):
if values_file == '-':
return yaml.safe_load(sys.stdin.read())
if values_file and os.path.isfile(values_file):
with open(values_file, 'r', encoding='utf-8') as f:
return yaml.safe_load(f.read())
return _read_values_to_dict(values_dir, schema)
def _read_values_to_dict(values_dir, schema):
"""Returns a dict constructed from files in values_dir."""
files = [
f for f in os.listdir(values_dir)
if os.path.isfile(os.path.join(values_dir, f))
]
result = {}
for filename in files:
if not NAME_RE.match(filename):
raise InvalidName('Invalid config parameter name: {}'.format(filename))
file_path = os.path.join(values_dir, filename)
with open(file_path, "r", encoding='utf-8') as f:
data = f.read()
result[filename] = data
# Data read in as strings. Convert them to proper types defined in schema.
result = {
k: schema.properties[k].str_to_type(v) if k in schema.properties else v
for k, v in result.items()
}
return result
class Schema:
"""Accesses a JSON schema."""
@staticmethod
def load_yaml_file(filepath):
with io.open(filepath, 'r') as f:
d = yaml.safe_load(f)
return Schema(d)
@staticmethod
def load_yaml(yaml_str):
return Schema(yaml.safe_load(yaml_str))
def __init__(self, dictionary):
self._x_google_marketplace = _maybe_get_and_apply(
dictionary, 'x-google-marketplace',
lambda v: SchemaXGoogleMarketplace(v))
self._required = dictionary.get('required', [])
self._properties = {
k: SchemaProperty(k, v, k in self._required)
for k, v in dictionary.get('properties', {}).items()
}
self._app_api_version = dictionary.get(
'applicationApiVersion', dictionary.get('application_api_version',
None))
self._form = dictionary.get('form', [])
def validate(self):
"""
Fully validates the schema, raising InvalidSchema if fails.
Intended for backward-incompatible validations that should only be
enforced upon base deployer update, as opposed to validations added
in class construction which are enforced immediately upon tools repo
release.
"""
bad_required_names = [
x for x in self._required if x not in self._properties
]
if bad_required_names:
raise InvalidSchema(
'Undefined property names found in required: {}'.format(
', '.join(bad_required_names)))
is_v2 = False
if self._x_google_marketplace is not None:
self._x_google_marketplace.validate()
is_v2 = self._x_google_marketplace.is_v2()
if not is_v2 and self._app_api_version is None:
raise InvalidSchema('applicationApiVersion is required')
if len(self.form) > 1:
raise InvalidSchema('form must not contain more than 1 item.')
for item in self.form:
if 'widget' not in item:
raise InvalidSchema('form items must have a widget.')
if item['widget'] not in WIDGET_TYPES:
raise InvalidSchema('Unrecognized form widget: {}'.format(
item['widget']))
if 'description' not in item:
raise InvalidSchema('form items must have a description.')
if is_v2:
for _, p in self._properties.items():
if p.xtype == XTYPE_IMAGE:
raise InvalidSchema(
'No properties should have x-google-marketplace.type=IMAGE in '
'schema v2. Images must be declared in the top level '
'x-google-marketplace.images')
if self._x_google_marketplace._deployer_service_account:
self._x_google_marketplace._deployer_service_account.validate()
for _, p in self._properties.items():
if p.xtype == XTYPE_SERVICE_ACCOUNT:
p.service_account.validate()
@property
def x_google_marketplace(self):
return self._x_google_marketplace
@property
def app_api_version(self):
if self.is_v2():
return self.x_google_marketplace.app_api_version
return self._app_api_version
@property
def properties(self):
return self._properties
@property
def required(self):
return self._required
@property
def form(self):
return self._form
def properties_matching(self, definition):
return [
v for k, v in self._properties.items()
if v.matches_definition(definition)
]
def is_v2(self):
if self.x_google_marketplace:
return self.x_google_marketplace.is_v2()
return False
_SCHEMA_VERSION_1 = 'v1'
_SCHEMA_VERSION_2 = 'v2'
_SCHEMA_VERSIONS = [_SCHEMA_VERSION_1, _SCHEMA_VERSION_2]
class SchemaXGoogleMarketplace:
"""Accesses the top level x-google-markplace."""
def __init__(self, dictionary):
self._app_api_version = None
self._published_version = None
self._published_version_meta = None
self._partner_id = None
self._solution_id = None
self._images = None
self._cluster_constraints = None
self._deployer_service_account = None
self._schema_version = dictionary.get('schemaVersion', _SCHEMA_VERSION_1)
if self._schema_version not in _SCHEMA_VERSIONS:
raise InvalidSchema('Invalid schema version {}'.format(
self._schema_version))
self._partner_id = dictionary.get('partnerId', None)
self._solution_id = dictionary.get('solutionId', None)
if self._partner_id or self._solution_id:
if not self._partner_id or not self._solution_id:
raise InvalidSchema(
'x-google-marketplace.partnerId and x-google-marketplace.solutionId'
' must be specified or missing together')
if 'clusterConstraints' in dictionary:
self._cluster_constraints = SchemaClusterConstraints(
dictionary['clusterConstraints'])
if not self.is_v2():
return
self._app_api_version = _must_get(
dictionary, 'applicationApiVersion',
'x-google-marketplace.applicationApiVersion is required')
self._published_version = _must_get(
dictionary, 'publishedVersion',
'x-google-marketplace.publishedVersion is required')
if not SEMVER_RE.match(self._published_version):
raise InvalidSchema(
'Invalid schema publishedVersion "{}"; must be semver including patch version'
.format(self._published_version))
self._published_version_meta = _must_get_and_apply(
dictionary, 'publishedVersionMetadata', lambda v: SchemaVersionMeta(v),
'x-google-marketplace.publishedVersionMetadata is required')
self._managed_updates = SchemaManagedUpdates(
dictionary.get('managedUpdates', {}))
images = _must_get(dictionary, 'images',
'x-google-marketplace.images is required')
self._images = {k: SchemaImage(k, v) for k, v in images.items()}
if 'deployerServiceAccount' in dictionary:
self._deployer_service_account = SchemaXServiceAccount(
dictionary['deployerServiceAccount'])
def validate(self):
pass
@property
def cluster_constraints(self):
return self._cluster_constraints
@property
def app_api_version(self):
return self._app_api_version
@property
def published_version(self):
return self._published_version
@property
def published_version_meta(self):
return self._published_version_meta
@property
def partner_id(self):
return self._partner_id
@property
def solution_id(self):
return self._solution_id
@property
def images(self):
return self._images
@property
def managed_updates(self):
return self._managed_updates
@property
def deployer_service_account(self):
return self._deployer_service_account
def is_v2(self):
return self._schema_version == _SCHEMA_VERSION_2
class SchemaManagedUpdates:
"""Accesses managedUpdates."""
def __init__(self, dictionary):
self._kalm_supported = dictionary.get('kalmSupported', False)
@property
def kalm_supported(self):
return self._kalm_supported
class SchemaClusterConstraints:
"""Accesses top level clusterConstraints."""
def __init__(self, dictionary):
self._k8s_version = dictionary.get('k8sVersion', None)
self._resources = None
self._istio = None
self._gcp = None
self._assisted_cluster_creation = None
if 'resources' in dictionary:
resources = dictionary['resources']
if not isinstance(resources, list):
raise InvalidSchema('clusterConstraints.resources must be a list')
self._resources = [SchemaResourceConstraints(r) for r in resources]
if len(list(filter(lambda x: x.requests.gpu, self._resources))) > 1:
raise InvalidSchema('At most one request may include GPUs')
self._istio = _maybe_get_and_apply(dictionary, 'istio',
lambda v: SchemaIstio(v))
self._gcp = _maybe_get_and_apply(dictionary, 'gcp', lambda v: SchemaGcp(v))
self._assisted_cluster_creation = _maybe_get_and_apply(
dictionary, 'assistedClusterCreation',
lambda v: SchemaAssistedClusterCreation(v))
@property
def k8s_version(self):
return self._k8s_version
@property
def resources(self):
return self._resources
@property
def istio(self):
return self._istio
@property
def gcp(self):
return self._gcp
@property
def assistedClusterCreation(self):
return self._assisted_cluster_creation
class SchemaResourceConstraints:
"""Accesses a single resource's constraints."""
def __init__(self, dictionary):
# TODO(#483): Require replicas for non-GPU constraints
self._replicas = dictionary.get('replicas', None)
self._affinity = _maybe_get_and_apply(
dictionary, 'affinity', lambda v: SchemaResourceConstraintAffinity(v))
self._requests = _must_get_and_apply(
dictionary, 'requests', lambda v: SchemaResourceConstraintRequests(v),
'Each item in clusterConstraints.resources must specify requests')
if self._requests.gpu:
if self._affinity:
raise InvalidSchema('Affinity unsupported for GPU resource constraints')
if self._replicas:
raise InvalidSchema('Replicas unsupported for GPU resource constraints')
@property
def replicas(self):
return self._replicas
@property
def affinity(self):
return self._affinity
@property
def requests(self):
return self._requests
class SchemaResourceConstraintAffinity:
"""Accesses a single resource's affinity constraints"""
def __init__(self, dictionary):
self._simple_node_affinity = _maybe_get_and_apply(
dictionary, 'simpleNodeAffinity', lambda v: SchemaSimpleNodeAffinity(v))
@property
def simple_node_affinity(self):
return self._simple_node_affinity
class SchemaSimpleNodeAffinity:
"""Accesses simple node affinity for resource constraints."""
def __init__(self, dictionary):
self._minimum_node_count = dictionary.get('minimumNodeCount', None)
self._type = _must_get(dictionary, 'type',
'simpleNodeAffinity requires a type')
if (self._type == 'REQUIRE_MINIMUM_NODE_COUNT' and
self._minimum_node_count is None):
raise InvalidSchema(
'simpleNodeAffinity of type REQUIRE_MINIMUM_NODE_COUNT '
'requires minimumNodeCount')
@property
def affinity_type(self):
return self._type
@property
def minimum_node_count(self):
return self._minimum_node_count
_GPU_PROVIDER_KEYS = ['nvidia.com/gpu']
class SchemaResourceConstraintRequests:
"""Accesses a single resource's requests."""
def __init__(self, dictionary):
self._cpu = dictionary.get('cpu', None)
self._memory = dictionary.get('memory', None)
self._gpu = None
rawGpu = dictionary.get('gpu', None)
if rawGpu != None:
if not isinstance(rawGpu, dict):
raise InvalidSchema(
'requests.gpu in clusterConstraints.resources must be a map')
if not rawGpu.keys():
raise InvalidSchema('GPU requests map must contain one or more entries')
if self._cpu or self._memory:
raise InvalidSchema(
'constraints with GPU requests must not specify cpu or memory')
for key in rawGpu.keys():
if key not in _GPU_PROVIDER_KEYS:
raise InvalidSchema('Unsupported GPU provider %s', key)
self._gpu = {
key: SchemaGpuResourceRequest(value)
for (key, value) in rawGpu.items()
}
if not self._cpu and not self._memory and not self._gpu:
raise InvalidSchema(
'Requests in clusterConstraints.resources must specify '
'at least one of cpu, memory, or gpu')
@property
def cpu(self):
return self._cpu
@property
def memory(self):
return self._memory
@property
def gpu(self):
return self._gpu
class SchemaGpuResourceRequest:
"""Accesses a single GPU request."""
def __init__(self, dictionary):
self._limits = dictionary.get('limits', None)
self._platforms = dictionary.get('platforms', None)
@property
def limits(self):
return self._limits
@property
def platforms(self):
return self._platforms
_ISTIO_TYPE_OPTIONAL = "OPTIONAL"
_ISTIO_TYPE_REQUIRED = "REQUIRED"
_ISTIO_TYPE_UNSUPPORTED = "UNSUPPORTED"
_ISTIO_TYPES = [
_ISTIO_TYPE_OPTIONAL, _ISTIO_TYPE_REQUIRED, _ISTIO_TYPE_UNSUPPORTED
]
class SchemaIstio:
"""Accesses top level istio."""
def __init__(self, dictionary):
self._type = dictionary.get('type', None)
_must_contain(self._type, _ISTIO_TYPES, "Invalid type of istio constraint")
@property
def type(self):
return self._type
class SchemaGcp:
"""Accesses top level GCP constraints."""
def __init__(self, dictionary):
self._nodes = _maybe_get_and_apply(dictionary, 'nodes',
lambda v: SchemaNodes(v))
@property
def nodes(self):
return self._nodes
class SchemaAssistedClusterCreation:
"""Accesses top level AssistedClusterCreation constraints."""
_ASSISTED_CC_TYPE_DISABLED = "DISABLED"
_ASSISTED_CC_TYPE_STRICT = "STRICT"
_ASSISTED_CC_TYPES = [_ASSISTED_CC_TYPE_DISABLED, _ASSISTED_CC_TYPE_STRICT]
def __init__(self, dictionary):
self._type = None
self._creation_guidance = None
self._gke = None
self._type = dictionary.get('type', None)
_must_contain(self._type, self._ASSISTED_CC_TYPES,
"Invalid type of AssistedClusterCreation")
self._creation_guidance = dictionary.get('creationGuidance')
self._gke = _maybe_get_and_apply(dictionary, 'gke', lambda v: SchemaGke(v))
if self._type == self._ASSISTED_CC_TYPE_DISABLED and not self._creation_guidance:
raise InvalidSchema(
'assistedClusterCreation.creationGuidance must be specified when '
'assistedClusterCreation.type is DISABLED')
if self._type == self._ASSISTED_CC_TYPE_STRICT and not self._gke:
raise InvalidSchema('assistedClusterCreation.gke must be specified when '
'assistedClusterCreation.type is STRICT')
@property
def type(self):
return self._type
@property
def creation_guidance(self):
return self._creation_guidance
@property
def gke(self):
return self._gke
class SchemaGke:
def __init__(self, dictionary):
self._node_pool = None
node_pool = dictionary['nodePool']
if not isinstance(node_pool, list):
raise InvalidSchema('gke.nodePool must be a list')
self._node_pool = [SchemaNodePoolDetails(r) for r in node_pool]
if len(self._node_pool) != 1:
raise InvalidSchema('gke.nodePool supports exactly one nodePool')
@property
def node_pool(self):
return self._node_pool
class SchemaNodePoolDetails:
def __init__(self, dictionary):
self._num_nodes = _must_get(dictionary, 'numNodes',
'NodePoolDetails must have numNodes property')
self._machine_type = _must_get(
dictionary, 'machineType',
'NodePoolDetails must have machineType property')
if "custom-" in self._machine_type:
splits = re.split("-", self._machine_type)
if len(splits) < 3:
raise InvalidSchema(
'Custom machine types should be specified using following convention: '
'custom-[NUMBER_OF_CPUS]-[NUMBER_OF_MB]')
cores = int(splits[-2])
if cores != 1 and cores % 2 != 0:
raise InvalidSchema(
'Number of cores for machineType could either be 1 or an even number'
)
@property
def num_nodes(self):
return self._num_nodes
@property
def machine_type(self):
return self._machine_type
class SchemaNodes:
"""Accesses GKE cluster node constraints."""
def __init__(self, dictionary):
self._required_oauth_scopes = dictionary.get('requiredOauthScopes', [])
if not isinstance(self._required_oauth_scopes, list):
raise InvalidSchema('nodes.requiredOauthScopes must be a list')
for scope in self._required_oauth_scopes:
if not scope.startswith(_OAUTH_SCOPE_PREFIX):
raise InvalidSchema(
'OAuth scope references must be fully-qualified (start with {})'
.format(_OAUTH_SCOPE_PREFIX))
@property
def required_oauth_scopes(self):
return self._required_oauth_scopes
class SchemaImage:
"""Accesses an image definition."""
def __init__(self, name, dictionary):
self._name = name
self._properties = {
k: SchemaImageProjectionProperty(k, v)
for k, v in dictionary.get('properties', {}).items()
}
@property
def name(self):
return self._name
@property
def properties(self):
return self._properties
IMAGE_PROJECTION_TYPE_FULL = 'FULL'
IMAGE_PROJECTION_TYPE_REPO = 'REPO_WITHOUT_REGISTRY'
IMAGE_PROJECTION_TYPE_REGISTRY_REPO = 'REPO_WITH_REGISTRY'
IMAGE_PROJECTION_TYPE_REGISTRY = 'REGISTRY'
IMAGE_PROJECTION_TYPE_TAG = 'TAG'
_IMAGE_PROJECTION_TYPES = [
IMAGE_PROJECTION_TYPE_FULL,
IMAGE_PROJECTION_TYPE_REPO,
IMAGE_PROJECTION_TYPE_REGISTRY_REPO,
IMAGE_PROJECTION_TYPE_REGISTRY,
IMAGE_PROJECTION_TYPE_TAG,
]
class SchemaImageProjectionProperty:
"""Accesses a property that an image name projects to."""
def __init__(self, name, dictionary):
self._name = name
self._type = _must_get(
dictionary, 'type',
'Each property for an image in x-google-marketplace.images '
'must have a valid type')
if self._type not in _IMAGE_PROJECTION_TYPES:
raise InvalidSchema('image property {} has invalid type {}'.format(
name, self._type))
@property
def name(self):
return self._name
@property
def part_type(self):
return self._type
class SchemaVersionMeta:
"""Accesses publishedVersionMetadata."""
def __init__(self, dictionary):
self._recommended = dictionary.get('recommended', False)
self._release_types = dictionary.get('releaseTypes', [])
self._release_note = _must_get(
dictionary, 'releaseNote',
'publishedVersionMetadata.releaseNote is required')
@property
def recommended(self):
return self._recommended
@property
def release_note(self):
return self._release_note
@property
def release_types(self):
return self._release_types
class SchemaProperty:
"""Accesses a JSON schema property."""
def __init__(self, name, dictionary, required):
self._name = name
self._d = dictionary
self._required = required
self._default = dictionary.get('default', None)
self._x = dictionary.get(XGOOGLE, None)
self._application_uid = None
self._image = None
self._password = None
self._reporting_secret = None
self._service_account = None
self._storage_class = None
self._string = None
self._tls_certificate = None
if not NAME_RE.match(name):
raise InvalidSchema('Invalid property name: {}'.format(name))
self._type = _must_get_and_apply(
dictionary, 'type', lambda v: {
'int': int,
'integer': int,
'string': str,
'number': float,
'boolean': bool,
}.get(v, None), 'Property {} has no type'.format(name))
if not self._type:
raise InvalidSchema('Property {} has unsupported type: {}'.format(
name, dictionary['type']))
if self._default:
if not isinstance(self._default, self._type):
raise InvalidSchema(
'Property {} has a default value of invalid type'.format(name))
if self._x:
xt = _must_get(self._x, 'type',
'Property {} has {} without a type'.format(name, XGOOGLE))
if xt in (XTYPE_NAME, XTYPE_NAMESPACE, XTYPE_DEPLOYER_IMAGE,
XTYPE_MASKED_FIELD):
_property_must_have_type(self, str)
elif xt in (XTYPE_ISTIO_ENABLED, XTYPE_INGRESS_AVAILABLE):
_property_must_have_type(self, bool)
elif xt == XTYPE_APPLICATION_UID:
_property_must_have_type(self, str)
d = self._x.get('applicationUid', {})
self._application_uid = SchemaXApplicationUid(d)
elif xt == XTYPE_IMAGE:
_property_must_have_type(self, str)
d = self._x.get('image', {})
self._image = SchemaXImage(d, self._default)
elif xt == XTYPE_PASSWORD:
_property_must_have_type(self, str)
d = self._x.get('generatedPassword', {})
spec = {
'length': d.get('length', 10),
'include_symbols': d.get('includeSymbols', False),
'base64': d.get('base64', True),
}
self._password = SchemaXPassword(**spec)
elif xt == XTYPE_SERVICE_ACCOUNT:
_property_must_have_type(self, str)
d = self._x.get('serviceAccount', {})
self._service_account = SchemaXServiceAccount(d)
elif xt == XTYPE_STORAGE_CLASS:
_property_must_have_type(self, str)
d = self._x.get('storageClass', {})
self._storage_class = SchemaXStorageClass(d)
elif xt == XTYPE_STRING:
_property_must_have_type(self, str)
d = self._x.get('string', {})
self._string = SchemaXString(d)
elif xt == XTYPE_REPORTING_SECRET:
_property_must_have_type(self, str)
d = self._x.get('reportingSecret', {})
self._reporting_secret = SchemaXReportingSecret(d)
elif xt == XTYPE_TLS_CERTIFICATE:
_property_must_have_type(self, str)
d = self._x.get('tlsCertificate', {})
self._tls_certificate = SchemaXTlsCertificate(d)
else:
raise InvalidSchema('Property {} has an unknown type: {}'.format(
name, xt))
@property
def name(self):
return self._name
@property
def required(self):
return self._required
@property
def default(self):
return self._default
@property
def type(self):
"""Python type of the property."""
return self._type
@property
def xtype(self):
if self._x:
return self._x['type']
return None
@property
def application_uid(self):
return self._application_uid
@property
def image(self):
return self._image
@property
def password(self):
return self._password
@property
def reporting_secret(self):
return self._reporting_secret
@property
def service_account(self):
return self._service_account
@property
def storage_class(self):
return self._storage_class
@property
def string(self):
return self._string
@property
def tls_certificate(self):
return self._tls_certificate
def str_to_type(self, str_val):
if self._type == bool:
if str_val in {'true', 'True', 'yes', 'Yes'}:
return True
elif str_val in {'false', 'False', 'no', 'No'}:
return False
else:
raise InvalidValue('Bad value for boolean property {}: {}'.format(
self._name, str_val))
return self._type(str_val)
def matches_definition(self, definition):
"""Returns true of the definition partially matches.
The definition argument is a dictionary. All fields in the hierarchy
defined there must be present and have the same values in the schema
in order for the property to be a match.
There is a special `name` field in the dictionary that captures the
property name, which does not originally exist in the schema.
"""
def _matches(dictionary, subdict):
for k, sv in subdict.items():
v = dictionary.get(k, None)
if isinstance(v, dict):
if not _matches(v, sv):
return False
else:
if v != sv:
return False
return True
return _matches(
dict(list(self._d.items()) + [('name', self._name)]), definition)
def __eq__(self, other):
if not isinstance(other, SchemaProperty):
return False
return other._name == self._name and other._d == self._d
class SchemaXApplicationUid:
"""Accesses APPLICATION_UID properties."""
def __init__(self, dictionary):
generated_properties = dictionary.get('generatedProperties', {})
self._application_create = generated_properties.get(
'createApplicationBoolean', None)
@property
def application_create(self):
return self._application_create
class SchemaXImage:
"""Accesses IMAGE and DEPLOYER_IMAGE properties."""
def __init__(self, dictionary, default):
self._split_by_colon = None
self._split_to_registry_repo_tag = None
if not default:
raise InvalidSchema('default image value must be specified')
if not default.startswith('gcr.io'):
raise InvalidSchema(
'default image value must state registry: {}'.format(default))
if ':' not in default:
raise InvalidSchema(
'default image value is missing a tag or digest: {}'.format(default))
generated_properties = dictionary.get('generatedProperties', {})
if 'splitByColon' in generated_properties:
s = generated_properties['splitByColon']
self._split_by_colon = (
_must_get(s, 'before',
'"before" attribute is required within splitByColon'),
_must_get(s, 'after',
'"after" attribute is required within splitByColon'))
if 'splitToRegistryRepoTag' in generated_properties:
s = generated_properties['splitToRegistryRepoTag']
parts = ['registry', 'repo', 'tag']
self._split_to_registry_repo_tag = tuple([
_must_get(
s, name,
'"{}" attribute is required within splitToRegistryRepoTag'.format(
name)) for name in parts
])
@property
def split_by_colon(self):
"""Return 2-tuple of before- and after-colon names, or None"""
return self._split_by_colon
@property
def split_to_registry_repo_tag(self):
"""Return 3-tuple, or None"""
return self._split_to_registry_repo_tag
SchemaXPassword = collections.namedtuple(
'SchemaXPassword', ['length', 'include_symbols', 'base64'])
class SchemaXServiceAccount:
"""Accesses SERVICE_ACCOUNT property."""
def __init__(self, dictionary):
self._description = dictionary.get('description', None)
self._roles = dictionary.get('roles', [])
for role in self._roles:
if role.get('rulesType') == 'PREDEFINED':
if role.get('rules'):
raise InvalidSchema('rules can only be used with rulesType CUSTOM')
if not role.get('rulesFromRoleName'):
raise InvalidSchema('Missing rulesFromRoleName for PREDEFINED role')
elif role.get('rulesType') == 'CUSTOM':
if role.get('rulesFromRoleName'):
raise InvalidSchema(
'rulesFromRoleName can only be used with rulesType PREDEFINED')
if not role.get('rules'):
raise InvalidSchema('Missing rules for CUSTOM role')
for rule in role.get('rules', []):
if rule.get('nonResourceURLs'):
raise InvalidSchema(
'Only attributes for resourceRules are supported in rules')
if not rule.get('apiGroups'):
raise InvalidSchema("Missing apiGroups in rules. "
"Did you mean [\"\"] (only core APIs)"
"or [\"*\"] (all)?")
if not [x for x in rule.get('resources', []) if x]:
raise InvalidSchema('Missing or empty resources in rules.')
if not [x for x in rule.get('verbs', []) if x]:
raise InvalidSchema('Missing or empty verbs in rules.')
else:
raise InvalidSchema('rulesType must be one of PREDEFINED or CUSTOM')
def custom_role_rules(self):
"""Returns a list of rules for custom Roles."""
return [
role.get('rules', [])
for role in self._roles
if role['type'] == 'Role' and role['rulesType'] == 'CUSTOM'
]
def custom_cluster_role_rules(self):
"""Returns a list of rules for custom ClusterRoles."""
return [
role.get('rules', [])
for role in self._roles
if role['type'] == 'ClusterRole' and role['rulesType'] == 'CUSTOM'
]
def predefined_roles(self):
"""Returns a list of predefined Roles."""
return [
role.get('rulesFromRoleName')
for role in self._roles
if role['type'] == 'Role' and role['rulesType'] == 'PREDEFINED'
]
def predefined_cluster_roles(self):
"""Returns a list of predefined ClusterRoles."""
return [
role.get('rulesFromRoleName')
for role in self._roles
if role['type'] == 'ClusterRole' and role['rulesType'] == 'PREDEFINED'
]
def validate(self):
"""Called by Schema.validate(); for backwards-incompatible checks."""
if not self._description:
raise InvalidSchema(
'SERVICE_ACCOUNT must have a `description` '
'explaining purpose and permission requirements. See docs: '
'https://github.com/GoogleCloudPlatform/marketplace-k8s-app-tools/blob/master/docs/schema.md#type-service_account'
)
if self.has_discouraged_cluster_scoped_permissions():
raise InvalidSchema(
'Disallowed service account role(s): '
'For `ClusterRole` roles, only the "view" predefined role is '
'allowed. Instead, use a "CUSTOM" role with specific '
'"apiGroups" and/or "resources".')
def has_discouraged_cluster_scoped_permissions(self):
"""Returns true if the service account has discouraged permissions."""
# Consider all predefined roles except `view`.
if len(
list(
filter(lambda roleName: not roleName == 'view',
self.predefined_cluster_roles()))) > 0:
return True
# Consider apiGroups=['*'] + resources=['*'] + verbs=[<write>],
# which is essentially `cluster-admin`.
# Allow if verbs are explicitly declared for applications which
# truly need those permissions.
for rules in self.custom_cluster_role_rules():
for rule in rules:
if '*' in rule.get('apiGroups') and '*' in rule.get(
'resources') and '*' in rule.get('verbs'):
return True
return False
class SchemaXStorageClass:
"""Accesses STORAGE_CLASS property."""
def __init__(self, dictionary):
self._type = dictionary['type']
@property
def ssd(self):
return self._type == 'SSD'
class SchemaXString:
"""Accesses STRING property."""
def __init__(self, dictionary):
generated_properties = dictionary.get('generatedProperties', {})
self._base64_encoded = generated_properties.get('base64Encoded', None)
@property
def base64_encoded(self):
return self._base64_encoded
class SchemaXReportingSecret:
"""Accesses REPORTING_SECRET property."""
def __init__(self, dictionary):
pass
class SchemaXTlsCertificate:
"""Accesses TLS_CERTIFICATE property."""
def __init__(self, dictionary):
generated_properties = dictionary.get('generatedProperties', {})
self._base64_encoded_private_key = generated_properties.get(
'base64EncodedPrivateKey', None)
self._base64_encoded_certificate = generated_properties.get(
'base64EncodedCertificate', None)
@property
def base64_encoded_private_key(self):
return self._base64_encoded_private_key
@property
def base64_encoded_certificate(self):
return self._base64_encoded_certificate
def _must_get(dictionary, key, error_msg):
"""Gets the value of the key, or raises InvalidSchema."""
if key not in dictionary:
raise InvalidSchema(error_msg)
return dictionary[key]
def _maybe_get_and_apply(dictionary, key, apply_fn):
"""Returns the result of apply_fn on the value of the key if not None."""
if key not in dictionary:
return None
return apply_fn(dictionary[key])
def _must_get_and_apply(dictionary, key, apply_fn, error_msg):
"""Similar to _maybe_get_and_apply but raises InvalidSchema if no such key."""
value = _must_get(dictionary, key, error_msg)
return apply_fn(value)
def _must_contain(value, valid_list, error_msg):
"""Validates that value in valid_list, or raises InvalidSchema."""
if value not in valid_list:
raise InvalidSchema("{}. Must be one of {}".format(error_msg,
', '.join(valid_list)))
def _property_must_have_type(prop, expected_type):
if prop.type != expected_type:
readable_type = {
str: 'string',
bool: 'boolean',
int: 'integer',
float: 'float',
}.get(expected_type, expected_type.__name__)
raise InvalidSchema(
'{} x-google-marketplace type property must be of type {}'.format(
prop.xtype, readable_type))