def main()

in script/legacy/export_to_parquet.py [0:0]


def main():
    """Read a table from BigQuery and write it as parquet."""
    args = parser.parse_args()

    # handle --submission-date
    if args.submission_date is not None:
        # --filter "submission_date = DATE 'SUBMISSION_DATE'"
        condition = "submission_date = DATE '" + args.submission_date + "'"
        args.filter.append(condition)
        # --static-partitions submission_date=SUBMISSION_DATE
        args.static_partitions.append("submission_date=" + args.submission_date)
        # --where "submission_date IS NOT NULL"
        if args.where == "TRUE":
            args.where = condition
        else:
            args.where = "(" + args.where + ") AND " + condition

    # Set default --destination-table if it was not provided
    if args.destination_table is None:
        args.destination_table = args.table

    # append table and --static-partitions to destination
    args.destination = "/".join(
        [
            re.sub("^s3://", "s3a://", args.destination).rstrip("/"),
            re.sub("_(v[0-9]+)$", r"/\1", args.destination_table.rsplit(".", 1).pop()),
        ]
        + args.static_partitions
    )

    # convert --static-partitions to a dict
    args.static_partitions = dict(p.split("=", 1) for p in args.static_partitions)

    # remove --static-partitions fields from --partition-by
    args.partition_by = [
        f for f in args.partition_by if f not in args.static_partitions
    ]

    # add --static-partitions fields to --drop
    args.drop += args.static_partitions.keys()

    # convert --filter to a single string
    args.filter = " AND ".join(args.filter)

    if args.maps_from_entries or args.bigint_columns is not None:
        if "." in args.table:
            table_ref = args.table.replace(":", ".")
        else:
            table_ref = f"{args.dataset}.{args.table}"
        args.replace += transform_schema(
            table_ref, args.maps_from_entries, args.bigint_columns
        )

    if args.dry_run:
        replace = f"{args.replace!r}"
        if len(replace) > 60:
            replace = (
                "["
                + ",".join(f"\n{' '*4*5}{expr!r}" for expr in args.replace)
                + f"\n{' '*4*4}]"
            )
        print("spark = SparkSession.builder.appName('export_to_parquet').getOrCreate()")
        print("")
        print(
            "spark.conf.set('spark.sql.sources.partitionOverwriteMode', "
            f"{args.partition_overwrite_mode!r})"
        )
        print("")
        if args.avro_path is not None:
            print(f"df = spark.read.format('avro').load({args.avro_path!r})")
        else:
            print(
                dedent(
                    f"""
                    df = (
                        spark.read.format('bigquery')
                        .option('dataset', {args.dataset!r})
                        .option('table', {args.table!r})
                        .option('filter', {args.filter!r})
                        .option("parallelism", 0)  # let BigQuery storage API decide
                        .load()
                    )
                    """
                ).strip()
            )
        print("")
        print(
            dedent(
                f"""
                df = df.where({args.where!r}).selectExpr(*{args.select!r}).drop(*{args.drop!r})

                for sql in {replace}:
                    value, name = re.fullmatch("(?i)(.*) AS (.*)", sql).groups()
                    df = df.withColumn(name, expr(value))

                (
                    df.write.mode({args.write_mode!r})
                    .partitionBy(*{args.partition_by!r})
                    .parquet({args.destination!r})
                )
                """  # noqa:E501
            ).strip()
        )
    else:
        # delay import to allow --dry-run without spark
        from pyspark.sql import SparkSession
        from pyspark.sql.functions import expr

        if bigquery is None:
            raise bigquery_error

        spark = SparkSession.builder.appName("export_to_parquet").getOrCreate()

        spark.conf.set(
            "spark.sql.sources.partitionOverwriteMode", args.partition_overwrite_mode
        )

        # run spark job from parsed args
        if args.avro_path is not None:
            df = spark.read.format("avro").load(args.avro_path)
        else:
            df = (
                spark.read.format("bigquery")
                .option("dataset", args.dataset)
                .option("table", args.table)
                .option("filter", args.filter)
                .option("parallelism", 0)  # let BigQuery storage API decide
                .load()
            )

        df = df.where(args.where).selectExpr(*args.select).drop(*args.drop)

        for sql in args.replace:
            value, name = re.fullmatch("(?i)(.*) AS (.*)", sql).groups()
            df = df.withColumn(name, expr(value))

        (
            df.write.mode(args.write_mode)
            .partitionBy(*args.partition_by)
            .parquet(args.destination)
        )