# tools/asset-inventory/asset_inventory/api_schema.py
#!/usr/bin/env python
#
# Copyright 2019 Google Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Generates BigQuery schema from API discovery documents."""
import re
from asset_inventory import bigquery_schema
import requests
class APISchema(object):
    """Convert a CAI asset type to a BigQuery table schema.

    When import_pipeline uses a group_by of ASSET_TYPE or ASSET_TYPE_VERSION
    use a BigQuery schema generated from API discovery documents. This
    gives us all the type, names and description of an asset type property.
    Will union all API versions into a single schema.
    """

    # Maps discovery document url -> parsed document. Failed fetches are
    # cached as None so they are not retried within the process.
    _discovery_document_cache = {}
    # Maps cache key -> generated BigQuery field list / schema.
    _schema_cache = {}

    @classmethod
    def _get_discovery_document(cls, dd_url):
        """Retrieve and cache a discovery document.

        Args:
            dd_url: url of the discovery document to fetch.
        Returns:
            Parsed discovery document dict, or None if the url is invalid,
            the fetch fails, or the response body isn't valid json.
        """
        if dd_url in cls._discovery_document_cache:
            return cls._discovery_document_cache[dd_url]
        discovery_document = None
        # Ignore discovery document urls that aren't urls.
        if dd_url and dd_url.startswith('http'):
            response = requests.get(dd_url, timeout=3.05)
            if response.status_code == 200:
                try:
                    discovery_document = response.json()
                except ValueError:
                    # Body wasn't json; treat the document as missing.
                    pass
        # Cache the result (including None failures).
        cls._discovery_document_cache[dd_url] = discovery_document
        return discovery_document

    @classmethod
    def _get_api_name_for_discovery_document_url(cls, dd_url):
        """Get API name from discovery document url.

        Args:
            dd_url: Discovery document url.
        Returns:
            API name if one can be found, None otherwise.
        """
        # "Apiary" style url: https://<host>/discovery/v1/apis/<name>/...
        apiary_match = re.match(r'https://[^/]+/discovery/v1/apis/([^/]+)',
                                dd_url)
        if apiary_match:
            return apiary_match.group(1)
        # "One Platform" style url:
        # https://<name>.googleapis.com/$discovery/rest...
        one_match = re.match(r'https://([^.]+).googleapis.com/\$discovery/rest',
                             dd_url)
        if one_match:
            return one_match.group(1)
        return None

    @classmethod
    def _get_discovery_document_versions(cls, doc_url):
        """Return all versions of the APIs discovery documents.

        Args:
            doc_url: the url of the asset's discovery document.
        Returns:
            list of discovery document json objects: the document at doc_url
            plus the documents for every other version of the same API found
            in the public discovery directory. May be empty when nothing is
            fetchable.
        """
        # calculate all the discovery documents from the url.
        discovery_documents = []
        api_name = cls._get_api_name_for_discovery_document_url(doc_url)
        doc = cls._get_discovery_document(doc_url)
        # add the discovery document to return value
        if doc:
            discovery_documents.append(doc)
        # and discovery documents from other versions of the same API.
        all_discovery_docs = cls._get_discovery_document(
            'https://content.googleapis.com/discovery/v1/apis')
        # The directory fetch can fail and be cached as None; return what we
        # have rather than raising on a None subscript.
        if not all_discovery_docs:
            return discovery_documents
        for discovery_doc in all_discovery_docs.get('items', []):
            dru = discovery_doc.get('discoveryRestUrl')
            # Same API, different version (skip the document we already have).
            if api_name == discovery_doc.get('name') and dru != doc_url:
                doc = cls._get_discovery_document(dru)
                if doc:
                    discovery_documents.append(doc)
        return discovery_documents

    @classmethod
    def _get_schema_for_resource(cls, discovery_documents, resource_name):
        """Translate API discovery documents to a BigQuery schema.

        Converts the resource's schema from each document, then unions the
        per-version schemas into a single merged schema.
        """
        schemas = [
            cls._translate_resource_to_schema(resource_name, document)
            for document in discovery_documents
        ]
        merged_schema = bigquery_schema.merge_schemas(schemas)
        return merged_schema

    @classmethod
    def _get_bigquery_type_for_property(cls, property_value, resources):
        """Map API type to a BigQuery type.

        Args:
            property_value: the API property schema object.
            resources: dict of all resource schemas for resolving $ref.
        Returns:
            BigQuery type name; 'STRING' when nothing more specific applies.
        """
        # default type
        bigquery_type = 'STRING'
        property_type = property_value.get('type', None)
        # nested record.
        if (property_type == 'any' or
                property_type == 'object' or
                'properties' in property_value):
            bigquery_type = 'RECORD'
        # repeated, recurse into element type.
        elif property_type == 'array':
            return cls._get_bigquery_type_for_property(
                property_value['items'],
                resources)
        if property_type in ('number', 'integer'):
            bigquery_type = 'NUMERIC'
        elif property_type == 'boolean':
            bigquery_type = 'BOOL'
        # type reference; resolve and use the referenced type.
        elif '$ref' in property_value:
            property_resource_name = cls._ref_resource_name(property_value)
            if property_resource_name:
                return cls._get_bigquery_type_for_property(
                    resources[property_resource_name],
                    resources)
        return bigquery_type

    @classmethod
    def _ref_resource_name(cls, property_value):
        """Return the resource name a '$ref' property points at, or None."""
        ref_name = property_value.get('$ref', None)
        # strip the '#/definitions/' prefix if present.
        if ref_name and ref_name.startswith('#/definitions/'):
            return ref_name[len('#/definitions/'):]
        return ref_name

    @classmethod
    def _get_properties_map_field_list(cls, property_name, property_value,
                                       resources, seen_resources):
        """Return the fields of the `RECORD` property.

        Args:
            property_name: name of API property
            property_value: value of the API property.
            resources: dict of all other resources that might be referenced by
                the API schema through reference types ($ref values).
            seen_resources: dict of types we have processed to prevent endless
                cycles.
        Returns:
            BigQuery fields dict list or None if the field should be skipped.
        """
        return_value = []
        # explicit properties are added to the list of fields to return.
        if 'properties' in property_value:
            return_value += cls._properties_map_to_field_list(
                property_value['properties'], resources, seen_resources)
        # handle $ref
        property_resource_name = cls._ref_resource_name(property_value)
        # get fields of the reference type.
        if property_resource_name:
            # if a field is recursive, ignore the field.
            if property_resource_name in seen_resources:
                return []
            # track prior types to not recurse forever.
            seen_resources[property_resource_name] = True
            return_value += cls._get_properties_map_field_list(
                property_resource_name, resources[property_resource_name],
                resources, seen_resources)
            del seen_resources[property_resource_name]
        # handle 'items'
        if 'items' in property_value:
            return_value += cls._get_properties_map_field_list(
                property_name, property_value['items'],
                resources, seen_resources)
        # additionalProperties is a repeated field,
        # to support a nested repeated field we need another nested field.
        if cls.is_additional_property_fields(return_value):
            return_value = [{'name': 'additionalProperties',
                             'field_type': 'RECORD',
                             'description': 'additionalProperties',
                             'fields': return_value,
                             'mode': 'REPEATED'}]
        # does the property allow arbitrary properties with no schema?
        if cls.allows_additional_properties(property_value):
            # assign the additional_properties to the return value.
            additional_properties_fields = return_value
            # if there are 'properties' in addition to 'additionalProperties',
            # we'll create a new 'additionalProperties' nested field to hold the
            # repeated name value property list.
            if additional_properties_fields:
                ap_field = {'name': 'additionalProperties',
                            'field_type': 'RECORD',
                            'description': 'additionalProperties',
                            'fields': [],
                            'mode': 'REPEATED'}
                return_value.append(ap_field)
                additional_properties_fields = ap_field['fields']
            # add name field.
            additional_properties_fields.append(
                {'name': 'name',
                 'field_type': 'STRING',
                 'description': 'additionalProperties name',
                 'mode': 'NULLABLE'})
            # is there an explicit type for the additional property value?
            ap_prop = property_value.get('additionalProperties', None)
            if isinstance(ap_prop, dict) and ap_prop:
                ap_field = cls._property_to_field(
                    'value', ap_prop,
                    resources, seen_resources)
                # only add the value property if it's valid,
                # also, don't double next additional properties.
                # which can happen if the property type of the additional
                # properties value is arbitrary.
                if (ap_field is not None
                        and not cls.is_additional_property_fields(
                            ap_field.get('fields', None))):
                    additional_properties_fields.append(ap_field)
            # if we didn't find a value property,
            # add a generic 'STRING' value property
            if len(additional_properties_fields) < 2:
                additional_properties_fields.append(
                    {'name': 'value',
                     'field_type': 'STRING',
                     'description': 'additionalProperties value',
                     'mode': 'NULLABLE'})
        return return_value

    @classmethod
    def allows_additional_properties(cls, schema_object):
        """True if the schema allows arbitrary properties."""
        # A schema with no shape at all (no items/$ref/properties) is treated
        # as allowing arbitrary properties.
        has_no_shape = ('items' not in schema_object and
                        '$ref' not in schema_object and
                        'properties' not in schema_object)
        # Otherwise it must explicitly declare additionalProperties (and not
        # set it to False).
        has_additional = ('additionalProperties' in schema_object and
                          schema_object['additionalProperties'] is not False)
        # NOTE(review): with this (original) precedence, a shapeless schema
        # with an explicit `additionalProperties: False` still returns True —
        # confirm that is intentional.
        return has_no_shape or has_additional

    @classmethod
    def is_additional_property_fields(cls, fields):
        """True if 'fields' is an additionalProperties schema field list.

        Matches the two-field [name, value] shape this module generates.
        """
        return fields and len(fields) == 2 and all(
            (f.get('name', None) == 'name'
             and f.get('description', None) == 'additionalProperties name')
            or (f.get('name', None) == 'value') for f in fields)

    @classmethod
    def _property_to_field(cls, property_name, property_value,
                           resources, seen_resources):
        """Convert api property to BigQuery field.

        Args:
            property_name: name of API property
            property_value: value of the API property.
            resources: dict of all other resources that might be referenced by
                the API schema through reference types ($ref values).
            seen_resources: dict of types we have processed to prevent endless
                cycles.
        Returns:
            BigQuery field or None if the field should be skipped.
        """
        field = {'name': property_name}
        property_type = property_value.get('type', None)
        bigquery_type = cls._get_bigquery_type_for_property(
            property_value, resources)
        field['field_type'] = bigquery_type
        if 'description' in property_value:
            # BigQuery limits field descriptions to 1024 characters.
            field['description'] = property_value['description'][:1024]
        fields_list = []
        if bigquery_type == 'RECORD':
            fields_list = cls._get_properties_map_field_list(
                property_name, property_value, resources, seen_resources)
            # did we find any fields? A RECORD with no fields is invalid.
            if not fields_list:
                return None
            field['fields'] = fields_list
        # array fields are BigQuery repeated fields, and convert
        # additionalProperties to repeated lists of key value pairs.
        if (property_type == 'array' or
                cls.is_additional_property_fields(fields_list)):
            field['mode'] = 'REPEATED'
        else:
            field['mode'] = 'NULLABLE'
        return field

    @classmethod
    def _properties_map_to_field_list(cls, properties_map, resources,
                                      seen_resources):
        """Convert API resource properties to BigQuery schema.

        Args:
            properties_map: dict of properties from the API schema document we
                are converting into a BigQuery field list.
            resources: dict of all other resources that might be referenced by
                the API schema through reference types ($ref values).
            seen_resources: dict of types we have processed to prevent endless
                cycles.
        Returns:
            BigQuery fields dict list.
        """
        fields = []
        for property_name, property_value in properties_map.items():
            field = cls._property_to_field(property_name, property_value,
                                           resources, seen_resources)
            # skipped fields (e.g. empty RECORDs) come back as None.
            if field is not None:
                fields.append(field)
        return fields

    @classmethod
    def _get_cache_key(cls, resource_name, document):
        """Build a cache key that is unique per document and resource."""
        # Discovery documents carry an 'id'; OpenAPI style documents carry
        # an 'info' section.
        if 'id' in document:
            return f'{document["id"]}.{resource_name}'
        if 'info' in document:
            info = document['info']
            return f'{info["title"]}.{info["version"]}.{resource_name}'
        return resource_name

    @classmethod
    def _get_document_resources(cls, document):
        """Return the document's resource schemas dict ('schemas' or
        'definitions'), or an empty dict if neither is present."""
        if document.get('schemas'):
            return document['schemas']
        return document.get('definitions', {})

    @classmethod
    def _translate_resource_to_schema(cls, resource_name, document):
        """Expands the $ref properties of a resource definition.

        Args:
            resource_name: name of the resource within the document.
            document: discovery/OpenAPI document containing the schemas.
        Returns:
            BigQuery fields dict list (empty when the resource isn't found);
            results are memoized in _schema_cache.
        """
        cache_key = cls._get_cache_key(resource_name, document)
        if cache_key in cls._schema_cache:
            return cls._schema_cache[cache_key]
        resources = cls._get_document_resources(document)
        field_list = []
        if resource_name in resources:
            resource = resources[resource_name]
            properties_map = resource['properties']
            # seed seen_resources with ourselves to stop self-recursion.
            field_list = cls._properties_map_to_field_list(
                properties_map, resources, {resource_name: True})
        cls._schema_cache[cache_key] = field_list
        return field_list

    @classmethod
    def _add_asset_export_fields(cls,
                                 schema,
                                 include_resource=True,
                                 include_iam_policy=True):
        """Add the fields that the asset export adds to each resource.

        Args:
            schema: list of `google.cloud.bigquery.SchemaField` like dict
                objects.
            include_resource: to include resource schema.
            include_iam_policy: to include iam policy schema.
        Returns:
            list of `google.cloud.bigquery.SchemaField` like dict objects.
        """
        # Fields present on every exported asset row.
        asset_schema = [{
            'name': 'name',
            'field_type': 'STRING',
            'description': 'URL of the asset.',
            'mode': 'REQUIRED'
        }, {
            'name': 'asset_type',
            'field_type': 'STRING',
            'description': 'Asset name.',
            'mode': 'REQUIRED'
        }, {
            'name': 'timestamp',
            'field_type': 'TIMESTAMP',
            'description': 'Load time.',
            'mode': 'NULLABLE'
        }, {
            'name': 'ancestors',
            'field_type': 'STRING',
            'mode': 'REPEATED',
            'description': 'The ancestry path of an asset in Google Cloud.'
        }, {
            'name': 'update_time',
            'field_type': 'STRING',
            'mode': 'NULLABLE',
            'description': 'The last update timestamp of an asset.'
        }]
        if include_resource:
            resource_schema = list(schema)
            _, last_modified = bigquery_schema.get_field_by_name(
                resource_schema,
                'lastModifiedTime')
            if not last_modified:
                # if we lack a lastModified time in the schema, add it, some
                # resources include it without being in the schema.
                resource_schema.append({
                    'name': 'lastModifiedTime',
                    'field_type': 'STRING',
                    'mode': 'NULLABLE',
                    'description': 'Last time resource was changed.'
                })
            asset_schema.append({
                'name': 'resource',
                'field_type': 'RECORD',
                'description': 'Resource properties.',
                'mode': 'NULLABLE',
                'fields': [{
                    'name': 'version',
                    'field_type': 'STRING',
                    'description': 'Api version of resource.',
                    'mode': 'REQUIRED'
                }, {
                    'name': 'discovery_document_uri',
                    'field_type': 'STRING',
                    'description': 'Discovery document uri.',
                    'mode': 'REQUIRED'
                }, {
                    'name': 'parent',
                    'field_type': 'STRING',
                    'description': 'Parent resource.',
                    'mode': 'NULLABLE'
                }, {
                    'name': 'discovery_name',
                    'field_type': 'STRING',
                    'description': 'Name in discovery document.',
                    'mode': 'REQUIRED'
                }, {
                    'name': 'data',
                    'field_type': 'RECORD',
                    'description': 'Resource properties.',
                    'mode': 'NULLABLE',
                    'fields': resource_schema
                }, {
                    'name': 'location',
                    'field_type': 'STRING',
                    'description': 'The location of the resource in Google Cloud, such as its zone and region. '
                                   'For more information, see https://cloud.google.com/about/locations/.',
                    'mode': 'NULLABLE'
                }, {
                    'name': 'json_data',
                    'field_type': 'JSON',
                    'description': 'Original JSON of the resource.',
                    'mode': 'NULLABLE'
                }]
            })
        if include_iam_policy:
            asset_schema.append({
                'name': 'iam_policy',
                'field_type': 'RECORD',
                'description': 'IAM Policy',
                'mode': 'NULLABLE',
                'fields': [{
                    'name': 'etag',
                    'field_type': 'STRING',
                    'description': 'Etag.',
                    'mode': 'NULLABLE'
                }, {
                    'name': 'audit_configs',
                    'field_type': 'RECORD',
                    'description': 'Logging of each type of permission.',
                    'mode': 'REPEATED',
                    'fields': [{
                        'name': 'service',
                        'field_type': 'STRING',
                        'description':
                            'Service that will be enabled for audit logging.',
                        'mode': 'NULLABLE'
                    }, {
                        'name': 'audit_log_configs',
                        'field_type': 'RECORD',
                        'description': 'Logging of each type of permission.',
                        'mode': 'REPEATED',
                        'fields': [{
                            'name': 'log_type',
                            'field_type': 'NUMERIC',
                            'mode': 'NULLABLE',
                            'description':
                                ('1: Admin reads. Example: CloudIAM getIamPolicy'
                                 '2: Data writes. Example: CloudSQL Users create'
                                 '3: Data reads. Example: CloudSQL Users list')
                        }]
                    }]
                }, {
                    'name': 'bindings',
                    'field_type': 'RECORD',
                    'mode': 'REPEATED',
                    'description': 'Bindings',
                    'fields': [{
                        'name': 'role',
                        'field_type': 'STRING',
                        'mode': 'NULLABLE',
                        'description': 'Assigned role.'
                    }, {
                        'name': 'members',
                        'field_type': 'STRING',
                        'mode': 'REPEATED',
                        'description': 'Principles assigned the role.'
                    }]
                }]
            })
        return asset_schema

    @classmethod
    def bigquery_schema_for_resource(cls, asset_type,
                                     resource_name,
                                     discovery_doc_url,
                                     include_resource,
                                     include_iam_policy):
        """Returns the BigQuery schema for the asset type.

        Args:
            asset_type: CAI asset type.
            resource_name: name of the resource.
            discovery_doc_url: URL of discovery document
            include_resource: if resource schema should be included.
            include_iam_policy: if IAM policy schema should be included.
        Returns:
            BigQuery schema.
        """
        # some resources use asset_type as their discovery_name incorrectly.
        # this tries to correct that.
        if resource_name is not None and '/' in resource_name:
            resource_name = resource_name[resource_name.find('/') + 1:]
        cache_key = f'{asset_type}.{include_resource}.{include_iam_policy}'
        if cache_key in cls._schema_cache:
            return cls._schema_cache[cache_key]
        # get the resource schema if we are including the resource
        # in the export.
        resource_schema = None
        if include_resource:
            discovery_documents = cls._get_discovery_document_versions(
                discovery_doc_url)
            resource_schema = cls._get_schema_for_resource(
                discovery_documents,
                resource_name)
        schema = cls._add_asset_export_fields(
            resource_schema, include_resource, include_iam_policy)
        cls._schema_cache[cache_key] = schema
        return schema