metastore/validator.go (193 lines of code) (raw):

// Copyright (c) 2017-2018 Uber Technologies, Inc. // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. // You may obtain a copy of the License at // // http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, software // distributed under the License is distributed on an "AS IS" BASIS, // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. // See the License for the specific language governing permissions and // limitations under the License. package metastore import ( "fmt" memCom "github.com/uber/aresdb/memstore/common" "github.com/uber/aresdb/metastore/common" "github.com/uber/aresdb/utils" "gopkg.in/validator.v2" "reflect" ) // TableSchemaValidator validates it a new table schema is valid, given existing schema type TableSchemaValidator interface { SetOldTable(table common.Table) SetNewTable(table common.Table) Validate() error } // NewTableSchameValidator returns a new TableSchemaValidator. Pass nil for oldTable if none exists func NewTableSchameValidator() TableSchemaValidator { return &tableSchemaValidatorImpl{} } type tableSchemaValidatorImpl struct { newTable *common.Table oldTable *common.Table } func (v *tableSchemaValidatorImpl) SetOldTable(table common.Table) { v.oldTable = &table } func (v *tableSchemaValidatorImpl) SetNewTable(table common.Table) { v.newTable = &table } func (v tableSchemaValidatorImpl) Validate() (err error) { if v.oldTable == nil { return v.validateIndividualSchema(v.newTable, true) } return v.validateSchemaUpdate(v.newTable, v.oldTable) } // ValidateHLLConfig validates hll config func validateColumnHLLConfig(c common.Column) error { if c.HLLConfig.IsHLLColumn { if c.Type != common.Uint32 && c.Type != common.Int32 && c.Type != common.Int64 && c.Type != common.UUID { return fmt.Errorf("data Type %s not allowed for fast hll aggregation, valid options: [%s|%s|%s|%s]", c.Type, common.Uint32, common.Int32, common.Int64, common.UUID) } } return nil } // checks performed: // table has at least 1 valid column // table has at least 1 valid primary key column // fact table must have a time column as first column // fact table must have sort columns that are valid // each column have valid data type and default value // sort columns cannot have duplicate columnID // primary key columns cannot have duplicate columnID // column name cannot duplicate // check hll cannot be enabled on time column // check column configs func (v tableSchemaValidatorImpl) validateIndividualSchema(table *common.Table, creation bool) (err error) { var colIdDedup []bool nonDeletedColumnsCount := 0 colNameDedup := make(map[string]bool) for columnID, column := range table.Columns { if !column.Deleted { nonDeletedColumnsCount++ } if colNameDedup[column.Name] { return common.ErrDuplicatedColumnName } colNameDedup[column.Name] = true // validate data type if dataType := memCom.DataTypeFromString(column.Type); dataType == memCom.Unknown { return common.ErrInvalidDataType } else if table.IsFactTable && columnID == 0 && dataType != memCom.Uint32 { return common.ErrMissingTimeColumn } // validate hll config if err := validateColumnHLLConfig(column); err != nil { return err } // time column does not allow hll config if table.IsFactTable && columnID == 0 && column.HLLConfig.IsHLLColumn { return common.ErrTimeColumnDoesNotAllowHLLConfig } if column.DefaultValue != nil { if table.IsFactTable && columnID == 0 { return common.ErrTimeColumnDoesNotAllowDefault } if column.HLLConfig.IsHLLColumn { return common.ErrHLLColumnDoesNotAllowDefaultValue } err = ValidateDefaultValue(*column.DefaultValue, column.Type) if err != nil { return err } } } if nonDeletedColumnsCount == 0 { return common.ErrAllColumnsInvalid } if len(table.PrimaryKeyColumns) == 0 { return common.ErrMissingPrimaryKey } colIdDedup = make([]bool, len(table.Columns)) for _, colId := range table.PrimaryKeyColumns { if colId >= len(table.Columns) { return common.ErrColumnNonExist } if table.Columns[colId].Deleted { return common.ErrColumnDeleted } if colIdDedup[colId] { return common.ErrDuplicatedColumn } colDataType := memCom.DataTypeFromString(table.Columns[colId].Type) if memCom.IsArrayType(colDataType) { return common.ErrInvalidPrimaryKeyDataType } colIdDedup[colId] = true } if err := validator.Validate(table.Config); err != nil { return utils.StackError(err, "invalid table config") } if table.IsFactTable { colIdDedup = make([]bool, len(table.Columns)) for _, sortColumnId := range table.ArchivingSortColumns { if sortColumnId >= len(table.Columns) { return common.ErrColumnNonExist } if table.Columns[sortColumnId].Deleted { return common.ErrColumnDeleted } if colIdDedup[sortColumnId] { return common.ErrDuplicatedColumn } colDataType := memCom.DataTypeFromString(table.Columns[sortColumnId].Type) if colDataType >= memCom.ArrayBool && colDataType <= memCom.ArrayInt64 { return common.ErrInvalidSortColumnDataType } colIdDedup[sortColumnId] = true } } return } // checks performed // check that new table is valid table // check new table has larger version number // check no changes on immutable fields (table name, type, pk) // check updates on columns and sort columns are valid // check allowMissingEventTime cannot be changed from true to false // check hllConfig cannot be changed func (v tableSchemaValidatorImpl) validateSchemaUpdate(newTable, oldTable *common.Table) (err error) { if err := v.validateIndividualSchema(newTable, false); err != nil { return err } if newTable.Name != oldTable.Name { return common.ErrSchemaUpdateNotAllowed } if newTable.IsFactTable != oldTable.IsFactTable { return common.ErrSchemaUpdateNotAllowed } // validate columns if len(newTable.Columns) < len(oldTable.Columns) { // even with column deletion, or recreation, column id are not reused return common.ErrInsufficientColumnCount } if oldTable.IsFactTable && oldTable.Config.AllowMissingEventTime && !newTable.Config.AllowMissingEventTime { return common.ErrDisallowMissingEventTime } var i int for i = 0; i < len(oldTable.Columns); i++ { oldCol := oldTable.Columns[i] newCol := newTable.Columns[i] if oldCol.Deleted { if !newCol.Deleted { return common.ErrReusingColumnIDNotAllowed } } // check that no column configs are modified, even for deleted columns if oldCol.Name != newCol.Name || oldCol.Type != newCol.Type || !reflect.DeepEqual(oldCol.DefaultValue, newCol.DefaultValue) || oldCol.CaseInsensitive != newCol.CaseInsensitive || oldCol.DisableAutoExpand != newCol.DisableAutoExpand || oldCol.HLLConfig != newCol.HLLConfig { return common.ErrSchemaUpdateNotAllowed } } // end validate columns // primary key columns if !reflect.DeepEqual(newTable.PrimaryKeyColumns, oldTable.PrimaryKeyColumns) { return common.ErrChangePrimaryKeyColumn } // sort columns if len(newTable.ArchivingSortColumns) < len(oldTable.ArchivingSortColumns) { return common.ErrIllegalChangeSortColumn } for i, sortColumnId := range newTable.ArchivingSortColumns { if i < len(oldTable.ArchivingSortColumns) { if oldTable.ArchivingSortColumns[i] != sortColumnId { return common.ErrIllegalChangeSortColumn } } if sortColumnId >= len(newTable.Columns) { return common.ErrColumnNonExist } if newTable.Columns[sortColumnId].Deleted { return common.ErrColumnDeleted } } return } // ValidateDefaultValue validates default value against data type func ValidateDefaultValue(valueStr, dataTypeStr string) (err error) { dataType := memCom.DataTypeFromString(dataTypeStr) switch dataType { // BigEnum or Small Enum ares string values, no need to validate case memCom.BigEnum, memCom.SmallEnum: return nil default: value, err := memCom.ValueFromString(valueStr, dataType) if err != nil || !value.Valid { return utils.StackError(err, "invalid value %s for type %s", valueStr, dataTypeStr) } } return err }