variant/regen.py
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
# This program uses Apache Spark to generate example binary Variant data
#
# Requirements
# pip install pyarrow
# pip install pyspark
#
# Last run with Spark 4.0 preview 2:
# https://spark.apache.org/news/spark-4.0.0-preview2.html
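#
# For each row of the generated table, this script writes the Variant's raw
# bytes to <name>.metadata and <name>.value files in the current directory,
# plus a data_dictionary.json file that maps each name to the JSON form of its
# Variant value.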
from pyspark.sql import SparkSession
import pyarrow.parquet as pq
import os
import json
# Initialize Spark session and create variant data via SQL
spark = SparkSession.builder \
    .appName("PySpark SQL Example") \
    .getOrCreate()
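# With no explicit warehouse configuration, Spark writes managed tables under
# ./spark-warehouse in the current working directory.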
# recursively clean up the spark-warehouse directory so data files left over
# from a previous run are not mixed into the new output
if os.path.exists('spark-warehouse'):
    for root, dirs, files in os.walk('spark-warehouse', topdown=False):
        for name in files:
            os.remove(os.path.join(root, name))
        for name in dirs:
            os.rmdir(os.path.join(root, name))
# Create a table with a variant column and insert values of various types into it
#
# This writes data files into spark-warehouse/output
sql = """
CREATE TABLE T (name VARCHAR(2000), variant_col VARIANT);
-------------------------------
-- Primitive type (basic_type=0)
-------------------------------
-- One row with a value from each type listed in
-- https://github.com/apache/parquet-format/blob/master/VariantEncoding.md#encoding-types
--
-- Spark Types: https://spark.apache.org/docs/latest/sql-ref-datatypes.html
-- Note: must use explicit typecasts as Spark returns an error for implicit casts
INSERT INTO T VALUES ('primitive_null', NULL);
INSERT INTO T VALUES ('primitive_boolean_true', true::Variant);
INSERT INTO T VALUES ('primitive_boolean_false', false::Variant);
INSERT INTO T VALUES ('primitive_int8', 42::Byte::Variant);
INSERT INTO T VALUES ('primitive_int16', 1234::Short::Variant);
INSERT INTO T VALUES ('primitive_int32', 123456::Integer::Variant);
INSERT INTO T VALUES ('primitive_int64', 12345678::Long::Variant);
INSERT INTO T VALUES ('primitive_double', 1234567890.1234::Double::Variant);
INSERT INTO T VALUES ('primitive_decimal4', 12.34::Decimal(8,2)::Variant);
INSERT INTO T VALUES ('primitive_decimal8', 12345678.90::Decimal(12,2)::Variant);
INSERT INTO T VALUES ('primitive_decimal16', 12345678912345678.90::Decimal(30,2)::Variant);
INSERT INTO T VALUES ('primitive_date', '2025-04-16'::Date::Variant);
INSERT INTO T VALUES ('primitive_timestamp', '2025-04-16T12:34:56.78'::Timestamp::Variant);
INSERT INTO T VALUES ('primitive_timestampntz', '2025-04-16T12:34:56.78'::Timestamp_NTZ::Variant);
INSERT INTO T VALUES ('primitive_float', 1234567890.1234::Float::Variant);
INSERT INTO T VALUES ('primitive_binary', X'31337deadbeefcafe'::Variant);
INSERT INTO T VALUES ('primitive_string', 'This string is longer than 64 bytes and therefore does not fit in a short_string and it also includes several non ascii characters such as 🐢, 💖, ♥️, 🎣 and 🤦!!'::Variant);
-- https://github.com/apache/parquet-testing/issues/79
-- It is not clear how to create the following types using Spark SQL:
-- TODO Time NTZ (Type ID 17)
-- TODO 'timestamp with time zone (NANOS)' (Type ID 18)
-- TODO 'timestamp without time zone (NANOS)' (Type ID 19)
-- TODO 'UUID' (Type ID 20)
-------------------------------
-- Short string (basic_type=1)
-------------------------------
INSERT INTO T VALUES ('short_string', 'Less than 64 bytes (❤️ with utf8)'::Variant);
-------------------------------
-- Object (basic_type=2)
-------------------------------
-- Use parse_json to create the Variant, as Spark does not seem to support casting structs --> Variant.
INSERT INTO T VALUES ('object_empty', parse_json('{}')::Variant);
INSERT INTO T VALUES ('object_primitive', parse_json('{"int_field" : 1, "double_field": 1.23456789, "boolean_true_field": true, "boolean_false_field": false, "string_field": "Apache Parquet", "null_field": null, "timestamp_field": "2025-04-16T12:34:56.78"}')::Variant);
INSERT INTO T VALUES ('object_nested', parse_json('{ "id" : 1, "species" : { "name": "lava monster", "population": 6789}, "observation" : { "time": "12:34:56", "location": "In the Volcano", "value" : { "temperature": 123, "humidity": 456 } } }')::Variant);
-- https://github.com/apache/parquet-testing/issues/77
-- TODO create example variant objects with fields that have non-JSON types (like timestamp, date, etc.)
-- Casting from "STRUCT<...>" to "VARIANT" is not yet supported:
-- INSERT INTO T VALUES ('object_primitive', struct(1234.56::Double as double_field, true as boolean_true_field, false as boolean_false_field, '2025-04-16T12:34:56.78'::Timestamp as timestamp_field, 'Apache Parquet' as string_field, null as null_field)::Variant);
-- TODO objects with more than 2**8 distinct fields (that require using more than one byte for field offset)
-- TODO objects with more than 2**16 distinct fields (that require using more than 2 bytes for field offset)
-- TODO objects with more than 2**24 distinct fields (that require using more than 3 bytes for field offset)
-------------------------------
-- Array (basic_type=3)
-------------------------------
INSERT INTO T VALUES ('array_empty', parse_json('[]')::Variant);
INSERT INTO T VALUES ('array_primitive', parse_json('[2, 1, 5, 9]')::Variant);
INSERT INTO T VALUES ('array_nested', parse_json('[ { "id": 1, "thing": { "names": ["Contrarian", "Spider"] } }, null, { "id": 2, "type": "if", "names": ["Apple", "Ray", null] } ]')::Variant);
-- https://github.com/apache/parquet-testing/issues/78
-- TODO arrays with more than 2**8 distinct elements (that require using more than one byte for count)
-- TODO arrays where the total length of all values is greater than 2**8, 2**16, and 2**24 bytes (that require using more than one byte for the offsets)
-------------------------------
-- Output the values to a new table that also has the JSON representation of the variant column
-------------------------------
DROP TABLE IF EXISTS output;
CREATE TABLE output AS SELECT name, variant_col, to_json(variant_col) as json_col FROM T;
"""
for statement in sql.split("\n"):
    statement = statement.strip()
    if not statement or statement.startswith("--"):
        continue
    print("Running SQL:", statement)
    spark.sql(statement)
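# The "CREATE TABLE output ... AS SELECT" statement above wrote its data files
# under spark-warehouse/output; read them back with pyarrow to extract the raw
# Variant bytes from each row.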
mypath = 'spark-warehouse/output'
parquet_files = [f for f in os.listdir(mypath) if f.endswith('.parquet')]
# extract the values from the parquet files
data_dictionary = {}
for f in parquet_files:
    table = pq.read_table(os.path.join(mypath, f))
    for row in range(len(table)):
        name = table[0][row].as_py()
        # variants are stored as StructArrays with two fields:
        # metadata and value
        variant_col = table[1][row]
        metadata = variant_col['metadata']
        value = variant_col['value']
        json_value = table[2][row].as_py()
        print("Writing metadata for", name)
        # write the metadata, value, and json representation to files
        with open(f"{name}.metadata", "wb") as out:
            buffer = metadata.as_buffer()
            if buffer is not None:
                out.write(buffer)
        with open(f"{name}.value", "wb") as out:
            buffer = value.as_buffer()
            if buffer is not None:
                out.write(buffer)
        # Add the JSON representation to the data dictionary
        if json_value is not None:
            data_dictionary[name] = json.loads(json_value)
        else:
            data_dictionary[name] = None
with open("data_dictionary.json", "w") as out:
    out.write(json.dumps(data_dictionary, sort_keys=True, indent=4))
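# data_dictionary.json maps each test case name to the JSON rendering of its
# Variant value (or null) and can serve as a human-readable index of the
# generated .metadata/.value files.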
# Note: It is possible to write the output to a single parquet file with a command
# such as:
# spark.sql("SELECT * FROM output").repartition(1).write.parquet('variant.parquet')
# At the time of writing, however, the resulting file does not carry the VARIANT
# logical type annotation.
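# A minimal sanity-check sketch (commented out, not run during regeneration);
# the variable names are illustrative. It assumes the files written above are
# in the current directory and confirms that every case listed in
# data_dictionary.json has matching .metadata and .value files:
#
# with open("data_dictionary.json") as check:
#     cases = json.load(check)
# for case in cases:
#     assert os.path.exists(f"{case}.metadata"), f"missing {case}.metadata"
#     assert os.path.exists(f"{case}.value"), f"missing {case}.value"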