# tools/asset-inventory/gae/main.py
# Copyright 2019 Google Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""A HTTP server that runs the Cloud Asset Inventory export and import process.
This server can run anywhere and can be invoked on a schedule to periodically
export the inventory assets and run the beam pipeline to import into BigQuery.
The arguments are supplied in the `config.yaml` file in the same directory.
Works on python2.7 and python3 App Engine Standard when invoking the beam
pipeline via a template.
"""
from __future__ import print_function
import site
import importlib
import pkg_resources
# Make the vendored third-party packages under lib/ importable before the
# imports below (yaml, flask, asset_inventory) are resolved.
site.addsitedir('lib')
try:
    # Re-scan pkg_resources so it sees the distributions that addsitedir
    # just placed on sys.path.
    importlib.reload(pkg_resources)
except AttributeError:
    # Python 2: importlib has no reload(); skip the refresh.
    pass
import yaml
import os
import logging
import json
import datetime
from flask import Flask, request
from asset_inventory import pipeline_runner
from asset_inventory import export
import pprint
class Config(object):
    """Expose the keys of a YAML config file as lowercased attributes."""

    def __init__(self, config_file):
        """Load config_file (YAML mapping) and set one attribute per key.

        BaseLoader keeps every scalar as a plain string and never
        constructs arbitrary Python objects.
        """
        with open(config_file, 'r') as stream:
            loaded = yaml.load(stream, Loader=yaml.BaseLoader)
        for key, value in loaded.items():
            setattr(self, key.lower(), value)
app = Flask(__name__)
# Loaded once at import time from the config.yaml shipped next to this file.
CONFIG = Config('config.yaml')
def get_export_arguments():
    """Return the positional argument tuple for the asset export step."""
    field_names = ('export_parent', 'gcs_destination',
                   'export_content_types', 'export_asset_types')
    return tuple(getattr(CONFIG, name) for name in field_names)
def get_import_arguments():
    """Return the positional argument tuple for the import pipeline step.

    The load time is the current wall-clock time in ISO-8601 format; the
    runtime environment is parsed from the JSON string in the config.
    """
    load_time = datetime.datetime.now().isoformat()
    input_glob = '{}/*.json'.format(CONFIG.gcs_destination)
    runtime_environment = json.loads(
        CONFIG.import_pipeline_runtime_environment)
    return (CONFIG.import_pipeline_runner,
            CONFIG.import_dataflow_project,
            CONFIG.import_template_region,
            CONFIG.import_template_location,
            input_glob,
            CONFIG.import_group_by,
            CONFIG.import_write_disposition,
            CONFIG.import_dataset,
            CONFIG.import_add_load_date_suffix,
            CONFIG.import_stage,
            load_time,
            CONFIG.import_num_shards,
            CONFIG.import_pipeline_arguments,
            runtime_environment)
def run_import():
    """Run the import, dispatching on the configured pipeline runner.

    A runner of 'template' launches a pre-built Dataflow template;
    anything else invokes the beam runner directly. Returns whatever the
    chosen pipeline_runner entry point returns.
    """
    args = get_import_arguments()
    logging.info('running import %s', args)
    (runner, dataflow_project, template_region, template_location,
     input_location, group_by, write_disposition, dataset,
     add_load_date_suffix, stage, load_time, num_shards,
     pipeline_arguments, pipeline_runtime_environment) = args
    if runner != 'template':
        return pipeline_runner.run_pipeline_beam_runner(
            runner, dataflow_project, input_location, group_by,
            write_disposition, dataset, stage, load_time, num_shards,
            add_load_date_suffix, pipeline_arguments)
    return pipeline_runner.run_pipeline_template(
        dataflow_project, template_region, template_location,
        input_location, group_by, write_disposition, dataset, stage,
        load_time, num_shards, add_load_date_suffix,
        pipeline_runtime_environment)
@app.route('/export_import')
def run_export_import():
    """Run the export then the import; report both results.

    Responds 403 when cron-only access is configured and the request did
    not come from App Engine cron, 500 when the import pipeline did not
    reach a successful state, 200 otherwise.
    """
    # Optionally require invocation by a cron task rather than any public
    # HTTP request; App Engine sets this header only on cron requests.
    if CONFIG.restrict_to_cron_tasks and \
            'X-Appengine-Cron' not in request.headers:
        return '', 403
    # Export the inventory to GCS first, then import it into BigQuery.
    export_result = export.export_to_gcs_content_types(
        *get_export_arguments())
    final_state, pipeline_result = run_import()
    succeeded = pipeline_runner.is_successful_state(final_state)
    body = pprint.pformat({
        'export_result': export_result,
        'pipeline_result': pipeline_result
    })
    return body, 200 if succeeded else 500
@app.errorhandler(500)
def server_error(_):
    """Log any unhandled exception with its stacktrace and answer 500."""
    # logging.exception records the active traceback alongside the message.
    logging.exception('An error occurred during a request.')
    return 'An internal error occurred.', 500
# Ensure a root handler exists and surface INFO-level messages.
logging.basicConfig()
logging.getLogger().setLevel(logging.INFO)
if __name__ == '__main__':
    # Local development entry point only; a WSGI server imports `app`
    # directly when deployed.
    app.run(debug=True, port=5000)