internal/avro_schemas.go (363 lines of code) (raw):

// Licensed to the Apache Software Foundation (ASF) under one // or more contributor license agreements. See the NOTICE file // distributed with this work for additional information // regarding copyright ownership. The ASF licenses this file // to you under the Apache License, Version 2.0 (the // "License"); you may not use this file except in compliance // with the License. You may obtain a copy of the License at // // http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, // software distributed under the License is distributed on an // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY // KIND, either express or implied. See the License for the // specific language governing permissions and limitations // under the License. package internal import ( "fmt" "github.com/hamba/avro/v2" ) func NullableSchema(schema avro.Schema) avro.Schema { return Must(avro.NewUnionSchema([]avro.Schema{ NullSchema, schema, })) } var requiredLength = [...]int{ 1, 1, 1, 2, 2, 3, 3, 4, 4, 4, 5, 5, 6, 6, 6, 7, 7, 8, 8, 9, 9, 9, 10, 10, 11, 11, 11, 12, 12, 13, 13, 13, 14, 14, 15, 15, 16, 16, 16, 17, } // DecimalRequiredBytes returns the required number of bytes to store a // decimal value of the given precision. If the precision is outside // the range (0, 40], this returns -1 as it is invalid. func DecimalRequiredBytes(precision int) int { if precision <= 0 || precision >= 40 { return -1 } return requiredLength[precision] } func DecimalSchema(precision, scale int) avro.Schema { return Must(avro.NewFixedSchema("fixed", "", DecimalRequiredBytes(precision), avro.NewDecimalLogicalSchema(precision, scale))) } var ( NullSchema = avro.NewNullSchema() BoolSchema = avro.NewPrimitiveSchema(avro.Boolean, nil) NullableBoolSchema = NullableSchema(BoolSchema) BinarySchema = avro.NewPrimitiveSchema(avro.Bytes, nil) NullableBinarySchema = NullableSchema(BinarySchema) StringSchema = avro.NewPrimitiveSchema(avro.String, nil) IntSchema = avro.NewPrimitiveSchema(avro.Int, nil) NullableIntSchema = NullableSchema(IntSchema) LongSchema = avro.NewPrimitiveSchema(avro.Long, nil) NullableLongSchema = NullableSchema(LongSchema) FloatSchema = avro.NewPrimitiveSchema(avro.Float, nil) DoubleSchema = avro.NewPrimitiveSchema(avro.Double, nil) DateSchema = avro.NewPrimitiveSchema(avro.Int, avro.NewPrimitiveLogicalSchema(avro.Date)) TimeSchema = avro.NewPrimitiveSchema(avro.Long, avro.NewPrimitiveLogicalSchema(avro.TimeMicros)) TimestampSchema = avro.NewPrimitiveSchema(avro.Long, avro.NewPrimitiveLogicalSchema(avro.TimestampMicros), avro.WithProps(map[string]any{"adjust-to-utc": false})) TimestampTzSchema = avro.NewPrimitiveSchema(avro.Long, avro.NewPrimitiveLogicalSchema(avro.TimestampMicros), avro.WithProps(map[string]any{"adjust-to-utc": true})) UUIDSchema = Must(avro.NewFixedSchema("uuid", "", 16, avro.NewPrimitiveLogicalSchema(avro.UUID))) AvroSchemaCache avro.SchemaCache ) func newMapSchema(name string, keySchema, valueSchema avro.Schema, keyFieldID, valueFieldID int) avro.Schema { return avro.NewArraySchema( Must(avro.NewRecordSchema(name, "", []*avro.Field{ Must(avro.NewField("key", keySchema, WithFieldID(keyFieldID))), Must(avro.NewField("value", valueSchema, WithFieldID(valueFieldID))), })), avro.WithProps(map[string]any{"logicalType": "map"})) } func WithFieldID(id int) avro.SchemaOption { return avro.WithProps(map[string]any{"field-id": id}) } func WithElementID(id int) avro.SchemaOption { return avro.WithProps(map[string]any{"element-id": id}) } func init() { AvroSchemaCache.Add("field_summary", Must(avro.NewRecordSchema("field_summary", "", []*avro.Field{ Must(avro.NewField("contains_null", BoolSchema, avro.WithDoc("true if the field contains null values"), WithFieldID(509))), Must(avro.NewField("contains_nan", NullableBoolSchema, avro.WithDoc("true if the field contains NaN values"), WithFieldID(518))), Must(avro.NewField("lower_bound", NullableBinarySchema, avro.WithDoc("serialized lower bound"), WithFieldID(510))), Must(avro.NewField("upper_bound", NullableBinarySchema, avro.WithDoc("serialized upper bound"), WithFieldID(511))), }, WithFieldID(508)))) AvroSchemaCache.Add("manifest_list_file_v1", Must(avro.NewRecordSchema("manifest_file", "", []*avro.Field{ Must(avro.NewField("manifest_path", StringSchema, avro.WithDoc("Location URI with FS scheme"), WithFieldID(500))), Must(avro.NewField("manifest_length", LongSchema, avro.WithDoc("Total file size in bytes"), WithFieldID(501))), Must(avro.NewField("partition_spec_id", IntSchema, avro.WithDoc("Spec ID used to write"), WithFieldID(502))), Must(avro.NewField("added_snapshot_id", LongSchema, avro.WithDoc("Snapshot ID that added the manifest"), WithFieldID(503))), Must(avro.NewField("added_files_count", NullableIntSchema, avro.WithDoc("Added entry count"), WithFieldID(504))), Must(avro.NewField("existing_files_count", NullableIntSchema, avro.WithDoc("Existing entry count"), WithFieldID(505))), Must(avro.NewField("deleted_files_count", NullableIntSchema, avro.WithDoc("Deleted entry count"), WithFieldID(506))), Must(avro.NewField("partitions", NullableSchema( avro.NewArraySchema(AvroSchemaCache.Get("field_summary"), WithElementID(508))), avro.WithDoc("Partition field summaries"), WithFieldID(507))), Must(avro.NewField("added_rows_count", NullableLongSchema, avro.WithDoc("Added row count"), WithFieldID(512))), Must(avro.NewField("existing_rows_count", NullableLongSchema, avro.WithDoc("Existing row count"), WithFieldID(513))), Must(avro.NewField("deleted_rows_count", NullableLongSchema, avro.WithDoc("Deleted row count"), WithFieldID(514))), Must(avro.NewField("key_metadata", NullableBinarySchema, avro.WithDoc("Key metadata"), WithFieldID(519))), }))) AvroSchemaCache.Add("manifest_list_file_v2", Must(avro.NewRecordSchema("manifest_file", "", []*avro.Field{ Must(avro.NewField("manifest_path", StringSchema, avro.WithDoc("Location URI with FS scheme"), WithFieldID(500))), Must(avro.NewField("manifest_length", LongSchema, avro.WithDoc("Total file size in bytes"), WithFieldID(501))), Must(avro.NewField("partition_spec_id", IntSchema, avro.WithDoc("Spec ID used to write"), WithFieldID(502))), Must(avro.NewField("content", IntSchema, avro.WithDoc("Content type"), avro.WithDefault(0), WithFieldID(517))), Must(avro.NewField("sequence_number", LongSchema, avro.WithDoc("Sequence number"), avro.WithDefault(int64(0)), WithFieldID(515))), Must(avro.NewField("min_sequence_number", LongSchema, avro.WithDoc("Minimum sequence number"), avro.WithDefault(int64(0)), WithFieldID(516))), Must(avro.NewField("added_snapshot_id", LongSchema, avro.WithDoc("Snapshot ID that added the manifest"), WithFieldID(503))), Must(avro.NewField("added_files_count", IntSchema, avro.WithDoc("Added entry count"), WithFieldID(504))), Must(avro.NewField("existing_files_count", IntSchema, avro.WithDoc("Existing entry count"), WithFieldID(505))), Must(avro.NewField("deleted_files_count", IntSchema, avro.WithDoc("Deleted entry count"), WithFieldID(506))), Must(avro.NewField("partitions", NullableSchema( avro.NewArraySchema(AvroSchemaCache.Get("field_summary"), WithElementID(508))), avro.WithDoc("Partition field summaries"), WithFieldID(507))), Must(avro.NewField("added_rows_count", LongSchema, avro.WithDoc("Added row count"), WithFieldID(512))), Must(avro.NewField("existing_rows_count", LongSchema, avro.WithDoc("Existing row count"), WithFieldID(513))), Must(avro.NewField("deleted_rows_count", LongSchema, avro.WithDoc("Deleted row count"), WithFieldID(514))), Must(avro.NewField("key_metadata", NullableBinarySchema, avro.WithDoc("Key metadata"), WithFieldID(519))), }))) AvroSchemaCache.Add("data_file_v1", Must(avro.NewRecordSchema("r2", "", []*avro.Field{ Must(avro.NewField("file_path", StringSchema, avro.WithDoc("Location URI with FS scheme"), WithFieldID(100))), Must(avro.NewField("file_format", StringSchema, avro.WithDoc("File format name: avro, orc, parquet"), WithFieldID(101))), // skip partition field, we'll add that dynamically as needed Must(avro.NewField("record_count", LongSchema, avro.WithDoc("Number of records in the file"), WithFieldID(103))), Must(avro.NewField("file_size_in_bytes", LongSchema, avro.WithDoc("Size of the file in bytes"), WithFieldID(104))), Must(avro.NewField("block_size_in_bytes", LongSchema, avro.WithDoc("Deprecated. Always write default in v1. Do not write in v2."), avro.WithDefault(int64(64*1024*1024)), WithFieldID(105))), Must(avro.NewField("column_sizes", NullableSchema(newMapSchema("k117_v118", IntSchema, LongSchema, 117, 118)), avro.WithDoc("map of column id to total size on disk"), WithFieldID(108))), Must(avro.NewField("value_counts", NullableSchema(newMapSchema("k119_v120", IntSchema, LongSchema, 119, 120)), avro.WithDoc("map of value to count"), WithFieldID(109))), Must(avro.NewField("null_value_counts", NullableSchema(newMapSchema("k121_v122", IntSchema, LongSchema, 121, 122)), avro.WithDoc("map of value to count"), WithFieldID(110))), Must(avro.NewField("nan_value_counts", NullableSchema(newMapSchema("k138_v139", IntSchema, LongSchema, 138, 139)), avro.WithDoc("map of value to count"), WithFieldID(137))), Must(avro.NewField("lower_bounds", NullableSchema(newMapSchema("k126_v127", IntSchema, BinarySchema, 126, 127)), avro.WithDoc("map of column id to lower bound"), WithFieldID(125))), Must(avro.NewField("upper_bounds", NullableSchema(newMapSchema("k129_v130", IntSchema, BinarySchema, 129, 130)), avro.WithDoc("map of column id to upper bound"), WithFieldID(128))), Must(avro.NewField("key_metadata", NullableBinarySchema, avro.WithDoc("Encryption Key Metadata Blob"), WithFieldID(131))), Must(avro.NewField("split_offsets", NullableSchema(avro.NewArraySchema(LongSchema, WithElementID(133))), avro.WithDoc("splitable offsets"), WithFieldID(132))), Must(avro.NewField("sort_order_id", NullableIntSchema, avro.WithDoc("Sort order ID"), WithFieldID(140))), }))) AvroSchemaCache.Add("data_file_v2", Must(avro.NewRecordSchema("r2", "", []*avro.Field{ Must(avro.NewField("content", IntSchema, avro.WithDoc("Content type"), avro.WithDefault(0), WithFieldID(134))), Must(avro.NewField("file_path", StringSchema, avro.WithDoc("Location URI with FS scheme"), WithFieldID(100))), Must(avro.NewField("file_format", StringSchema, avro.WithDoc("File format name: avro, orc, parquet"), WithFieldID(101))), // skip partition field, we'll add that dynamically as needed Must(avro.NewField("record_count", LongSchema, avro.WithDoc("Number of records in the file"), WithFieldID(103))), Must(avro.NewField("file_size_in_bytes", LongSchema, avro.WithDoc("Size of the file in bytes"), WithFieldID(104))), Must(avro.NewField("column_sizes", NullableSchema(newMapSchema("k117_v118", IntSchema, LongSchema, 117, 118)), avro.WithDoc("map of column id to total size on disk"), WithFieldID(108))), Must(avro.NewField("value_counts", NullableSchema(newMapSchema("k119_v120", IntSchema, LongSchema, 119, 120)), avro.WithDoc("map of value to count"), WithFieldID(109))), Must(avro.NewField("null_value_counts", NullableSchema(newMapSchema("k121_v122", IntSchema, LongSchema, 121, 122)), avro.WithDoc("map of value to count"), WithFieldID(110))), Must(avro.NewField("nan_value_counts", NullableSchema(newMapSchema("k138_v139", IntSchema, LongSchema, 138, 139)), avro.WithDoc("map of value to count"), WithFieldID(137))), Must(avro.NewField("lower_bounds", NullableSchema(newMapSchema("k126_v127", IntSchema, BinarySchema, 126, 127)), avro.WithDoc("map of column id to lower bound"), WithFieldID(125))), Must(avro.NewField("upper_bounds", NullableSchema(newMapSchema("k129_v130", IntSchema, BinarySchema, 129, 130)), avro.WithDoc("map of column id to upper bound"), WithFieldID(128))), Must(avro.NewField("key_metadata", NullableBinarySchema, avro.WithDoc("Encryption Key Metadata Blob"), WithFieldID(131))), Must(avro.NewField("split_offsets", NullableSchema(avro.NewArraySchema(LongSchema, WithElementID(133))), avro.WithDoc("splitable offsets"), WithFieldID(132))), Must(avro.NewField("equality_ids", NullableSchema(avro.NewArraySchema(LongSchema, WithElementID(136))), avro.WithDoc("field ids used to determine row equality in equality delete files"), WithFieldID(135))), Must(avro.NewField("sort_order_id", NullableIntSchema, avro.WithDoc("Sort order ID"), WithFieldID(140))), }))) AvroSchemaCache.Add("manifest_entry_v1", Must(avro.NewRecordSchema("manifest_entry", "", []*avro.Field{ Must(avro.NewField("status", IntSchema, WithFieldID(0))), Must(avro.NewField("snapshot_id", LongSchema, WithFieldID(1))), // leave data_file for dyanmic generation }))) AvroSchemaCache.Add("manifest_entry_v2", Must(avro.NewRecordSchema("manifest_entry", "", []*avro.Field{ Must(avro.NewField("status", IntSchema, WithFieldID(0))), Must(avro.NewField("snapshot_id", NullableLongSchema, WithFieldID(1))), Must(avro.NewField("sequence_number", NullableLongSchema, WithFieldID(3))), Must(avro.NewField("file_sequence_number", NullableLongSchema, WithFieldID(4))), // leave data_file for dynamic generation }))) } func newDataFileSchema(partitionType avro.Schema, version int) (avro.Schema, error) { key := fmt.Sprintf("data_file_v%d", version) schema := AvroSchemaCache.Get(key) partField, err := avro.NewField("partition", partitionType, WithFieldID(102)) if err != nil { return nil, err } return avro.NewRecordSchema("r2", "", append(schema.(*avro.RecordSchema).Fields(), partField)) } func NewManifestFileSchema(version int) (avro.Schema, error) { switch version { case 1, 2: default: return nil, fmt.Errorf("unsupported iceberg spec version: %d", version) } key := fmt.Sprintf("manifest_list_file_v%d", version) return AvroSchemaCache.Get(key), nil } func NewManifestEntrySchema(partitionType avro.Schema, version int) (avro.Schema, error) { switch version { case 1, 2: default: return nil, fmt.Errorf("unsupported iceberg spec version: %d", version) } dfschema, err := newDataFileSchema(partitionType, version) if err != nil { return nil, err } dfField, err := avro.NewField("data_file", dfschema, WithFieldID(2)) if err != nil { return nil, err } key := fmt.Sprintf("manifest_entry_v%d", version) schema := AvroSchemaCache.Get(key) return avro.NewRecordSchema("manifest_entry", "", append(schema.(*avro.RecordSchema).Fields(), dfField)) }