in src/cfnlint/maintenance.py [0:0]
def get_schema_value_types():
    """Build value-type patches from the registry schema archive.

    Downloads the zip archive at ``REGISTRY_SCHEMA_ZIP``, parses every member
    as a JSON schema, and converts the string and number constraints found in
    each schema into lists of ``add`` operations (dicts with ``op``, ``path``
    and ``value`` keys).

    Returns:
        list: one list of patch operations per property that carries at
        least one constraint.
    """

    def resolve_refs(properties, schema):
        """Follow ``$ref`` pointers and array ``items`` to the underlying node.

        Returns a ``(name, details)`` tuple. ``name`` is the definition name
        reached through the last ``$ref`` that was followed, or ``None`` when
        no reference was followed. ``details`` is the resolved schema node.
        """
        results = {}
        name = None

        if properties.get('$ref'):
            name = properties.get('$ref').split('/')[-1]
            subname, results = resolve_refs(schema.get('definitions').get(name), schema)
            if subname:
                name = subname
            properties = schema.get('definitions').get(name)

        # For arrays the interesting constraints live on the item schema.
        if properties.get('type') == 'array':
            results = properties.get('items')

        if results:
            properties = results

        if results and results.get('$ref'):
            name, results = resolve_refs(results, schema)

        if not results:
            return name, properties

        return name, results

    def get_object_details(names, properties, schema):
        """Collect value constraints for every property in ``properties``.

        ``names`` is the path walked so far (the type name first).  The result
        maps a dotted key built from ``names`` plus the property name to a dict
        holding any of ``AllowedPatternRegex``, ``StringMin``/``StringMax``,
        ``AllowedValues`` and ``NumberMin``/``NumberMax``.
        """
        results = {}
        for propname, propdetails in properties.items():
            subname, propdetails = resolve_refs(propdetails, schema)
            t = propdetails.get('type')
            if not t:
                continue
            key = '.'.join(names + [propname])
            if t in ['object']:
                if subname is None:
                    subname = propname
                if propdetails.get('properties'):
                    # Skip names already on the path so that self-referencing
                    # definitions do not recurse forever.
                    if subname not in names:
                        results.update(get_object_details(
                            names + [subname], propdetails.get('properties'), schema))
                elif propdetails.get('oneOf') or propdetails.get('anyOf') or propdetails.get('allOf'):
                    LOGGER.info(
                        'Type %s object for %s has only oneOf,anyOf, or allOf properties', names[0], propname)
                    continue
            elif t not in ['string', 'integer', 'number', 'boolean']:
                ref = propdetails.get('$ref')
                if ref:
                    # The reference lives on the property itself; ``t`` is the
                    # ``type`` value (a str or list) and has no ``.get``.
                    definition = schema.get('definitions').get(ref.split('/')[-1]) or {}
                    # NOTE(review): recursing into the definition's
                    # ``properties`` map, mirroring the object branch above;
                    # the definition's own keys are schema keywords, not
                    # property names -- confirm this matches the intent.
                    results.update(get_object_details(
                        names + [propname], definition.get('properties') or {}, schema))
                elif isinstance(t, list):
                    LOGGER.info('Type for %s object and %s property is a list', names[0], propname)
                else:
                    LOGGER.info('Unable to handle %s object for %s property', names[0], propname)
            elif t == 'string':
                pattern = propdetails.get('pattern')
                enum = propdetails.get('enum')
                # NOTE(review): truthiness test, so a minLength of 0 means the
                # length bounds are not recorded -- confirm this is intended.
                has_length = propdetails.get('minLength') and propdetails.get('maxLength')
                if not results.get(key):
                    if pattern or has_length or enum:
                        results[key] = {}
                if pattern:
                    if key == 'AWS::OpsWorksCM::Server.CustomPrivateKey':
                        # one off exception to handle a weird parsing issue in python 2.7
                        continue
                    # python 3.7 introduces str.isascii, so encode is used to
                    # stay compatible with older interpreters
                    # NOTE(review): each ``continue`` below also skips the
                    # length and enum constraints of this property.
                    try:
                        pattern.encode('ascii')
                    except UnicodeEncodeError:
                        continue
                    # Unicode property escapes are not supported by ``re``.
                    if '\\p{' in pattern:
                        continue
                    try:
                        re.compile(pattern, re.UNICODE)
                    except Exception:  # pylint: disable=broad-except
                        # Best effort: an uncompilable pattern is only logged.
                        LOGGER.info(
                            'Unable to handle regex for type %s and property %s with regex %s', names[0], propname, pattern)
                    else:
                        results[key].update({
                            'AllowedPatternRegex': pattern
                        })
                if has_length:
                    results[key].update({
                        'StringMin': propdetails.get('minLength'),
                        'StringMax': propdetails.get('maxLength'),
                    })
                if enum:
                    results[key].update({
                        'AllowedValues': enum
                    })
            elif t in ['number', 'integer']:
                # NOTE(review): truthiness test, so a minimum of 0 means the
                # numeric bounds are not recorded -- confirm this is intended.
                has_range = propdetails.get('minimum') and propdetails.get('maximum')
                if not results.get(key):
                    if has_range:
                        results[key] = {}
                if has_range:
                    results[key].update({
                        'NumberMin': propdetails.get('minimum'),
                        'NumberMax': propdetails.get('maximum'),
                    })

        return results

    def process_schema(schema):
        """Convert one parsed schema into a list of patch-operation lists."""
        details = get_object_details([schema.get('typeName')], schema.get('properties'), schema)

        # Remove duplicates: keys with more than one dot are collapsed to the
        # first segment plus the last two segments.
        vtypes = {}
        for n, v in details.items():
            if n.count('.') > 1:
                s = n.split('.')
                vtypes[s[0] + '.' + '.'.join(s[-2:])] = v
            else:
                vtypes[n] = v

        patches = []
        for n, v in vtypes.items():
            # Entries left empty (e.g. an unsupported pattern) produce no patch.
            if not v:
                continue
            r_type = 'PropertyTypes' if n.count('.') == 2 else 'ResourceTypes'
            owner, _, prop = n.rpartition('.')
            patch = [{
                'op': 'add',
                'path': '/%s/%s/Properties/%s/Value' % (r_type, owner, prop),
                'value': {
                    'ValueType': n,
                },
            }]
            for constraint, constraint_value in v.items():
                patch.append({
                    'op': 'add',
                    'path': '/ValueTypes/%s/%s' % (n, constraint),
                    'value': constraint_value,
                })
            patches.append(patch)

        return patches

    req = Request(REGISTRY_SCHEMA_ZIP)
    res = urlopen(req)
    try:
        archive = res.read()
    finally:
        # Always release the HTTP response, even if the read fails.
        res.close()

    results = []

    with zipfile.ZipFile(BytesIO(archive)) as z:
        for f in z.namelist():
            with z.open(f) as d:
                if not isinstance(d, six.string_types):
                    data = d.read()
                else:
                    data = d
            if isinstance(data, bytes):
                data = data.decode('utf-8')
            schema = json.loads(data)
            results.extend(process_schema(schema))

    return results