def parse_args()

in python/dataproc_templates/snowflake/snowflake_to_gcs.py [0:0]


    def parse_args(args: Optional[Sequence[str]] = None) -> Dict[str, Any]:
        parser: argparse.ArgumentParser = argparse.ArgumentParser()

        parser.add_argument(
            f'--{constants.SNOWFLAKE_TO_GCS_SF_URL}',
            dest=constants.SNOWFLAKE_TO_GCS_SF_URL,
            required=True,
            help='Snowflake connection URL'
        )

        parser.add_argument(
            f'--{constants.SNOWFLAKE_TO_GCS_SF_USER}',
            dest=constants.SNOWFLAKE_TO_GCS_SF_USER,
            required=True,
            help='Snowflake user name'
        )

        parser.add_argument(
            f'--{constants.SNOWFLAKE_TO_GCS_SF_PASSWORD}',
            dest=constants.SNOWFLAKE_TO_GCS_SF_PASSWORD,
            required=True,
            help='Snowflake password'
        )

        parser.add_argument(
            f'--{constants.SNOWFLAKE_TO_GCS_SF_DATABASE}',
            dest=constants.SNOWFLAKE_TO_GCS_SF_DATABASE,
            required=False,
            default="",
            help='Snowflake database name'
        )

        parser.add_argument(
            f'--{constants.SNOWFLAKE_TO_GCS_SF_WAREHOUSE}',
            dest=constants.SNOWFLAKE_TO_GCS_SF_WAREHOUSE,
            required=False,
            default="",
            help='Snowflake datawarehouse name'
        )

        parser.add_argument(
            f'--{constants.SNOWFLAKE_TO_GCS_SF_AUTOPUSHDOWN}',
            dest=constants.SNOWFLAKE_TO_GCS_SF_AUTOPUSHDOWN,
            required=False,
            default="on",
            help='Snowflake Autopushdown (on|off)'
        )

        parser.add_argument(
            f'--{constants.SNOWFLAKE_TO_GCS_SF_SCHEMA}',
            dest=constants.SNOWFLAKE_TO_GCS_SF_SCHEMA,
            required=False,
            default="",
            help='Snowflake Schema, the source table belongs to'
        )

        parser.add_argument(
            f'--{constants.SNOWFLAKE_TO_GCS_SF_TABLE}',
            dest=constants.SNOWFLAKE_TO_GCS_SF_TABLE,
            required=False,
            default="",
            help='Snowflake table name'
        )

        parser.add_argument(
            f'--{constants.SNOWFLAKE_TO_GCS_SF_QUERY}',
            dest=constants.SNOWFLAKE_TO_GCS_SF_QUERY,
            required=False,
            default="",
            help='Query to be executed on Snowflake to fetch \
                the desired dataset for migration'
        )

        parser.add_argument(
            f'--{constants.SNOWFLAKE_TO_GCS_OUTPUT_LOCATION}',
            dest=constants.SNOWFLAKE_TO_GCS_OUTPUT_LOCATION,
            required=True,
            help='Cloud Storage output location where the migrated data will be placed'
        )

        parser.add_argument(
            f'--{constants.SNOWFLAKE_TO_GCS_OUTPUT_MODE}',
            dest=constants.SNOWFLAKE_TO_GCS_OUTPUT_MODE,
            required=False,
            default=constants.OUTPUT_MODE_APPEND,
            help=(
                'Output write mode '
                '(one of: append,overwrite,ignore,errorifexists) '
                '(Defaults to append)'
            ),
            choices=[
                constants.OUTPUT_MODE_OVERWRITE,
                constants.OUTPUT_MODE_APPEND,
                constants.OUTPUT_MODE_IGNORE,
                constants.OUTPUT_MODE_ERRORIFEXISTS
            ]
        )

        parser.add_argument(
            f'--{constants.SNOWFLAKE_TO_GCS_OUTPUT_FORMAT}',
            dest=constants.SNOWFLAKE_TO_GCS_OUTPUT_FORMAT,
            required=False,
            default=constants.FORMAT_CSV,
            help=(
                'Output write format '
                '(one of: avro,parquet,csv,json)'
                '(Defaults to csv)'
            ),
            choices=[
                constants.FORMAT_AVRO,
                constants.FORMAT_PRQT,
                constants.FORMAT_CSV,
                constants.FORMAT_JSON
            ]
        )

        parser.add_argument(
            f'--{constants.SNOWFLAKE_TO_GCS_PARTITION_COLUMN}',
            dest=constants.SNOWFLAKE_TO_GCS_PARTITION_COLUMN,
            required=False,
            default="",
            help='Column name to partition data by, in Cloud Storage bucket'
        )
        add_spark_options(parser, constants.get_csv_output_spark_options("snowflake.gcs.output."))

        known_args: argparse.Namespace
        known_args, _ = parser.parse_known_args(args)

        if ((not getattr(known_args, constants.SNOWFLAKE_TO_GCS_SF_DATABASE)
                or not getattr(known_args, constants.SNOWFLAKE_TO_GCS_SF_SCHEMA)
                or not getattr(known_args, constants.SNOWFLAKE_TO_GCS_SF_TABLE))
            and not getattr(known_args, constants.SNOWFLAKE_TO_GCS_SF_QUERY)):

            sys.exit("ArgumentParser Error: Either of snowflake.to.gcs.sf.database, snowflake.to.gcs.sf.schema and snowflake.to.gcs.sf.table "
                        + "OR snowflake.to.gcs.sf.query needs to be provided as argument to read data from Snowflake")

        elif ((getattr(known_args, constants.SNOWFLAKE_TO_GCS_SF_DATABASE)
                or getattr(known_args, constants.SNOWFLAKE_TO_GCS_SF_SCHEMA)
                or getattr(known_args, constants.SNOWFLAKE_TO_GCS_SF_TABLE))
            and getattr(known_args, constants.SNOWFLAKE_TO_GCS_SF_QUERY)):

            sys.exit("ArgumentParser Error: All three snowflake.to.gcs.sf.database, snowflake.to.gcs.sf.schema and snowflake.to.gcs.sf.table "
                        + "AND snowflake.to.gcs.sf.query cannot be provided as arguments at the same time.")

        return vars(known_args)