in src/utils/run_utils.py [0:0]
def _main(argv):
    """Stage the input sequence file and submit a pipeline job.

    Reads pipeline parameters from ``FLAGS.params``, checks that the local
    sequence file and the pipeline template exist, hands the sequence file
    to ``_copy_sequence`` for staging under ``<staging_bucket>/fasta/``,
    rewrites ``params['sequence_path']`` to the staged location, and then
    creates and runs a ``vertex_ai.PipelineJob``.

    Args:
        argv: Positional command-line arguments; unused here (presumably
            supplied by the application runner -- confirm against caller).

    Raises:
        KeyError: If the converted parameters have no ``'sequence_path'``
            entry (assumes ``_convert_params`` returns a dict -- TODO
            confirm).
        FileNotFoundError: If the sequence file or the pipeline template
            does not exist on the local filesystem.
    """
    del argv  # Unused.

    params = _convert_params(FLAGS.params)
    local_sequence_path = params['sequence_path']
    template_path = FLAGS.pipeline_template_path

    # Validate both local inputs up front, before anything is copied.
    # The offending path is included so a failed launch is diagnosable.
    if not os.path.exists(local_sequence_path):
        raise FileNotFoundError(
            f'Invalid sequence path: {local_sequence_path}')
    if not os.path.exists(template_path):
        raise FileNotFoundError(
            f'Invalid path to pipeline JSON: {template_path}')

    # Drop any trailing slash so the derived URIs never contain '//',
    # which would otherwise create an empty path segment in the bucket.
    staging_root = FLAGS.staging_bucket.rstrip('/')

    sequence_file_name = os.path.basename(local_sequence_path)
    gcs_sequence_path = f'{staging_root}/fasta/{sequence_file_name}'
    logging.info('Copying %s to %s', local_sequence_path, gcs_sequence_path)
    _copy_sequence(local_sequence_path, gcs_sequence_path)

    # From here on the pipeline must see the staged copy, not the local file.
    params['sequence_path'] = gcs_sequence_path

    vertex_ai.init(
        project=FLAGS.project_id,
        location=FLAGS.region,
        staging_bucket=FLAGS.staging_bucket,
    )

    # Both identifiers are the lower-cased text before the FIRST dot of the
    # file name (so 'a.b.json' yields 'a', not 'a.b').
    pipeline_name = os.path.basename(template_path).split('.')[0].lower()
    labels = {
        # NOTE(review): unlike sequence_id, experiment_id is passed through
        # without lower-casing -- confirm it already satisfies the label
        # format expected by the service.
        'experiment_id': FLAGS.experiment_id,
        'sequence_id': sequence_file_name.split('.')[0].lower(),
    }

    pipeline_job = vertex_ai.PipelineJob(
        display_name=pipeline_name,
        template_path=template_path,
        pipeline_root=f'{staging_root}/pipeline_runs/{pipeline_name}',
        parameter_values=params,
        enable_caching=FLAGS.enable_caching,
        labels=labels,
    )
    # sync=False is passed through unchanged; presumably this submits the
    # run without blocking on completion -- confirm against the SDK docs.
    pipeline_job.run(
        sync=False,
        service_account=FLAGS.pipelines_sa,
    )