in python/dataproc_templates/snowflake/snowflake_to_gcs.py [0:0]
def parse_args(args: Optional[Sequence[str]] = None) -> Dict[str, Any]:
    """Parse and validate the Snowflake-to-Cloud-Storage template arguments.

    Builds an ``argparse`` parser for the Snowflake connection settings, the
    data source (either a database/schema/table triple or a query) and the
    Cloud Storage output settings, then validates the source selection.

    Args:
        args: Argument strings to parse. ``None`` is handed straight to
            ``parse_known_args``, which then reads ``sys.argv``.

    Returns:
        The parsed namespace converted to a dict with ``vars()``, keyed by
        each argument's ``dest`` (the ``constants.SNOWFLAKE_TO_GCS_*``
        values). Unrecognised arguments are ignored, not rejected.

    Raises:
        SystemExit: If neither a complete database/schema/table triple nor a
            query is supplied, or if the complete triple and a query are
            supplied together. ``argparse`` itself also exits when a required
            argument is missing or a value is not among the allowed choices.
    """
    parser: argparse.ArgumentParser = argparse.ArgumentParser()

    # --- Snowflake connection settings ---
    parser.add_argument(
        f'--{constants.SNOWFLAKE_TO_GCS_SF_URL}',
        dest=constants.SNOWFLAKE_TO_GCS_SF_URL,
        required=True,
        help='Snowflake connection URL'
    )
    parser.add_argument(
        f'--{constants.SNOWFLAKE_TO_GCS_SF_USER}',
        dest=constants.SNOWFLAKE_TO_GCS_SF_USER,
        required=True,
        help='Snowflake user name'
    )
    parser.add_argument(
        f'--{constants.SNOWFLAKE_TO_GCS_SF_PASSWORD}',
        dest=constants.SNOWFLAKE_TO_GCS_SF_PASSWORD,
        required=True,
        help='Snowflake password'
    )
    parser.add_argument(
        f'--{constants.SNOWFLAKE_TO_GCS_SF_DATABASE}',
        dest=constants.SNOWFLAKE_TO_GCS_SF_DATABASE,
        required=False,
        default="",
        help='Snowflake database name'
    )
    parser.add_argument(
        f'--{constants.SNOWFLAKE_TO_GCS_SF_WAREHOUSE}',
        dest=constants.SNOWFLAKE_TO_GCS_SF_WAREHOUSE,
        required=False,
        default="",
        help='Snowflake datawarehouse name'
    )
    parser.add_argument(
        f'--{constants.SNOWFLAKE_TO_GCS_SF_AUTOPUSHDOWN}',
        dest=constants.SNOWFLAKE_TO_GCS_SF_AUTOPUSHDOWN,
        required=False,
        default="on",
        help='Snowflake Autopushdown (on|off)'
    )

    # --- Data source: schema/table or a free-form query ---
    parser.add_argument(
        f'--{constants.SNOWFLAKE_TO_GCS_SF_SCHEMA}',
        dest=constants.SNOWFLAKE_TO_GCS_SF_SCHEMA,
        required=False,
        default="",
        help='Snowflake Schema, the source table belongs to'
    )
    parser.add_argument(
        f'--{constants.SNOWFLAKE_TO_GCS_SF_TABLE}',
        dest=constants.SNOWFLAKE_TO_GCS_SF_TABLE,
        required=False,
        default="",
        help='Snowflake table name'
    )
    parser.add_argument(
        f'--{constants.SNOWFLAKE_TO_GCS_SF_QUERY}',
        dest=constants.SNOWFLAKE_TO_GCS_SF_QUERY,
        required=False,
        default="",
        help=(
            'Query to be executed on Snowflake to fetch '
            'the desired dataset for migration'
        )
    )

    # --- Cloud Storage output settings ---
    parser.add_argument(
        f'--{constants.SNOWFLAKE_TO_GCS_OUTPUT_LOCATION}',
        dest=constants.SNOWFLAKE_TO_GCS_OUTPUT_LOCATION,
        required=True,
        help='Cloud Storage output location where the migrated data will be placed'
    )
    parser.add_argument(
        f'--{constants.SNOWFLAKE_TO_GCS_OUTPUT_MODE}',
        dest=constants.SNOWFLAKE_TO_GCS_OUTPUT_MODE,
        required=False,
        default=constants.OUTPUT_MODE_APPEND,
        help=(
            'Output write mode '
            '(one of: append,overwrite,ignore,errorifexists) '
            '(Defaults to append)'
        ),
        choices=[
            constants.OUTPUT_MODE_OVERWRITE,
            constants.OUTPUT_MODE_APPEND,
            constants.OUTPUT_MODE_IGNORE,
            constants.OUTPUT_MODE_ERRORIFEXISTS
        ]
    )
    parser.add_argument(
        f'--{constants.SNOWFLAKE_TO_GCS_OUTPUT_FORMAT}',
        dest=constants.SNOWFLAKE_TO_GCS_OUTPUT_FORMAT,
        required=False,
        default=constants.FORMAT_CSV,
        help=(
            'Output write format '
            '(one of: avro,parquet,csv,json)'
            '(Defaults to csv)'
        ),
        choices=[
            constants.FORMAT_AVRO,
            constants.FORMAT_PRQT,
            constants.FORMAT_CSV,
            constants.FORMAT_JSON
        ]
    )
    parser.add_argument(
        f'--{constants.SNOWFLAKE_TO_GCS_PARTITION_COLUMN}',
        dest=constants.SNOWFLAKE_TO_GCS_PARTITION_COLUMN,
        required=False,
        default="",
        help='Column name to partition data by, in Cloud Storage bucket'
    )

    # Register whatever options get_csv_output_spark_options returns for the
    # "snowflake.gcs.output." prefix; both helpers are defined outside this
    # function.
    add_spark_options(
        parser,
        constants.get_csv_output_spark_options("snowflake.gcs.output.")
    )

    known_args: argparse.Namespace
    # parse_known_args tolerates arguments this parser does not declare.
    known_args, _ = parser.parse_known_args(args)

    sf_database = getattr(known_args, constants.SNOWFLAKE_TO_GCS_SF_DATABASE)
    sf_schema = getattr(known_args, constants.SNOWFLAKE_TO_GCS_SF_SCHEMA)
    sf_table = getattr(known_args, constants.SNOWFLAKE_TO_GCS_SF_TABLE)
    sf_query = getattr(known_args, constants.SNOWFLAKE_TO_GCS_SF_QUERY)

    # A table source is only usable when database, schema and table are all
    # given; empty strings (the defaults) count as "not provided".
    has_full_table_ref = bool(sf_database and sf_schema and sf_table)

    if not has_full_table_ref and not sf_query:
        sys.exit(
            "ArgumentParser Error: Either of snowflake.to.gcs.sf.database, snowflake.to.gcs.sf.schema and snowflake.to.gcs.sf.table "
            "OR snowflake.to.gcs.sf.query needs to be provided as argument to read data from Snowflake"
        )

    # Only the complete triple conflicts with a query, as the message below
    # states ("All three ... AND ... query"). A partial set next to a query is
    # accepted instead of being rejected with a misleading error.
    if has_full_table_ref and sf_query:
        sys.exit(
            "ArgumentParser Error: All three snowflake.to.gcs.sf.database, snowflake.to.gcs.sf.schema and snowflake.to.gcs.sf.table "
            "AND snowflake.to.gcs.sf.query cannot be provided as arguments at the same time."
        )

    return vars(known_args)