tools/asset-inventory/asset_inventory/main.py (112 lines of code) (raw):

#!/usr/bin/env python
#
# Copyright 2019 Google Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Import Cloud Asset Inventory Assets to BigQuery.

This is a command line utility to run the export and import pipeline.
"""

import argparse
import datetime
import json
import logging
import sys

from asset_inventory import export
from asset_inventory import pipeline_runner


def parse_args():
    """Parse command line arguments.

    Present a slightly simpler interface by constructing the pipeline input
    and stage parameters from the export gcs-destination if not supplied.
    Also accepts arbitrary beam pipeline arguments if using a beam runner,
    but getting them into the help text is near impossible.

    Returns:
        Tuple of (args, beam_args): the parsed arguments, with input and
        stage derived from --gcs-destination when not supplied, and the
        remaining arguments to pass to the beam runner.
    """
    parser = argparse.ArgumentParser()
    # Take all the arguments that export takes.
    export.add_argparse_args(parser, True)
    parser.add_argument(
        '--group_by',
        default='ASSET_TYPE',
        choices=['ASSET_TYPE', 'ASSET_TYPE_VERSION', 'NONE'],
        # pylint: disable=line-too-long
        help=(
            'How to group exported resources into BigQuery tables.\n'
            '  ASSET_TYPE: A table for each asset type (like google.compute.Instance).\n'
            '  ASSET_TYPE_VERSION: A table for each asset type and api version (like google.compute.Instance.v1).\n'
            '  NONE: One table holding assets in a single json column.\n'))
    parser.add_argument(
        '--write_disposition',
        default='WRITE_APPEND',
        choices=['WRITE_APPEND', 'WRITE_EMPTY'],
        help='When WRITE_EMPTY, will delete the tables prior to loading.')
    parser.add_argument(
        '--stage',
        help=('Location to write intermediary data to load from. '
              'Will be --gcs-destination + "/stage" if not supplied.'))
    parser.add_argument(
        '--load_time',
        default=datetime.datetime.now().isoformat(),
        help=('Load time of the data (YYYY-MM-DD[HH:MM:SS]). '
              'Defaults to "now".'))
    parser.add_argument(
        '--num_shards',
        default='*=1',
        help=('Number of shards to use per asset type. '
              'List of asset types and the number of shards to use for that '
              'type, with "*" used as a default. '
              'For example "google.compute.VpnTunnel=1,*=10".'))
    parser.add_argument(
        '--dataset',
        help='BigQuery dataset to load to.',
        required=True,
    )
    parser.add_argument(
        '--skip-export',
        help=('Do not perform asset export to GCS. Imports to BigQuery '
              'what\'s in the --gcs-destination.'),
        action='store_true',
        default=False)
    parser.add_argument(
        '--add-load-date-suffix',
        help='If set, the load date is appended to the table name.',
        action='store_true',
        default=False)
    parser.add_argument(
        '--template-job-launch-location',
        help=(
            'The dataflow template to launch to import assets.'
            ' Should be a GCS location like '
            # pylint: disable=line-too-long
            'gs://professional-services-tools-asset-inventory/latest/import_pipeline'))
    parser.add_argument(
        '--template-job-project',
        help=('When launching a template via --template-job-launch-location, '
              'this is the project id to run the job in.'))
    parser.add_argument(
        '--template-job-region',
        default='us-central1',
        help=('When launching a template via --template-job-launch-location, '
              'this is the region to run in (defaults to us-central1).'))

    def json_value(string_value):
        return json.loads(string_value)

    parser.add_argument(
        '--template-job-runtime-environment-json',
        type=json_value,
        help=('When launching a template via --template-job-launch-location, '
              'this is an optional json dict for the '
              'runtime environment of the dataflow template launch request. '
              'See https://cloud.google.com/dataflow/docs/reference/rest/v1b3/RuntimeEnvironment. '
              'For example: \'{"maxWorkers": 10}\''))
    args, beam_args = parser.parse_known_args()
    # If input isn't supplied, we can infer it from the export destination.
    if 'input' not in args or not args.input:
        args.input = '{}/*.json'.format(args.gcs_destination)
    # If stage isn't supplied, we can infer it from the export destination.
    if 'stage' not in args or not args.stage:
        args.stage = args.gcs_destination + '/stage'
    return args, beam_args


def main():
    logging.basicConfig()
    logging.getLogger().setLevel(logging.INFO)

    # Parse arguments.
    args, beam_args = parse_args()

    # Perform the export (unless we are skipping it).
    if not args.skip_export:
        export.export_to_gcs_content_types(args.parent, args.gcs_destination,
                                           args.content_types,
                                           args.asset_types)

    # Perform the import, via template or beam runner.
    launch_location = args.template_job_launch_location
    if launch_location:
        final_state = pipeline_runner.run_pipeline_template(
            args.template_job_project, args.template_job_region,
            launch_location, args.input, args.group_by,
            args.write_disposition, args.dataset, args.stage, args.load_time,
            args.num_shards, args.add_load_date_suffix,
            args.template_job_runtime_environment_json)
    else:
        final_state = pipeline_runner.run_pipeline_beam_runner(
            None, None, args.input, args.group_by, args.write_disposition,
            args.dataset, args.stage, args.load_time, args.num_shards,
            args.add_load_date_suffix, beam_args)

    if not pipeline_runner.is_successful_state(final_state):
        sys.exit(1)


if __name__ == '__main__':
    main()
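A minimal sketch of driving this entry point programmatically rather than from a shell, assuming the package is importable as asset_inventory, that export.add_argparse_args defines --parent and --gcs-destination flags (as the help text above suggests) with usable defaults for the remaining export options, and that every value shown is a placeholder, not a real project, bucket, or dataset:

# Hypothetical usage sketch; not part of main.py above.
# The --parent and --gcs-destination flag spellings are assumed from
# export.add_argparse_args, and all values are placeholders.
import sys
from asset_inventory import main as asset_inventory_main

sys.argv = [
    'main.py',
    '--parent', 'projects/my-project-id',          # placeholder parent resource
    '--gcs-destination', 'gs://my-bucket/assets',  # placeholder export bucket
    '--dataset', 'asset_inventory',                # placeholder BigQuery dataset
]
# Runs the export (unless --skip-export is given) and then the BigQuery import.
asset_inventory_main.main()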