in data-analytics/next25-turbocharge-ecomm/main.py [0:0]
def _build_arg_parser():
    """Build the parser for this pipeline's own (non-Beam) command-line flags."""
    parser = argparse.ArgumentParser()
    # NOTE(review): credentials passed as flags can show up in process
    # listings and job metadata; consider sourcing them from a secret store.
    parser.add_argument(
        '--alloydb_username',
        dest='alloydb_username',
        required=True,
        help='AlloyDB username')
    parser.add_argument(
        '--alloydb_password',
        dest='alloydb_password',
        required=True,
        help='AlloyDB password')
    parser.add_argument(
        '--alloydb_ip',
        dest='alloydb_ip',
        required=True,
        help='AlloyDB IP Address')
    # Kept as a string: the value is only ever interpolated into the JDBC URL.
    parser.add_argument(
        '--alloydb_port',
        dest='alloydb_port',
        default="5432",
        help='AlloyDB Port')
    parser.add_argument(
        '--alloydb_database',
        dest='alloydb_database',
        required=True,
        help='AlloyDB Database name')
    parser.add_argument(
        '--alloydb_table',
        dest='alloydb_table',
        required=True,
        help='AlloyDB table name')
    parser.add_argument(
        '--pubsub_subscription',
        dest='pubsub_subscription',
        required=True,
        help='Pub/Sub subscription to read messages from '
             '(projects/<project>/subscriptions/<subscription>)')
    return parser


def run(argv=None, save_main_session=True):
    """Build and run the pipeline.

    Messages are read from a Pub/Sub subscription and decoded, then fanned
    out to three consumers:

    * a debug step that prints every decoded element;
    * an embedding branch (``PrepImageFn`` -> ``MLTransform`` ->
      ``EmbeddingRowSchema``) whose rows are written to AlloyDB over JDBC;
    * an inference branch that sends each element's ``contextual_text`` to
      ``RunInference(GeminiModelHandler())`` and prints the result.

    ``DecodePubsubMessageFn``, ``PrepImageFn``, ``mm_embedding_transform``,
    ``artifact_location``, ``GeminiModelHandler`` and ``EmbeddingRowSchema``
    are not defined here and must be available in the enclosing module.

    Args:
        argv: Command-line arguments. Flags declared by this module are
            consumed here; everything else is forwarded to
            ``PipelineOptions``. ``None`` lets ``argparse`` read ``sys.argv``.
        save_main_session: Value assigned to
            ``SetupOptions.save_main_session`` on the pipeline options.

    Note:
        NOTE(review): Pub/Sub sources are normally only usable in streaming
        pipelines, and nothing here enables streaming, so callers presumably
        pass ``--streaming`` in ``argv`` -- confirm.
    """
    known_args, pipeline_args = _build_arg_parser().parse_known_args(argv)

    pipeline_options = PipelineOptions(pipeline_args)
    pipeline_options.view_as(SetupOptions).save_main_session = save_main_session

    jdbc_url = (f'jdbc:postgresql://{known_args.alloydb_ip}:'
                f'{known_args.alloydb_port}'
                f'/{known_args.alloydb_database}')

    # Leaving the ``with`` block is what runs the pipeline.
    with beam.Pipeline(options=pipeline_options) as p:
        pubsub_pcoll = (
            p
            | "ReadFromPubSub" >> ReadFromPubSub(
                subscription=known_args.pubsub_subscription
            ).with_output_types(bytes)
            | "DecodeMessage" >> beam.ParDo(DecodePubsubMessageFn())
        )

        # Debug tap: print every decoded message.
        _ = pubsub_pcoll | "printPubsubstuff" >> beam.ParDo(lambda x: print(x))

        # Embedding branch. MLTransform is used as a turnkey wrapper: it is
        # handed the embedding transform and an artifact location rather than
        # this code calling the embedding model directly.
        embedding_pcoll = (
            pubsub_pcoll
            | beam.ParDo(PrepImageFn())
            | "Embedding" >> MLTransform(
                write_artifact_location=artifact_location
            ).with_transform(mm_embedding_transform)
            | "ConvertToRows" >> beam.Map(
                lambda element: EmbeddingRowSchema(
                    id=element['id'],
                    image_path=element['image_path'],
                    # The embedding is written as its string form; see the
                    # ``stringtype`` connection property on the JDBC sink.
                    image_embedding=str(element['image']),
                    contextual_text=element['contextual_text'],
                )
            ).with_output_types(EmbeddingRowSchema)
        )

        # Inference branch: only the text is sent to the model, and the
        # result is printed. Nothing downstream consumes it.
        _ = (
            pubsub_pcoll
            | "getText" >> beam.Map(lambda data: data['contextual_text'])
            | "performGeminiInf" >> RunInference(GeminiModelHandler())
            | "printText" >> beam.Map(lambda x: print(x))
        )

        _ = embedding_pcoll | 'Write to jdbc' >> WriteToJdbc(
            driver_class_name='org.postgresql.Driver',
            table_name=known_args.alloydb_table,
            jdbc_url=jdbc_url,
            username=known_args.alloydb_username,
            password=known_args.alloydb_password,
            # Strings are sent untyped so the server infers the column type
            # (presumably needed for the stringified embedding -- confirm).
            connection_properties='stringtype=unspecified',
        )