tools/asset-inventory/asset_inventory/import_pipeline.py:
#!/usr/bin/env python
#
# Copyright 2019 Google Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Import Cloud Asset Inventory exports into BigQuery.
Apache Beam pipeline to load Cloud Asset Inventory exports in GCS json objects
into a BigQuery dataset. There are options for appending to tables or truncating
them. The dataset must exist prior to import.
Most all export are small and can likely be processed very quickly by a single
machine with the direct runner. In some situations there might be a very large
number of assets like GCS buckets or BigQuery tables which will benefit from the
scalability of the Dataflow runner, or perhaps you wish to process the file from
environments that are not easily suited to large memory single machines like
Cloud Functions or App Engine.
"""
import copy
from datetime import datetime
import json
import logging
import pprint
import random
import apache_beam as beam
from apache_beam.coders import Coder
from apache_beam.io import ReadFromText
from apache_beam.io.filesystems import FileSystems
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.options.value_provider import StaticValueProvider
from apache_beam.transforms import core
from asset_inventory import bigquery_schema
from asset_inventory.api_schema import APISchema
from six import string_types
from typing import Any
from google.api_core.exceptions import BadRequest
from google.api_core.exceptions import NotFound
from google.cloud import bigquery
class JsonCoder(Coder):
"""A coder interpreting each line as a JSON string."""
    def encode(self, value):
        # Beam expects coders to produce bytes.
        return json.dumps(value).encode()
def decode(self, encoded):
return json.loads(encoded)
def to_type_hint(self):
return Any
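# A minimal sketch of the coder's round trip (illustrative, not executed here):
#
#   coder = JsonCoder()
#   coder.decode(coder.encode({'name': 'example'}))  # -> {'name': 'example'}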
class AssignGroupByKey(beam.DoFn):
"""Split assets based on input feature:
The group_by value can be either:
- ASSET_TYPE, so we have a table for that asset type like
`google.compute.Instance`.
- ASSET_TYPE_VERSION to have a table for each asset type and version like
`google.compute.Instance.v1alpha`
- NONE to have a single table for all assets.
- NAME for when merging the iam_policy and the resource together as an
intermediary step prior to load.
"""
def __init__(self, group_by, num_shards, *unused_args, **unused_kwargs):
if isinstance(group_by, string_types):
group_by = StaticValueProvider(str, group_by)
        if isinstance(num_shards, string_types):
num_shards = StaticValueProvider(str, num_shards)
self.num_shards = num_shards
self.group_by = group_by
self.shard_map = None
def apply_shard(self, key):
"""Append shard suffix."""
        # Lazily initialize shard_map from the num_shards option on first use.
if self.shard_map is None:
self.shard_map = {
k: int(v) for (k, v) in
[sc.split('=') for sc in self.num_shards.get().split(',')]}
key_shards = self.shard_map.get(key)
if key_shards is None:
key_shards = self.shard_map.get('*')
if key_shards is None:
key_shards = 1
shard = random.randint(0, key_shards - 1)
return key + '.' + str(shard)
@classmethod
def remove_shard(cls, key):
"""Strip shard suffix."""
return key[:key.rfind('.')]
def process(self, element, **kwargs):
key = 'ASSET_TYPE'
group_by = self.group_by.get()
if group_by == 'NAME':
key = element['asset_type'] + '.' + element['name']
elif group_by == 'NONE':
key = self.apply_shard(element.pop('_group_by', 'resource'))
elif group_by == 'ASSET_TYPE':
# use group_by element override if present.
key = self.apply_shard(element.pop('_group_by',
element['asset_type']))
elif group_by == 'ASSET_TYPE_VERSION':
if 'resource' in element:
version = element['resource']['version']
key = element['asset_type'] + '.' + version
key = element.pop('_group_by', key)
key = self.apply_shard(key)
yield key, element
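# Illustrative keys produced by AssignGroupByKey (a sketch; the shard suffix is
# chosen at random up to the configured shard count for the key):
#
#   group_by='NAME'                -> '<asset_type>.<name>' (no shard suffix)
#   group_by='ASSET_TYPE'          -> e.g. 'google.compute.Instance.0'
#   group_by='ASSET_TYPE_VERSION'  -> e.g. 'google.compute.Instance.v1.0'
#   group_by='NONE'                -> e.g. 'resource.0'
#
# remove_shard() strips the suffix again:
#   AssignGroupByKey.remove_shard('google.compute.Instance.3')
#   # -> 'google.compute.Instance'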
class BigQuerySchemaCombineFn(core.CombineFn):
"""Reduce a list of schemas into a single schema."""
def create_accumulator(self):
return []
def merge_accumulators(self, accumulators, **kwargs):
return bigquery_schema.merge_schemas(accumulators)
def extract_output(self, accumulator, **kwargs):
return accumulator
@staticmethod
def get_api_schema_for_resource(element):
"""This will contain the discovery document drive resource schema is present."""
element_resource = element.get('resource', {})
return APISchema.bigquery_schema_for_resource(
element['asset_type'],
element_resource.get('discovery_name', None),
element_resource.get('discovery_document_uri', None),
'data' in element_resource,
'iam_policy' in element)
def add_input(self, mutable_accumulator, element, **kwargs):
resource_schema = self.get_api_schema_for_resource(element)
# use the API resource's schema if we have one.
if bigquery_schema.contains_resource_data_schema(resource_schema):
return resource_schema
# we don't have a valid API schema, use the element schema.
return bigquery_schema.merge_schemas([
mutable_accumulator,
resource_schema,
bigquery_schema.translate_json_to_schema(element)])
class BigQuerySanitize(beam.DoFn):
"""Make the json acceptable to BigQuery."""
def process(self, element, **kwargs):
yield bigquery_schema.sanitize_property_value(element)
class ProduceResourceJson(beam.DoFn):
"""Create a json only element for every element."""
def __init__(self, group_by, *unused_args, **unused_kwargs):
if isinstance(group_by, string_types):
group_by = StaticValueProvider(str, group_by)
self.group_by = group_by
@staticmethod
def create_resource_copy(element):
        """Return a copy of element, without resource.data, for the `resource` table.

        The copy is tagged with _group_by = 'resource' so that every asset gets
        a consistently structured row in the common `resource` table.
        """
resource_data = element.get('resource', {}).pop('data', None)
resource_copy = copy.deepcopy(element)
# add it back after the copy operation.
if resource_data is not None:
element['resource']['data'] = resource_data
# override the group by field, so it's written to the `resource` table
resource_copy['_group_by'] = 'resource'
return resource_copy
def process(self, element, **kwargs):
# add the resource.json_data property if we have resource.data
if ('resource' in element and
'data' in element['resource']):
element['resource']['json_data'] = json.dumps(
element['resource']['data'])
# yield the resource copy.
yield self.create_resource_copy(element)
# and the element if we are grouping by asset_type or
# asset_type_version.
if self.group_by.get() != 'NONE':
yield element
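# A sketch of what ProduceResourceJson emits for one element when group_by is
# not 'NONE' (field values are placeholders):
#
#   in : {'asset_type': 'example.Type', 'resource': {'data': {...}}}
#   out: {'asset_type': 'example.Type', '_group_by': 'resource',
#         'resource': {'json_data': '{...}'}}                  # copy for `resource` table
#        {'asset_type': 'example.Type',
#         'resource': {'data': {...}, 'json_data': '{...}'}}   # original element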
class AddLoadTime(beam.DoFn):
"""Add timestamp field to track load time."""
def __init__(self, load_time, *unused_args, **unused_kwargs):
if isinstance(load_time, string_types):
load_time = StaticValueProvider(str, load_time)
self.load_time = load_time
def process(self, element, **kwargs):
element[1]['timestamp'] = self.load_time.get()
yield element
class SanitizeBigQuerySchema(beam.DoFn):
"""Ensure final schema is valid."""
def process(self, element, **kwargs):
bigquery_schema.sanitize_bigquery_schema(element[1])
yield element
class EnforceSchemaDataTypes(beam.DoFn):
"""Convert values to match schema types.
Change json values to match the expected types of the input schema.
"""
def process(self, element, schemas, **kwargs):
"""Enforce the datatypes of the input schema on the element data.
:param element: the element to enforce schema on.
:param schemas: schemas to enforce
"""
key_name = element[0]
elements = element[1]
schema = schemas[key_name]
for elem in elements:
if elem.get('resource', {}).get('data', {}):
yield (element[0], bigquery_schema.enforce_schema_data_types(
elem, schema))
else:
yield element[0], elem
class CombinePolicyResource(beam.DoFn):
"""Unions two json documents.
Used when merging both the iam_policy and asset into a single document to be
represented as a single BigQuery row when loaded in the same table.
"""
def process(self, element, **kwargs):
combined = {}
for content in element[1]:
# don't merge a `resource` element.
if '_group_by' in content:
yield content
continue
combined.update(content)
if combined:
yield combined
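# Illustrative merge (a sketch): grouped under one NAME key, the `resource`
# table copy (tagged with '_group_by') passes through unmerged, while the asset
# and its iam_policy document are union-ed into a single row:
#
#   in : ('<asset_type>.<name>',
#         [{'asset_type': 'example.Type', 'resource': {...}},
#          {'asset_type': 'example.Type', 'iam_policy': {...}}])
#   out: {'asset_type': 'example.Type', 'resource': {...}, 'iam_policy': {...}}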
class WriteToGCS(beam.DoFn):
"""Stage in GCE the files to load into BigQuery.
All written objects are prefixed by the input stage_dir and loadtime. There
is an object for each group-key, either an object per asset type, or for
each asset type version.
There is nothing cleaning up these objects, so it might be prudent to have a
lifecycle policy on the GCS destination bucket to purge old files.
"""
def __init__(self, stage_dir, load_time, *unused_args, **unused_kwargs):
if isinstance(stage_dir, string_types):
stage_dir = StaticValueProvider(str, stage_dir)
if isinstance(load_time, string_types):
load_time = StaticValueProvider(str, load_time)
self.stage_dir = stage_dir
self.load_time = load_time
self.open_files = {}
def get_path_for_key_name(self, key_name):
stage_dir = self.stage_dir.get()
load_time = self.load_time.get()
return FileSystems.join(stage_dir, load_time, key_name + '.json')
def start_bundle(self):
self.open_files = {}
def _get_file_for_element(self, element):
key_name = element[0]
if key_name in self.open_files:
return self.open_files[key_name], None
file_path = self.get_path_for_key_name(key_name)
file_handle = FileSystems.create(file_path, mime_type='text/json')
self.open_files[key_name] = file_handle
return file_handle, file_path
def process(self, element, **kwargs):
file_handle, created_file_path = self._get_file_for_element(element)
for asset_line in element[1]:
file_handle.write(json.dumps(asset_line).encode())
file_handle.write(b'\n')
if created_file_path:
            # The key is the BigQuery table name so that each table is deleted
            # and created independently; the value is the sharded key and the
            # GCS file path.
yield (AssignGroupByKey.remove_shard(element[0]),
(element[0], created_file_path))
def finish_bundle(self):
for _, file_handle in self.open_files.items():
logging.info('finish bundle')
file_handle.close()
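# A sketch of the staging layout: each sharded key becomes one newline-delimited
# json object at <stage>/<load_time>/<sharded_key>.json, and the DoFn emits the
# path keyed by the unsharded table name (bucket below is a placeholder):
#
#   ('google.compute.Instance',
#    ('google.compute.Instance.0',
#     'gs://example-bucket/stage/2019-01-01T00:00:00/google.compute.Instance.0.json'))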
class AssignShardedKeyForLoad(beam.DoFn):
"""Element is a tuple keyed by table, value is iterable of sharded key and
gcs file path. The transform unbundled the iterable and return a tuples of
sharded key and gcs file path to be loaded in parallel to bigquery.
"""
def process(self, element, **kwargs):
for (sharded_key, created_file_path) in element[1]:
yield sharded_key, created_file_path
class BigQueryDoFn(beam.DoFn):
"""Superclass for a DoFn that requires BigQuery dataset information."""
def __init__(self, dataset, add_load_date_suffix, load_time, *unused_args, **unused_kwargs):
if isinstance(dataset, string_types):
dataset = StaticValueProvider(str, dataset)
self.dataset = dataset
if isinstance(add_load_date_suffix, string_types):
add_load_date_suffix = StaticValueProvider(
str, add_load_date_suffix)
self.add_load_date_suffix = add_load_date_suffix
if isinstance(load_time, string_types):
load_time = StaticValueProvider(str, load_time)
self.load_time = load_time
self.bigquery_client = None
self.dataset_location = None
self.load_jobs = {}
def get_dataset_ref(self):
dataset = self.dataset.get()
if '.' in dataset:
return bigquery.DatasetReference.from_string(dataset)
else:
return self.bigquery_client.dataset(dataset)
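    # get_dataset_ref() accepts either a bare dataset id or a 'project.dataset'
    # string, e.g. (a sketch; names are placeholders):
    #
    #   --dataset=asset_inventory            -> self.bigquery_client.dataset('asset_inventory')
    #   --dataset=example-project.inventory  -> DatasetReference.from_string('example-project.inventory')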
def get_dataset_location(self):
if self.dataset:
return self.bigquery_client.get_dataset(
self.get_dataset_ref()).location
return None
def asset_type_to_table_name(self, asset_type):
suffix = ''
add_load_date_suffix = self.add_load_date_suffix.get()
if (add_load_date_suffix and
add_load_date_suffix.lower() in ('yes', 'true', 't', '1')):
suffix = '_' + self.load_time.get()[0:10].replace('-', '')
return asset_type.replace('.', '_').replace('/', '_') + suffix
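    # Illustrative table names (a sketch): the asset type's '.' and '/' become
    # '_', and the first 10 characters of load_time supply the optional suffix:
    #
    #   asset_type_to_table_name('google.compute.Instance')
    #   # -> 'google_compute_Instance'
    #   # -> 'google_compute_Instance_20190101' when add_load_date_suffix is
    #   #    'true' (or 'yes', 't', '1') and load_time starts with '2019-01-01'.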
def start_bundle(self):
if not self.bigquery_client:
self.bigquery_client = bigquery.Client()
self.dataset_location = self.get_dataset_location()
self.load_jobs = {}
class DeleteDataSetTables(BigQueryDoFn):
"""Delete tables when truncating and not appending.
If we are not keeping old data around, it safer to delete tables in the
dataset before loading so that no old records remain.
"""
def __init__(self, dataset, add_load_date_suffix, load_time,
write_disposition):
# Can't use super().
# https://issues.apache.org/jira/browse/BEAM-6158?focusedCommentId=16919945
# super(DeleteDataSetTables, self).__init__(dataset)
BigQueryDoFn.__init__(self, dataset, add_load_date_suffix, load_time)
if isinstance(write_disposition, string_types):
write_disposition = StaticValueProvider(str, write_disposition)
self.write_disposition = write_disposition
def process(self, element, **kwargs):
# If we are appending to the table, no need to Delete first.
if self.write_disposition.get() == 'WRITE_APPEND':
yield element
return
# Delete the BigQuery table prior to loading to it.
key_name = element[0]
table_name = self.asset_type_to_table_name(key_name)
table_ref = self.get_dataset_ref().table(
table_name)
try:
self.bigquery_client.delete_table(table_ref)
except NotFound:
pass
yield element
class LoadToBigQuery(BigQueryDoFn):
"""Load each writen GCS object to BigQuery.
The Beam python SDK doesn't support dynamic BigQuery destinations yet so
this must be done within the workers.
"""
def __init__(self, dataset, add_load_date_suffix, load_time):
# Can't use super().
# https://issues.apache.org/jira/browse/BEAM-6158?focusedCommentId=16919945
# super(LoadToBigQuery, self).__init__(dataset)
BigQueryDoFn.__init__(self, dataset, add_load_date_suffix, load_time)
def to_bigquery_schema(self, fields):
"""Convert list of dicts into `bigquery.SchemaFields`."""
for field in fields:
if 'fields' in field:
field['fields'] = self.to_bigquery_schema(field['fields'])
return [bigquery.SchemaField(**field) for field in fields]
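    # A sketch of the conversion (assuming field dicts keyed like
    # bigquery.SchemaField's constructor arguments, as produced by the
    # bigquery_schema module):
    #
    #   self.to_bigquery_schema([{'name': 'name', 'field_type': 'STRING'}])
    #   # -> [bigquery.SchemaField('name', 'STRING')]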
def process(self, element, schemas, **kwargs):
"""Element is a tuple of key_ name and iterable of filesystem paths.
:param schemas: schema of the table.
:param element: name of object to load.
"""
dataset_ref = self.get_dataset_ref()
sharded_key_name = element[0]
key_name = AssignGroupByKey.remove_shard(element[0])
        object_paths = list(element[1])
job_config = bigquery.LoadJobConfig()
job_config.write_disposition = 'WRITE_APPEND'
job_config.ignore_unknown_values = True
job_config.schema_update_options = [
bigquery.job.SchemaUpdateOption.ALLOW_FIELD_ADDITION]
table_ref = dataset_ref.table(self.asset_type_to_table_name(key_name))
# use load_time as a timestamp.
job_config.time_partitioning = bigquery.table.TimePartitioning(
field='timestamp')
job_config.schema = self.to_bigquery_schema(schemas[sharded_key_name])
job_config.source_format = bigquery.SourceFormat.NEWLINE_DELIMITED_JSON
try:
load_job = self.bigquery_client.load_table_from_uri(
object_paths,
table_ref,
location=self.dataset_location,
job_config=job_config)
self.load_jobs[key_name] = load_job
except BadRequest as e:
logging.error('error in load_job %s, %s, %s, %s',
str(object_paths), str(table_ref),
str(self.dataset_location),
str(job_config.to_api_repr()))
raise e
def finish_bundle(self):
self.bigquery_client = None
# wait for the load jobs to complete
for _, load_job in self.load_jobs.items():
try:
load_job.result()
except BadRequest as e:
logging.error('error in load_job %s', load_job.self_link)
raise e
class ImportAssetOptions(PipelineOptions):
"""Required options.
All options are required, but are not marked as such to support creation
of Dataflow templates.
"""
@classmethod
def _add_argparse_args(cls, parser):
parser.add_value_provider_argument(
'--group_by',
default='ASSET_TYPE',
choices=['ASSET_TYPE', 'ASSET_TYPE_VERSION', 'NONE'],
help='How to group exported resources into Bigquery tables.')
parser.add_value_provider_argument(
'--write_disposition',
default='WRITE_APPEND',
choices=['WRITE_APPEND', 'WRITE_EMPTY'],
            help='Whether to append to or overwrite BigQuery tables.')
parser.add_value_provider_argument(
'--input', help='A glob of all input asset json files to process.')
parser.add_value_provider_argument(
            '--num_shards', help=(
                'Number of shards to use per key. '
                'A list of asset types and the number of shards to use for '
                'that type, with "*" used as a default. '
                'For example "google.compute.VpnTunnel=1,*=10".'),
default='*=1')
parser.add_value_provider_argument(
'--stage',
help='GCS location to write intermediary BigQuery load files.')
parser.add_value_provider_argument(
'--load_time',
default=datetime.now().isoformat(),
            help='Load time of the data (YYYY-MM-DD[HH:MM:SS]).')
parser.add_value_provider_argument(
'--add_load_date_suffix',
default='False',
help='If the load date [YYYYMMDD] is added as a table suffix.')
parser.add_value_provider_argument(
'--dataset', help='BigQuery dataset to load to.')
def run(argv=None):
"""Construct the pipeline."""
options = ImportAssetOptions(argv)
p = beam.Pipeline(options=options)
# Cleanup json documents.
sanitized = (
p | 'read' >> ReadFromText(options.input, coder=JsonCoder())
| 'produce_resource_json' >> beam.ParDo(ProduceResourceJson(
options.group_by))
| 'bigquery_sanitize' >> beam.ParDo(BigQuerySanitize()))
    # Join each iam_policy object with the resource of the same name.
merged_iam = (
sanitized | 'assign_name_key' >> beam.ParDo(
AssignGroupByKey('NAME', ''))
| 'group_by_name' >> beam.GroupByKey()
| 'combine_policy' >> beam.ParDo(CombinePolicyResource()))
# split into BigQuery tables.
keyed_assets = merged_iam | 'assign_group_by_key' >> beam.ParDo(
AssignGroupByKey(options.group_by, options.num_shards))
# Generate BigQuery schema for each table.
schemas = (keyed_assets
| 'to_schema' >> core.CombinePerKey(
BigQuerySchemaCombineFn())
| 'sanitize_schema' >> beam.ParDo(SanitizeBigQuerySchema()))
pvalue_schemas = beam.pvalue.AsDict(schemas)
# Write to GCS and load to BigQuery.
# pylint: disable=expression-not-assigned
(keyed_assets
| 'add_load_time' >> beam.ParDo(AddLoadTime(options.load_time))
| 'group_by_sharded_key_for_enforce_schema' >> beam.GroupByKey()
| 'enforce_schema' >> beam.ParDo(EnforceSchemaDataTypes(), pvalue_schemas)
| 'group_by_sharded_key_for_write' >> beam.GroupByKey()
| 'write_to_gcs' >> beam.ParDo(
WriteToGCS(options.stage, options.load_time))
| 'group_written_objects_by_key' >> beam.GroupByKey()
| 'delete_tables' >> beam.ParDo(
DeleteDataSetTables(options.dataset, options.add_load_date_suffix,
options.load_time,
options.write_disposition))
| 'assign_sharded_key_for_load' >> beam.ParDo(AssignShardedKeyForLoad())
| 'group_by_sharded_key_for_load' >> beam.GroupByKey()
| 'load_to_bigquery' >> beam.ParDo(
LoadToBigQuery(options.dataset, options.add_load_date_suffix,
options.load_time),
beam.pvalue.AsDict(schemas)))
return p.run()
if __name__ == '__main__':
logging.basicConfig()
logging.getLogger().setLevel(logging.INFO)
pipeline_result = run()
logging.info('waiting on pipeline : %s', pprint.pformat(pipeline_result))
state = pipeline_result.wait_until_finish()
logging.info('final state: %s', state)