def run()

in projects/dataflow-gcs-to-alloydb/src/dataflow_gcs_to_alloydb.py [0:0]


def _build_arg_parser():
    """Builds the parser for the pipeline's own (non-Beam) command-line flags.

    Returns:
        An `argparse.ArgumentParser` configured with the input-file flags and
        the AlloyDB connection flags consumed by `run`.
    """
    parser = argparse.ArgumentParser()

    # --- Input file flags ---
    parser.add_argument(
        '--input_file_pattern',
        required=True,
        help=(
            'File path or pattern to the file(s). '
            'Example: e.g. gs://bucket/path/*.csv'
        ),
    )
    parser.add_argument(
        '--input_file_contains_headers',
        type=int,
        required=False,  # Optional, used when using input_file_format=csv.
        default=1,
        choices=[0, 1],
        help=(
            'Whether the input CSV files contain a header record. '
            'Use 1 for true and 0 for false. '
            'Only used for CSV files. '
            'Defaults to 1 (true).'
        ),
    )
    parser.add_argument(
        '--input_file_format',
        required=True,
        help=('Source file format. Supported: avro, csv.'),
        choices=['csv', 'avro'],
    )
    parser.add_argument(
        '--input_schema',
        dest='input_schema',
        required=True,
        # NOTE: there must be no trailing comma after the last literal below.
        # A trailing comma turns `help` into a tuple, which makes argparse
        # fail when it formats the help text.
        help=(
            'The input schema using dtype strings. '
            'The format for each field is `field_name:dtype`. '
            'The fields must follow the order of the file when in csv format. '
            'Each field must be separated with `;`. '
            'Example: `name:string;phone:number`.'
        ),
    )
    parser.add_argument(
        '--input_csv_file_delimiter',
        dest='input_csv_file_delimiter',
        # Optional, used when using input_file_format=csv.
        default=',',
        help=(
            'The column delimiter for the input CSV file (e.g. ","). '
            'Only used for CSV files.'
        ),
    )

    # --- AlloyDB connection flags ---
    parser.add_argument(
        '--alloydb_ip',
        dest='alloydb_ip',
        default='127.0.0.1',
        help='IP of the AlloyDB instance (e.g. 10.3.125.7)',
    )
    parser.add_argument(
        '--alloydb_port',
        dest='alloydb_port',
        default='5432',
        help='Port of the AlloyDB instance (e.g. 5432)',
    )
    parser.add_argument(
        '--alloydb_database',
        dest='alloydb_database',
        default='postgres',
        help='Name of the AlloyDB database (e.g. postgres)',
    )
    parser.add_argument(
        '--alloydb_user',
        dest='alloydb_user',
        default='postgres',
        help='User to login to Postgres/AlloyDB database',
    )
    parser.add_argument(
        '--alloydb_password',
        dest='alloydb_password',
        help='Password of the Postgres/AlloyDB user to login',
    )
    parser.add_argument(
        '--alloydb_table',
        dest='alloydb_table',
        required=True,
        help='Name of the Postgres/AlloyDB table',
    )
    return parser


def run(argv=None, save_main_session=True):
    """Reads data from GCS and writes it to an AlloyDB database.

    The flags declared in `_build_arg_parser` are consumed here; every other
    argument is forwarded untouched to Beam's `PipelineOptions`.

    Args:
        argv: Command-line arguments to parse. When None, argparse falls back
            to `sys.argv[1:]`.
        save_main_session: Value assigned to the `save_main_session` setup
            option of the pipeline.

    Raises:
        SystemExit: Raised by argparse on `--help` or on invalid/missing flags.
        ValueError: If the input file format is neither `csv` nor `avro`
            (argparse `choices` normally rejects this earlier).
    """
    parser = _build_arg_parser()
    # Unknown flags are not an error: they are handed to Beam below.
    static_args, pipeline_args = parser.parse_known_args(argv)

    pipeline_opts = pipeline_options.PipelineOptions(pipeline_args)
    pipeline_opts.view_as(pipeline_options.SetupOptions).save_main_session = (
        save_main_session
    )

    _validate_pipeline_options(static_args)

    # The same schema string yields both the column names and the dtypes; the
    # empty, typed DataFrame acts as the proxy for the DataFrame -> Row step.
    df_columns = _convert_input_schema_to_columns(static_args.input_schema)
    df_dtypes = _convert_input_schema_to_dtypes(static_args.input_schema)
    df_proxy = pandas.DataFrame(columns=df_columns).astype(df_dtypes)

    with beam.Pipeline(options=pipeline_opts) as p:
        if static_args.input_file_format == 'csv':
            df_rows = (
                p
                | 'Read CSV File'
                >> beam.io.ReadFromText(
                    static_args.input_file_pattern,
                    # The 0/1 flag doubles as the number of lines to skip.
                    skip_header_lines=(static_args.input_file_contains_headers),
                )
                | 'Convert Row to DataFrame'
                >> beam.Map(
                    _row_list_to_dataframe,
                    delimiter=static_args.input_csv_file_delimiter,
                    df_columns=df_columns,
                    df_dtypes=df_dtypes,
                )
            )
        elif static_args.input_file_format == 'avro':
            df_rows = (
                p
                | 'Read Avro File'
                >> avroio.ReadFromAvro(
                    static_args.input_file_pattern,
                    as_rows=False,
                )
                | 'Convert Row to DataFrame'
                >> beam.Map(
                    _row_dict_to_dataframe,
                    df_columns=df_columns,
                    df_dtypes=df_dtypes,
                )
            )
        else:
            # Defensive: unreachable while argparse `choices` is in place.
            raise ValueError('File input format must be csv or avro.')

        data_rows = (
            df_rows
            | 'Convert DataFrame to row with BeamSchema'
            >> beam.ParDo(
                dataframe_convert.DataFrameToRowsFn(
                    proxy=df_proxy,
                    include_indexes=False,
                )
            )
        )

        _ = data_rows | 'Write to AlloyDB' >> jdbc.WriteToJdbc(
            driver_class_name='org.postgresql.Driver',
            table_name=static_args.alloydb_table,
            jdbc_url=(
                'jdbc:postgresql://'
                f'{static_args.alloydb_ip}:'
                f'{static_args.alloydb_port}/'
                f'{static_args.alloydb_database}'
            ),
            username=static_args.alloydb_user,
            password=static_args.alloydb_password,
            # NOTE(review): presumably set so the server infers the column
            # type of string parameters - confirm against the pgJDBC docs.
            connection_properties='stringtype=unspecified',
        )