in script/legacy/export_to_parquet.py [0:0]
def main():
    """Read a table from BigQuery and write it as parquet."""
    args = parser.parse_args()
    # handle --submission-date
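    # e.g. (illustrative value) --submission-date 2020-01-01 is pushed down as a
    # BigQuery read filter, recorded as a static partition, and ANDed into --where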
    if args.submission_date is not None:
        # --filter "submission_date = DATE 'SUBMISSION_DATE'"
        condition = "submission_date = DATE '" + args.submission_date + "'"
        args.filter.append(condition)
        # --static-partitions submission_date=SUBMISSION_DATE
        args.static_partitions.append("submission_date=" + args.submission_date)
        # --where "submission_date IS NOT NULL"
        if args.where == "TRUE":
            args.where = condition
        else:
            args.where = "(" + args.where + ") AND " + condition
    # Set default --destination-table if it was not provided
    if args.destination_table is None:
        args.destination_table = args.table
    # append table and --static-partitions to destination
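    # illustrative example (values not from the source): destination s3://bucket/prefix
    # with destination table telemetry.main_v4 and submission_date=2020-01-01 yields
    # s3a://bucket/prefix/main/v4/submission_date=2020-01-01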
    args.destination = "/".join(
        [
            re.sub("^s3://", "s3a://", args.destination).rstrip("/"),
            re.sub(
                "_(v[0-9]+)$", r"/\1", args.destination_table.rsplit(".", 1).pop()
            ),
        ]
        + args.static_partitions
    )
    # convert --static-partitions to a dict
    args.static_partitions = dict(p.split("=", 1) for p in args.static_partitions)
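    # e.g. ["submission_date=2020-01-01"] -> {"submission_date": "2020-01-01"}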
    # remove --static-partitions fields from --partition-by
    args.partition_by = [
        f for f in args.partition_by if f not in args.static_partitions
    ]
    # add --static-partitions fields to --drop
    args.drop += args.static_partitions.keys()
    # convert --filter to a single string
    args.filter = " AND ".join(args.filter)
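    # e.g. (illustrative) ["document_type = 'main'", "submission_date = DATE '2020-01-01'"]
    # becomes "document_type = 'main' AND submission_date = DATE '2020-01-01'"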
    if args.maps_from_entries or args.bigint_columns is not None:
        if "." in args.table:
            table_ref = args.table.replace(":", ".")
        else:
            table_ref = f"{args.dataset}.{args.table}"
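        # transform_schema returns additional "EXPR AS column" strings, applied later
        # via withColumn alongside any user-supplied --replace expressions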
        args.replace += transform_schema(
            table_ref, args.maps_from_entries, args.bigint_columns
        )
    if args.dry_run:
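        # print an equivalent PySpark snippet instead of executing the job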
        replace = f"{args.replace!r}"
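        # long --replace lists are re-rendered one expression per line, indented with
        # 20 and 16 spaces so they align after dedent() in the template printed below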
        if len(replace) > 60:
            replace = (
                "["
                + ",".join(f"\n{' '*4*5}{expr!r}" for expr in args.replace)
                + f"\n{' '*4*4}]"
            )
print("spark = SparkSession.builder.appName('export_to_parquet').getOrCreate()")
print("")
print(
"spark.conf.set('spark.sql.sources.partitionOverwriteMode', "
f"{args.partition_overwrite_mode!r})"
)
print("")
if args.avro_path is not None:
print(f"df = spark.read.format('avro').load({args.avro_path!r})")
else:
print(
dedent(
f"""
df = (
spark.read.format('bigquery')
.option('dataset', {args.dataset!r})
.option('table', {args.table!r})
.option('filter', {args.filter!r})
.option("parallelism", 0) # let BigQuery storage API decide
.load()
)
"""
).strip()
)
print("")
print(
dedent(
f"""
df = df.where({args.where!r}).selectExpr(*{args.select!r}).drop(*{args.drop!r})
for sql in {replace}:
value, name = re.fullmatch("(?i)(.*) AS (.*)", sql).groups()
df = df.withColumn(name, expr(value))
(
df.write.mode({args.write_mode!r})
.partitionBy(*{args.partition_by!r})
.parquet({args.destination!r})
)
""" # noqa:E501
).strip()
)
    else:
        # delay import to allow --dry-run without spark
        from pyspark.sql import SparkSession
        from pyspark.sql.functions import expr

        if bigquery is None:
            raise bigquery_error
        spark = SparkSession.builder.appName("export_to_parquet").getOrCreate()
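        # "dynamic" overwrites only the partitions present in the written data;
        # "static" (the Spark default) clears the whole destination on overwrite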
        spark.conf.set(
            "spark.sql.sources.partitionOverwriteMode", args.partition_overwrite_mode
        )
        # run spark job from parsed args
        if args.avro_path is not None:
            df = spark.read.format("avro").load(args.avro_path)
        else:
            df = (
                spark.read.format("bigquery")
                .option("dataset", args.dataset)
                .option("table", args.table)
                .option("filter", args.filter)
                .option("parallelism", 0)  # let BigQuery storage API decide
                .load()
            )
        df = df.where(args.where).selectExpr(*args.select).drop(*args.drop)
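        # each --replace entry is an "EXPR AS name" string, e.g. (illustrative)
        # "CAST(sample_id AS STRING) AS sample_id" rewrites that column via expr()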
        for sql in args.replace:
            value, name = re.fullmatch("(?i)(.*) AS (.*)", sql).groups()
            df = df.withColumn(name, expr(value))
        (
            df.write.mode(args.write_mode)
            .partitionBy(*args.partition_by)
            .parquet(args.destination)
        )