in retail/recommendation-system/bqml-scann/embeddings_exporter/pipeline.py [0:0]
def run(bq_dataset_name, embeddings_table_name, output_dir, pipeline_args):
    """Build and run the Beam pipeline that dumps embeddings to text files.

    The pipeline reads the rows returned by the query from ``get_query``,
    maps each row through ``to_csv``, and writes the results as text files
    whose names start with ``EMBEDDING_FILE_PREFIX`` inside ``output_dir``
    and end with ``.csv``.

    Args:
        bq_dataset_name: Dataset name forwarded to ``get_query``.
        embeddings_table_name: Table name forwarded to ``get_query``.
        output_dir: Directory joined with ``EMBEDDING_FILE_PREFIX`` to form
            the output file path prefix.
        pipeline_args: Arguments used to construct the Beam
            ``PipelineOptions``; the ``project`` option is read from them.
    """
    options = beam.options.pipeline_options.PipelineOptions(pipeline_args)
    gcp_project = options.get_all_options()['project']

    # The pipeline is executed when the ``with`` block exits.
    with beam.Pipeline(options=options) as p:
        sql = get_query(bq_dataset_name, embeddings_table_name)
        file_prefix = os.path.join(output_dir, EMBEDDING_FILE_PREFIX)

        rows = p | 'ReadFromBigQuery' >> beam.io.ReadFromBigQuery(
            project=gcp_project,
            query=sql,
            use_standard_sql=True,
            flatten_results=False,
        )
        csv_lines = rows | 'ConvertToCsv' >> beam.Map(to_csv)
        # The write transform's result is not needed; bind it to ``_``.
        _ = csv_lines | 'WriteToCloudStorage' >> beam.io.WriteToText(
            file_path_prefix=file_prefix,
            file_name_suffix=".csv",
        )