def run()

in retail/recommendation-system/bqml-scann/embeddings_exporter/pipeline.py [0:0]


def run(bq_dataset_name, embeddings_table_name, output_dir, pipeline_args):
    """Build and run a Beam pipeline that dumps BigQuery query results as CSV.

    The pipeline reads the rows returned by the query built by ``get_query``,
    converts each row with ``to_csv``, and writes the results as text files
    whose names start with ``EMBEDDING_FILE_PREFIX`` and end with ``.csv``
    under ``output_dir``.

    Args:
      bq_dataset_name: Forwarded to ``get_query`` to build the source query.
      embeddings_table_name: Forwarded to ``get_query`` to build the source
        query.
      output_dir: Directory (or path prefix) under which output files are
        written. NOTE(review): the step label suggests a Cloud Storage
        location -- confirm against callers.
      pipeline_args: Arguments used to construct the Beam ``PipelineOptions``;
        the ``project`` option is read from them and passed to the BigQuery
        source.
    """
    options = beam.options.pipeline_options.PipelineOptions(pipeline_args)
    gcp_project = options.get_all_options()['project']

    # The pipeline is executed when the `with` block exits.
    with beam.Pipeline(options=options) as p:
        sql = get_query(bq_dataset_name, embeddings_table_name)
        destination = os.path.join(output_dir, EMBEDDING_FILE_PREFIX)

        rows = p | 'ReadFromBigQuery' >> beam.io.ReadFromBigQuery(
            project=gcp_project,
            query=sql,
            use_standard_sql=True,
            flatten_results=False,
        )
        csv_lines = rows | 'ConvertToCsv' >> beam.Map(to_csv)
        _ = csv_lines | 'WriteToCloudStorage' >> beam.io.WriteToText(
            file_path_prefix=destination,
            file_name_suffix=".csv",
        )