def get_schema_value_types()

in src/cfnlint/maintenance.py [0:0]


def get_schema_value_types():
    """Build value-constraint patches from the schemas in REGISTRY_SCHEMA_ZIP.

    Downloads the zip archive referenced by ``REGISTRY_SCHEMA_ZIP``, parses
    every member as a JSON schema and extracts the string / number
    constraints (pattern, length range, enum, numeric range) declared on
    its properties.

    Returns:
        list: one entry per constrained property.  Each entry is a list of
        ``{'op': 'add', 'path': ..., 'value': ...}`` dicts: the first links
        the property to a ``ValueType`` and the following ones add the
        individual constraints under ``/ValueTypes/<name>/``.
    """

    def resolve_refs(properties, schema):
        """Follow ``$ref`` pointers and array ``items`` to a concrete schema.

        Returns a ``(name, details)`` tuple.  ``name`` is the last
        definition name that was followed (``None`` when no ``$ref`` was
        involved) and ``details`` is the resolved schema fragment.
        """
        results = {}
        name = None

        if properties.get('$ref'):
            name = properties.get('$ref').split('/')[-1]
            subname, results = resolve_refs(schema.get('definitions').get(name), schema)
            if subname:
                # The definition itself pointed somewhere else; keep the
                # innermost name.
                name = subname
            properties = schema.get('definitions').get(name)

        if properties.get('type') == 'array':
            # For arrays the constraints of interest live on the items.
            results = properties.get('items')

        if results:
            properties = results

        if results and results.get('$ref'):
            name, results = resolve_refs(results, schema)

        if not results:
            return name, properties

        return name, results

    def get_object_details(names, properties, schema):
        """Collect constraints for every property in ``properties``.

        ``names`` is the path walked so far (type name first); it is used
        both to build the dotted result keys and to stop recursing into a
        definition that is already on the path.

        Returns a dict mapping ``'<Type>.<...>.<Property>'`` to a dict of
        constraint names (``AllowedPatternRegex``, ``StringMin``,
        ``StringMax``, ``AllowedValues``, ``NumberMin``, ``NumberMax``).
        """
        results = {}
        for propname, propdetails in properties.items():
            subname, propdetails = resolve_refs(propdetails, schema)
            t = propdetails.get('type')
            if not t:
                continue
            key = '.'.join(names + [propname])
            if t == 'object':
                if subname is None:
                    subname = propname
                if propdetails.get('properties'):
                    # Guard against self-referencing definitions.
                    if subname not in names:
                        results.update(get_object_details(
                            names + [subname], propdetails.get('properties'), schema))
                elif propdetails.get('oneOf') or propdetails.get('anyOf') or propdetails.get('allOf'):
                    LOGGER.info(
                        'Type %s object for %s has only oneOf,anyOf, or allOf properties', names[0], propname)
                    continue
            elif t not in ['string', 'integer', 'number', 'boolean']:
                ref = propdetails.get('$ref')
                if ref:
                    # The reference lives on the property details, not on
                    # its type value (which is a string or a list and has
                    # no ``get``).  Recurse into the referenced
                    # definition's property mapping, like the other calls.
                    definition = (schema.get('definitions') or {}).get(ref.split('/')[-1]) or {}
                    results.update(get_object_details(
                        names + [propname], definition.get('properties') or {}, schema))
                elif isinstance(t, list):
                    LOGGER.info('Type for %s object and %s property is a list', names[0], propname)
                else:
                    LOGGER.info('Unable to handle %s object for %s property', names[0], propname)
            elif t == 'string':
                pattern = propdetails.get('pattern')
                min_length = propdetails.get('minLength')
                max_length = propdetails.get('maxLength')
                enum = propdetails.get('enum')
                # Compare with None so that a lower bound of 0 still counts.
                has_length = min_length is not None and max_length is not None

                if not results.get(key):
                    if pattern or has_length or enum:
                        results[key] = {}
                if pattern:
                    # NOTE(review): each ``continue`` below skips the whole
                    # property, including its length and enum constraints.
                    # This is existing behaviour - confirm it is intended.
                    if key == 'AWS::OpsWorksCM::Server.CustomPrivateKey':
                        # one off exception to handle a weird parsing issue in python 2.7
                        continue
                    # ``str.isascii`` only exists from python 3.7 on, so
                    # test by encoding instead.
                    try:
                        pattern.encode('ascii')
                    except UnicodeEncodeError:
                        continue
                    try:
                        # Unicode property classes are not supported by ``re``.
                        if '\\p{' in pattern:
                            continue
                        re.compile(pattern, re.UNICODE)
                        results[key].update({
                            'AllowedPatternRegex': pattern
                        })
                    except Exception:  # pylint: disable=broad-except
                        LOGGER.info(
                            'Unable to handle regex for type %s and property %s with regex %s',
                            names[0], propname, pattern)
                if has_length:
                    results[key].update({
                        'StringMin': min_length,
                        'StringMax': max_length,
                    })
                if enum:
                    results[key].update({
                        'AllowedValues': enum
                    })
            elif t in ['number', 'integer']:
                minimum = propdetails.get('minimum')
                maximum = propdetails.get('maximum')
                # Compare with None so that a bound of 0 is not discarded.
                if minimum is not None and maximum is not None:
                    if not results.get(key):
                        results[key] = {}
                    results[key].update({
                        'NumberMin': minimum,
                        'NumberMax': maximum,
                    })

        return results

    def process_schema(schema):
        """Turn one parsed schema into a list of patches (lists of operations)."""
        details = get_object_details([schema.get('typeName')], schema.get('properties'), schema)

        # Remove duplicates: collapse deep paths to
        # '<Type>.<ParentProperty>.<Property>' so that the same nested
        # property reached through different paths yields a single entry.
        vtypes = {}
        for n, v in details.items():
            if n.count('.') > 1:
                s = n.split('.')
                vtypes[s[0] + '.' + '.'.join(s[-2:])] = v
            else:
                vtypes[n] = v

        patches = []
        for n, v in vtypes.items():
            if not v:
                # No usable constraint was found for this property.
                continue
            parts = n.split('.')
            # Two dots means '<Type>.<SubType>.<Property>'; one dot means
            # the property sits directly on the resource type.
            r_type = 'PropertyTypes' if n.count('.') == 2 else 'ResourceTypes'
            patch = [{
                'op': 'add',
                'path': '/%s/%s/Properties/%s/Value' % (r_type, '.'.join(parts[0:-1]), parts[-1]),
                'value': {
                    'ValueType': n,
                },
            }]
            for s, vs in v.items():
                patch.append({
                    'op': 'add',
                    'path': '/ValueTypes/%s/%s' % (n, s),
                    'value': vs,
                })
            patches.append(patch)

        return patches

    req = Request(REGISTRY_SCHEMA_ZIP)
    res = urlopen(req)
    try:
        # Read everything up front so the connection can be released before
        # the (potentially slow) schema processing starts.
        archive = res.read()
    finally:
        res.close()

    results = []

    with zipfile.ZipFile(BytesIO(archive)) as z:
        for f in z.namelist():
            with z.open(f) as d:
                data = d.read()
            if isinstance(data, bytes):
                data = data.decode('utf-8')
            schema = json.loads(data)
            results.extend(process_schema(schema))

    return results