retail/recommendation-system/bqml-scann/embeddings_exporter/pipeline.py (33 lines of code) (raw):
# Copyright 2020 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import os
import apache_beam as beam
EMBEDDING_FILE_PREFIX = 'embeddings'
def get_query(dataset_name, table_name):
query = f'''
SELECT
item_Id,
embedding
FROM
`{dataset_name}.{table_name}`;
'''
return query
def to_csv(entry):
item_Id = entry['item_Id']
embedding = entry['embedding']
csv_string = f'{item_Id},'
csv_string += ','.join([str(value) for value in embedding])
return csv_string
def run(bq_dataset_name, embeddings_table_name, output_dir, pipeline_args):
pipeline_options = beam.options.pipeline_options.PipelineOptions(pipeline_args)
project = pipeline_options.get_all_options()['project']
with beam.Pipeline(options=pipeline_options) as pipeline:
query = get_query(bq_dataset_name, embeddings_table_name)
output_prefix = os.path.join(output_dir, EMBEDDING_FILE_PREFIX)
_ = (
pipeline
| 'ReadFromBigQuery' >> beam.io.ReadFromBigQuery(
project=project, query=query, use_standard_sql=True, flatten_results=False)
| 'ConvertToCsv' >> beam.Map(to_csv)
| 'WriteToCloudStorage' >> beam.io.WriteToText(
file_path_prefix = output_prefix,
file_name_suffix = ".csv")
)