in projects/dataflow-gcs-to-alloydb/src/dataflow_gcs_to_alloydb.py [0:0]
def run(argv=None, save_main_session=True):
"""Reads data from GCS and writes it to an AlloyDB database."""
parser = argparse.ArgumentParser()
parser.add_argument(
'--input_file_pattern',
required=True,
help=(
'File path or pattern of the input file(s). '
'Example: gs://bucket/path/*.csv'
),
)
parser.add_argument(
'--input_file_contains_headers',
type=int,
required=False, # Optional; only used when input_file_format=csv.
default=1,
choices=[0, 1],
help=(
'Whether the input CSV files contain a header record. '
'Use 1 for true and 0 for false. '
'Only used for CSV files. '
'Defaults to 1 (true).'
),
)
parser.add_argument(
'--input_file_format',
required=True,
help='Source file format. Supported: avro, csv.',
choices=['csv', 'avro'],
)
parser.add_argument(
'--input_schema',
dest='input_schema',
required=True,
help=(
'The input schema, expressed as dtype strings. '
'Each field has the format `field_name:dtype`, and fields are '
'separated with `;`. '
'For CSV input, the fields must follow the column order of the file. '
'Example: `name:string;phone:number`.'
),
)
parser.add_argument(
'--input_csv_file_delimiter',
dest='input_csv_file_delimiter',
# Optional; only used when input_file_format=csv.
default=',',
help=(
'The column delimiter for the input CSV file (e.g. ","). '
'Only used for CSV files.'
),
)
parser.add_argument(
'--alloydb_ip',
dest='alloydb_ip',
default='127.0.0.1',
help='IP of the AlloyDB instance (e.g. 10.3.125.7)',
)
parser.add_argument(
'--alloydb_port',
dest='alloydb_port',
default='5432',
help='Port of the AlloyDB instance (e.g. 5432)',
)
parser.add_argument(
'--alloydb_database',
dest='alloydb_database',
default='postgres',
help='Name of the AlloyDB database (e.g. postgres)',
)
parser.add_argument(
'--alloydb_user',
dest='alloydb_user',
default='postgres',
help='User for logging in to the Postgres/AlloyDB database',
)
parser.add_argument(
'--alloydb_password',
dest='alloydb_password',
help='Password for the Postgres/AlloyDB user',
)
parser.add_argument(
'--alloydb_table',
dest='alloydb_table',
required=True,
help='Name of the Postgres/AlloyDB table',
)
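# parse_known_args splits the flags defined above from any remaining
# arguments (e.g. --runner, --project, --temp_location), which are handed to
# Beam as pipeline options.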
static_args, pipeline_args = parser.parse_known_args(argv)
pipeline_opts = pipeline_options.PipelineOptions(pipeline_args)
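# Pickle the main session so module-level imports and globals are available
# to the DoFns executed on remote workers.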
pipeline_opts.view_as(pipeline_options.SetupOptions).save_main_session = (
save_main_session
)
_validate_pipeline_options(static_args)
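# Derive the column names and dtypes from --input_schema and build an empty
# "proxy" DataFrame that carries the schema for the conversion to Beam rows
# further down. For example (illustrative), the schema
# 'name:string;phone:number' yields columns ['name', 'phone'] plus a matching
# dtype mapping, as produced by the helpers defined in this module.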
df_columns = _convert_input_schema_to_columns(static_args.input_schema)
df_dtypes = _convert_input_schema_to_dtypes(static_args.input_schema)
df_proxy = pandas.DataFrame(columns=df_columns).astype(df_dtypes)
with beam.Pipeline(options=pipeline_opts) as p:
if static_args.input_file_format == 'csv':
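# CSV branch: read the files line by line, skipping the header when
# --input_file_contains_headers=1, and convert each delimited line into a
# DataFrame with the declared columns and dtypes (via the
# _row_list_to_dataframe helper defined in this module).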
df_rows = (
p
| 'Read CSV File'
>> beam.io.ReadFromText(
static_args.input_file_pattern,
skip_header_lines=static_args.input_file_contains_headers,
)
| 'Convert Row to DataFrame'
>> beam.Map(
_row_list_to_dataframe,
delimiter=static_args.input_csv_file_delimiter,
df_columns=df_columns,
df_dtypes=df_dtypes,
)
)
elif static_args.input_file_format == 'avro':
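# Avro branch: ReadFromAvro with as_rows=False yields one dict per record,
# which _row_dict_to_dataframe (defined in this module) turns into a
# DataFrame with the declared columns and dtypes.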
df_rows = (
p
| 'Read Avro File'
>> avroio.ReadFromAvro(
static_args.input_file_pattern,
as_rows=False,
)
| 'Convert Row to DataFrame'
>> beam.Map(
_row_dict_to_dataframe,
df_columns=df_columns,
df_dtypes=df_dtypes,
)
)
else:
raise ValueError('Input file format must be csv or avro.')
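# Convert each DataFrame into schema-aware beam.Row elements so the JDBC sink
# below can map fields to table columns; the empty proxy DataFrame supplies
# the element schema.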
data_rows = (
df_rows
| 'Convert DataFrame to row with BeamSchema'
>> beam.ParDo(
dataframe_convert.DataFrameToRowsFn(
proxy=df_proxy,
include_indexes=False,
)
)
)
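# Write the rows through Beam's JDBC connector. The JDBC URL is assembled
# from the host, port and database flags; 'stringtype=unspecified' lets
# PostgreSQL cast string parameters to the target column types instead of
# failing on non-text columns.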
_ = data_rows | 'Write to AlloyDB' >> jdbc.WriteToJdbc(
driver_class_name='org.postgresql.Driver',
table_name=static_args.alloydb_table,
jdbc_url=(
'jdbc:postgresql://'
f'{static_args.alloydb_ip}:'
f'{static_args.alloydb_port}/'
f'{static_args.alloydb_database}'
),
username=static_args.alloydb_user,
password=static_args.alloydb_password,
connection_properties='stringtype=unspecified',
)
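# Illustrative entry point, assuming the module is run directly (the real
# module may already define one):
#
# if __name__ == '__main__':
#     logging.getLogger().setLevel(logging.INFO)
#     run()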