def run()

in data-analytics/next25-turbocharge-ecomm/main.py [0:0]


def run(argv=None, save_main_session=True):
    """Build and run the Pub/Sub -> embeddings -> AlloyDB Beam pipeline.

    Command-line flags for the AlloyDB connection and the Pub/Sub
    subscription are parsed from ``argv``; every argument the parser does
    not recognise is forwarded to ``PipelineOptions``.

    The pipeline reads raw bytes from the subscription, decodes them with
    ``DecodePubsubMessageFn`` and prints each decoded element. The decoded
    collection then feeds two branches:

    * ``PrepImageFn`` -> ``MLTransform`` -> ``EmbeddingRowSchema`` rows,
      which are written to the AlloyDB table through the PostgreSQL JDBC
      driver.
    * the ``'contextual_text'`` field -> ``RunInference`` with
      ``GeminiModelHandler``, whose results are printed.

    Args:
        argv: Argument list handed to ``parse_known_args``. ``None`` makes
            argparse fall back to ``sys.argv[1:]``.
        save_main_session: Value assigned to
            ``SetupOptions.save_main_session`` on the pipeline options.
    """
    # (flag name, extra ``add_argument`` keywords), in registration order.
    # Each flag is exposed as ``--<name>`` and stored under the same name.
    cli_flags = (
        ('alloydb_username', {'required': True, 'help': 'AlloyDB username'}),
        ('alloydb_password', {'required': True, 'help': 'AlloyDB password'}),
        ('alloydb_ip', {'required': True, 'help': 'AlloyDB IP Address'}),
        ('alloydb_port', {'default': "5432", 'help': 'AlloyDB Port'}),
        ('alloydb_database', {'required': True, 'help': 'AlloyDB Database name'}),
        ('alloydb_table', {'required': True, 'help': 'AlloyDB table name'}),
        ('pubsub_subscription', {'required': True}),
    )
    parser = argparse.ArgumentParser()
    for flag_name, flag_kwargs in cli_flags:
        parser.add_argument(f'--{flag_name}', dest=flag_name, **flag_kwargs)

    # Anything argparse does not know about is treated as a Beam option.
    args, beam_args = parser.parse_known_args(argv)

    options = PipelineOptions(beam_args)
    setup_options = options.view_as(SetupOptions)
    setup_options.save_main_session = save_main_session

    jdbc_url = 'jdbc:postgresql://{}:{}/{}'.format(
        args.alloydb_ip, args.alloydb_port, args.alloydb_database)

    with beam.Pipeline(options=options) as pipeline:
        # Source: raw Pub/Sub payloads, decoded into elements that both
        # downstream branches consume.
        pubsub_source = ReadFromPubSub(
            subscription=args.pubsub_subscription).with_output_types(bytes)
        decoded = (
            pipeline
            | "ReadFromPubSub" >> pubsub_source
            | "DecodeMessage" >> beam.ParDo(DecodePubsubMessageFn())
        )
        # Debug output of every decoded message.
        decoded | "printPubsubstuff" >> beam.ParDo(
            lambda message: print(message))

        # Presenter note: explain what a turnkey transform (MLTransform) is
        # and why it is beneficial.
        # NOTE(review): artifact_location and mm_embedding_transform are not
        # defined in this function; presumably module-level names - confirm.
        prepared = decoded | beam.ParDo(PrepImageFn())
        embedder = MLTransform(
            write_artifact_location=artifact_location
        ).with_transform(mm_embedding_transform)
        embedded = prepared | "Embedding" >> embedder

        # The value under 'image' is stringified before being stored in the
        # row's image_embedding field.
        to_row = beam.Map(
            lambda record: EmbeddingRowSchema(
                id=record['id'],
                image_path=record['image_path'],
                image_embedding=str(record['image']),
                contextual_text=record['contextual_text'],
            )
        ).with_output_types(EmbeddingRowSchema)
        embedding_rows = embedded | "ConvertToRows" >> to_row

        # embedding_rows | "printMLTransformResults" >> beam.Map(lambda x: print(x))

        # Second branch: run the contextual text through the Gemini model
        # handler and print whatever RunInference emits.
        contextual_text = decoded | "getText" >> beam.Map(
            lambda record: record['contextual_text'])
        predictions = contextual_text | "performGeminiInf" >> RunInference(
            GeminiModelHandler())
        predictions | "printText" >> beam.Map(
            lambda prediction: print(prediction))

        # Sink: persist the embedding rows via the PostgreSQL JDBC driver.
        # NOTE(review): 'stringtype=unspecified' presumably lets the server
        # cast the stringified embedding to the column's type - confirm.
        embedding_rows | 'Write to jdbc' >> WriteToJdbc(
            driver_class_name='org.postgresql.Driver',
            table_name=args.alloydb_table,
            jdbc_url=jdbc_url,
            username=args.alloydb_username,
            password=args.alloydb_password,
            connection_properties='stringtype=unspecified',
        )