table/requirements.go (255 lines of code) (raw):

// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package table

import (
	"encoding/json"
	"errors"
	"fmt"
	"io"

	"github.com/google/uuid"
)

// Requirement type identifiers. Each value is stored in the "type" field of
// the matching requirement and is the value ParseRequirementBytes switches on
// to decide which requirement to build.
const (
	reqAssertCreate                  = "assert-create"
	reqAssertTableUUID               = "assert-table-uuid"
	reqAssertRefSnapshotID           = "assert-ref-snapshot-id"
	reqAssertDefaultSpecID           = "assert-default-spec-id"
	reqAssertCurrentSchemaID         = "assert-current-schema-id"
	reqAssertDefaultSortOrderID      = "assert-default-sort-order-id"
	reqAssertLastAssignedFieldID     = "assert-last-assigned-field-id"
	reqAssertLastAssignedPartitionID = "assert-last-assigned-partition-id"
)

// ErrInvalidRequirement is returned by ParseRequirementBytes when the "type"
// field of the input does not match any known requirement type.
var ErrInvalidRequirement = errors.New("invalid requirement")

// A Requirement is a validation rule that must be satisfied before attempting to
// make and commit changes to a table. Requirements are used to ensure that the
// table is in a valid state before making changes.
type Requirement interface {
	// Validate checks that the current table metadata satisfies the requirement.
	Validate(Metadata) error
	// GetType returns the string that identifies the kind of requirement.
	GetType() string
}

// baseRequirement is a common struct that all requirements embed. It is used to
// identify the type of the requirement.
type baseRequirement struct { Type string `json:"type"` } func (b baseRequirement) GetType() string { return b.Type } type assertCreate struct { baseRequirement } // AssertCreate creates a requirement that the table does not already exist. func AssertCreate() Requirement { return &assertCreate{ baseRequirement: baseRequirement{Type: reqAssertCreate}, } } func (a *assertCreate) Validate(meta Metadata) error { if meta != nil { return errors.New("Table already exists") } return nil } type assertTableUuid struct { baseRequirement UUID uuid.UUID `json:"uuid"` } // AssertTableUUID creates a requirement that the table UUID matches the given UUID. func AssertTableUUID(uuid uuid.UUID) Requirement { return &assertTableUuid{ baseRequirement: baseRequirement{Type: reqAssertTableUUID}, UUID: uuid, } } func (a *assertTableUuid) Validate(meta Metadata) error { if meta == nil { return errors.New("requirement failed: current table metadata does not exist") } if meta.TableUUID() != a.UUID { return fmt.Errorf("UUID mismatch: %s != %s", meta.TableUUID(), a.UUID) } return nil } type assertRefSnapshotID struct { baseRequirement Ref string `json:"ref"` SnapshotID *int64 `json:"snapshot-id"` } // AssertRefSnapshotID creates a requirement which ensures that the table branch // or tag identified by the given ref must reference the given snapshot id. // If the id is nil, the ref must not already exist. 
func AssertRefSnapshotID(ref string, id *int64) Requirement { return &assertRefSnapshotID{ baseRequirement: baseRequirement{Type: reqAssertRefSnapshotID}, Ref: ref, SnapshotID: id, } } func (a *assertRefSnapshotID) Validate(meta Metadata) error { if meta == nil { return errors.New("requirement failed: current table metadata does not exist") } var r *SnapshotRef for name, ref := range meta.Refs() { if name == a.Ref { r = &ref break } } if r != nil { if a.SnapshotID == nil { return fmt.Errorf("requirement failed: %s %s was created concurrently", r.SnapshotRefType, a.Ref) } if r.SnapshotID != *a.SnapshotID { return fmt.Errorf("requirement failed: %s %s has changed: expected id %d, found %d", r.SnapshotRefType, a.Ref, a.SnapshotID, r.SnapshotID) } } else if a.SnapshotID != nil { return fmt.Errorf("requirement failed: branch or tag %s is missing, expected %d", a.Ref, a.SnapshotID) } return nil } type assertLastAssignedFieldId struct { baseRequirement LastAssignedFieldID int `json:"last-assigned-field-id"` } // AssertLastAssignedFieldID validates that the table's last assigned column ID // matches the given id. func AssertLastAssignedFieldID(id int) Requirement { return &assertLastAssignedFieldId{ baseRequirement: baseRequirement{Type: reqAssertLastAssignedFieldID}, LastAssignedFieldID: id, } } func (a *assertLastAssignedFieldId) Validate(meta Metadata) error { if meta == nil { return errors.New("requirement failed: current table metadata does not exist") } if meta.LastColumnID() != a.LastAssignedFieldID { return fmt.Errorf("requirement failed: last assigned field id has changed: expected %d, found %d", a.LastAssignedFieldID, meta.LastColumnID()) } return nil } type assertCurrentSchemaId struct { baseRequirement CurrentSchemaID int `json:"current-schema-id"` } // AssertCurrentSchemaId creates a requirement that the table's current schema ID // matches the given id. 
func AssertCurrentSchemaID(id int) Requirement { return &assertCurrentSchemaId{ baseRequirement: baseRequirement{Type: reqAssertCurrentSchemaID}, CurrentSchemaID: id, } } func (a *assertCurrentSchemaId) Validate(meta Metadata) error { if meta == nil { return errors.New("requirement failed: current table metadata does not exist") } if meta.CurrentSchema().ID != a.CurrentSchemaID { return fmt.Errorf("requirement failed: current schema id has changed: expected %d, found %d", a.CurrentSchemaID, meta.CurrentSchema().ID) } return nil } type assertLastAssignedPartitionId struct { baseRequirement LastAssignedPartitionID int `json:"last-assigned-partition-id"` } // AssertLastAssignedPartitionID creates a requriement that the table's last assigned partition ID // matches the given id. func AssertLastAssignedPartitionID(id int) Requirement { return &assertLastAssignedPartitionId{ baseRequirement: baseRequirement{Type: reqAssertLastAssignedPartitionID}, LastAssignedPartitionID: id, } } func (a *assertLastAssignedPartitionId) Validate(meta Metadata) error { if meta == nil { return errors.New("requirement failed: current table metadata does not exist") } if *meta.LastPartitionSpecID() != a.LastAssignedPartitionID { return fmt.Errorf("requirement failed: last assigned partition id has changed: expected %d, found %d", a.LastAssignedPartitionID, *meta.LastPartitionSpecID()) } return nil } type assertDefaultSpecId struct { baseRequirement DefaultSpecID int `json:"default-spec-id"` } // AssertDefaultSpecID creates a requirement that the table's default partition spec ID // matches the given id. 
func AssertDefaultSpecID(id int) Requirement { return &assertDefaultSpecId{ baseRequirement: baseRequirement{Type: reqAssertDefaultSpecID}, DefaultSpecID: id, } } func (a *assertDefaultSpecId) Validate(meta Metadata) error { if meta == nil { return errors.New("requirement failed: current table metadata does not exist") } if meta.DefaultPartitionSpec() != a.DefaultSpecID { return fmt.Errorf("requirement failed: default spec id has changed: expected %d, found %d", a.DefaultSpecID, meta.DefaultPartitionSpec()) } return nil } type assertDefaultSortOrderId struct { baseRequirement DefaultSortOrderID int `json:"default-sort-order-id"` } // AssertDefaultSortOrderID creates a requirement that the table's default sort order ID // matches the given id. func AssertDefaultSortOrderID(id int) Requirement { return &assertDefaultSortOrderId{ baseRequirement: baseRequirement{Type: reqAssertDefaultSortOrderID}, DefaultSortOrderID: id, } } func (a *assertDefaultSortOrderId) Validate(meta Metadata) error { if meta == nil { return errors.New("requirement failed: current table metadata does not exist") } if meta.DefaultSortOrder() != a.DefaultSortOrderID { return fmt.Errorf("requirement failed: default sort order id has changed: expected %d, found %d", a.DefaultSortOrderID, meta.DefaultSortOrder()) } return nil } // ParseRequirement parses json data provided by the reader into a Requirement func ParseRequirement(r io.Reader) (Requirement, error) { data, err := io.ReadAll(r) if err != nil { return nil, err } return ParseRequirementBytes(data) } // ParseRequirementString parses json string into a Requirement func ParseRequirementString(s string) (Requirement, error) { return ParseRequirementBytes([]byte(s)) } // ParseRequirementBytes parses json bytes into a Requirement func ParseRequirementBytes(b []byte) (Requirement, error) { var base baseRequirement if err := json.Unmarshal(b, &base); err != nil { return nil, err } switch base.Type { case reqAssertCreate: return AssertCreate(), nil 
case reqAssertTableUUID: var req assertTableUuid if err := json.Unmarshal(b, &req); err != nil { return nil, err } return AssertTableUUID(req.UUID), nil case reqAssertRefSnapshotID: var req assertRefSnapshotID if err := json.Unmarshal(b, &req); err != nil { return nil, err } return AssertRefSnapshotID(req.Ref, req.SnapshotID), nil case reqAssertDefaultSpecID: var req assertDefaultSpecId if err := json.Unmarshal(b, &req); err != nil { return nil, err } return AssertDefaultSpecID(req.DefaultSpecID), nil case reqAssertCurrentSchemaID: var req assertCurrentSchemaId if err := json.Unmarshal(b, &req); err != nil { return nil, err } return AssertCurrentSchemaID(req.CurrentSchemaID), nil case reqAssertDefaultSortOrderID: var req assertDefaultSortOrderId if err := json.Unmarshal(b, &req); err != nil { return nil, err } return AssertDefaultSortOrderID(req.DefaultSortOrderID), nil case reqAssertLastAssignedFieldID: var req assertLastAssignedFieldId if err := json.Unmarshal(b, &req); err != nil { return nil, err } return AssertLastAssignedFieldID(req.LastAssignedFieldID), nil case reqAssertLastAssignedPartitionID: var req assertLastAssignedPartitionId if err := json.Unmarshal(b, &req); err != nil { return nil, err } return AssertLastAssignedPartitionID(req.LastAssignedPartitionID), nil } return nil, ErrInvalidRequirement }