# Copyright 2019 Google Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Code to invoke the pipeline."""
import logging
import pprint
import time
from googleapiclient.discovery import build
from oauth2client.client import GoogleCredentials


def get_job_name(load_time):
    """User-friendly job name from load_time."""
    return ('cloud-asset-import-' + load_time.lower().replace(
        ':', '-').replace(' ', '').replace('.', '-'))
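# For example (illustrative input, not a value taken from this repo):
# get_job_name('2019-01-01T01:02:03.000000Z') returns
# 'cloud-asset-import-2019-01-01t01-02-03-000000z'.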


def is_successful_state(final_state):
    """True if the status is successful.

    Checks for both the Beam and template runner success codes.

    Args:
        final_state: Final state the pipeline is in.

    Returns:
        True if the job was successful.
    """
    return final_state in ['JOB_STATE_DONE', 'DONE']
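# 'JOB_STATE_DONE' is the terminal success state reported by the Dataflow jobs
# API (template runner); 'DONE' is the state string Beam's
# PipelineResult.wait_until_finish() returns on the beam runner path.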


def wait_on_pipeline_job(df_service, pipeline_job):
    """Poll the job status every 60 seconds until done."""
    dataflow_project = pipeline_job['projectId']
    template_region = pipeline_job['location']
    job_id = pipeline_job['id']
    # Poll in a loop rather than recursing so that long-running jobs cannot
    # exhaust the Python recursion limit.
    while True:
        pipeline_job = df_service.projects().locations().jobs().get(
            location=template_region, projectId=dataflow_project,
            jobId=job_id).execute(num_retries=5)
        logging.info('job status %s', pprint.pformat(pipeline_job))
        current_state = pipeline_job['currentState']
        if current_state in [
                'JOB_STATE_DONE', 'JOB_STATE_FAILED', 'JOB_STATE_CANCELLED',
                'JOB_STATE_UPDATED', 'JOB_STATE_DRAINED'
        ]:
            # We have reached a terminal state.
            logging.info('final pipeline state: %s', current_state)
            return current_state, pipeline_job
        logging.info('sleeping 60 seconds before polling.')
        time.sleep(60)


def run_pipeline_template(dataflow_project, template_region, template_location,
                          input_location, group_by, write_disposition, dataset,
                          stage, load_time, num_shards, add_load_date_suffix,
                          runtime_environment):
    """Invoke the supplied pipeline template.

    Args:
        dataflow_project: Project to run the dataflow job in.
        template_region: Region to run the job in.
        template_location: GCS path to the template file.
        input_location: GCS path to load json documents from.
        group_by: How to split assets into tables.
        write_disposition: Whether to append to or overwrite BigQuery tables.
        dataset: BigQuery dataset to write to.
        stage: GCS path to write BigQuery load files.
        load_time: Timestamp or date to load data with.
        num_shards: Shards for each asset type.
        add_load_date_suffix: If the load date is added as a table suffix.
        runtime_environment: Dict supplying other runtime overrides.

    Returns:
        End state of the pipeline and the job object.
    """
    credentials = GoogleCredentials.get_application_default()
    # cache_discovery=False disables the local discovery-document cache, which
    # is not usable with newer oauth2client releases and would only log
    # warnings.
    df_service = build('dataflow', 'v1b3', credentials=credentials,
                       cache_discovery=False)
    job_name = get_job_name(load_time)
body = {
'jobName': job_name,
'parameters': {
'input': input_location,
'load_time': load_time,
'stage': stage,
'group_by': group_by,
'write_disposition': write_disposition,
'num_shards': num_shards,
'add_load_date_suffix': add_load_date_suffix,
'dataset': dataset,
},
'environment': runtime_environment
}
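    # 'parameters' are handed to the template's runtime parameters and
    # 'environment' follows the Dataflow RuntimeEnvironment schema (fields
    # such as tempLocation, machineType or maxWorkers, depending on what the
    # caller supplies).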
logging.info('launching template %s in %s:%s with %s', template_location,
dataflow_project, template_region, pprint.pformat(body))
launch_result = df_service.projects().locations().templates().launch(
location=template_region,
projectId=dataflow_project,
gcsPath=template_location,
body=body).execute(num_retries=5)
logging.info('waiting on pipeline : %s', pprint.pformat(launch_result))
return wait_on_pipeline_job(df_service, launch_result['job'])
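
# Example invocation (illustrative only; all values below are placeholders,
# not values taken from this repository):
#
#     state, job = run_pipeline_template(
#         dataflow_project='my-project',
#         template_region='us-central1',
#         template_location='gs://my-bucket/templates/import_pipeline',
#         input_location='gs://my-bucket/assets/*.json',
#         group_by='ASSET_TYPE',
#         write_disposition='WRITE_APPEND',
#         dataset='asset_inventory',
#         stage='gs://my-bucket/stage',
#         load_time='2019-01-01T00:00:00.000000Z',
#         num_shards='*=1',
#         add_load_date_suffix='False',
#         runtime_environment={'tempLocation': 'gs://my-bucket/temp'})
#     if not is_successful_state(state):
#         raise RuntimeError('import finished in state ' + state)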


def run_pipeline_beam_runner(pipeline_runner, dataflow_project, input_location,
                             group_by, write_disposition, dataset, stage,
                             load_time, num_shards, add_load_date_suffix,
                             pipeline_arguments):
    """Invokes the pipeline with a beam runner.

    Only tested with the dataflow and direct runners.

    Args:
        pipeline_runner: The Beam runner to use.
        dataflow_project: Project to run the dataflow job in.
        input_location: GCS path to load json documents from.
        group_by: How to split assets into tables.
        write_disposition: Whether to append to or overwrite BigQuery tables.
        dataset: BigQuery dataset to write to.
        stage: GCS path to write BigQuery load files.
        load_time: Timestamp to add to data during BigQuery load.
        num_shards: Shards for each asset type.
        add_load_date_suffix: If the load date is added as a table suffix.
        pipeline_arguments: List of additional runner arguments.

    Returns:
        The end state of the pipeline run (a string), and PipelineResult.
    """
    # pylint: disable=import-error
    # Import on demand so we only depend on the pipeline code (and its Apache
    # Beam imports) when a Beam runner is used rather than a template.
    from asset_inventory import import_pipeline
    job_name = get_job_name(load_time)
    # Copy so we never mutate the caller's pipeline_arguments list in place.
    pipeline_parameters = list(pipeline_arguments)
parameters = {
'--load_time': load_time,
'--job_name': job_name,
'--project': dataflow_project,
'--input': input_location,
'--group_by': group_by,
'--write_disposition': write_disposition,
'--num_shards': num_shards,
'--add_load_date_suffix': add_load_date_suffix,
'--dataset': dataset,
'--stage': stage,
'--runner': pipeline_runner
}
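    # Only supply a default flag when the caller has not already passed it in
    # pipeline_arguments, and skip parameters whose value is empty.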
for arg_name, value in parameters.items():
if value and arg_name not in pipeline_parameters:
pipeline_parameters += [arg_name, value]
pipeline_result = import_pipeline.run(pipeline_parameters)
logging.info('waiting on pipeline : %s', pprint.pformat(pipeline_result))
state = pipeline_result.wait_until_finish()
logging.info('final pipeline state: %s', state)
return pipeline_result.state, pipeline_result
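
# Example invocation (illustrative only; the runner choice and all argument
# values below are placeholders, not values taken from this repository):
#
#     state, result = run_pipeline_beam_runner(
#         pipeline_runner='DirectRunner',
#         dataflow_project='my-project',
#         input_location='gs://my-bucket/assets/*.json',
#         group_by='ASSET_TYPE',
#         write_disposition='WRITE_APPEND',
#         dataset='asset_inventory',
#         stage='gs://my-bucket/stage',
#         load_time='2019-01-01T00:00:00.000000Z',
#         num_shards='*=1',
#         add_load_date_suffix='False',
#         pipeline_arguments=['--temp_location=gs://my-bucket/temp'])
#     if not is_successful_state(state):
#         raise RuntimeError('import finished in state ' + state)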