datahub/models/types.py (229 lines of code) (raw):

#!/usr/bin/env python # -*- coding: utf-8 -*- # Licensed to the Apache Software Foundation (ASF) under one # or more contributor license agreements. See the NOTICE file # distributed with this work for additional information # regarding copyright ownership. The ASF licenses this file # to you under the Apache License, Version 2.0 (the # "License"); you may not use this file except in compliance # with the License. You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, # software distributed under the License is distributed on an # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY # KIND, either express or implied. See the License for the # specific language governing permissions and limitations # under the License. from __future__ import absolute_import import abc import decimal import six from . import FieldType from .. import utils from ..exceptions import InvalidParameterException @six.add_metaclass(abc.ABCMeta) class DataType(object): """ Abstract singleton data type """ _instance = None def __new__(cls, *args, **kwargs): if not cls._instance: cls._instance = super(DataType, cls).__new__(cls, *args, **kwargs) return cls._instance else: return object.__new__(cls) def __repr__(self): return type(self).__name__.upper() def can_implicit_cast(self, other): return isinstance(self, type(other)) def can_explicit_cast(self, other): return self.can_implicit_cast(other) def validate_value(self, val): # directly return True means without checking return True def _can_cast_or_throw(self, value, data_type): if not self.can_implicit_cast(data_type): raise InvalidParameterException('Cannot cast value(%s) from type(%s) to type(%s)' % ( value, data_type, self)) @abc.abstractmethod def do_cast(self, value, data_type): raise NotImplementedError def cast_value(self, value, data_type): self._can_cast_or_throw(value, data_type) try: return self.do_cast(value, data_type) except ValueError as e: raise InvalidParameterException(e) # Tinyint class Tinyint(DataType): _bounds = (-128, 127) def can_implicit_cast(self, other): if isinstance(other, (Smallint, Integer, Bigint, Float, Double, String, Timestamp)): return True return super(Tinyint, self).can_implicit_cast(other) def validate_value(self, val): if val is None: return True smallest, largest = self._bounds if smallest <= val <= largest: return True raise InvalidParameterException('InvalidData: Tinyint(%s) out of range' % val) def do_cast(self, value, data_type): return int(value) # Smallint class Smallint(DataType): _bounds = (-32768, 32767) def can_implicit_cast(self, other): if isinstance(other, (Tinyint, Integer, Bigint, Float, Double, String, Timestamp)): return True return super(Smallint, self).can_implicit_cast(other) def validate_value(self, val): if val is None: return True smallest, largest = self._bounds if smallest <= val <= largest: return True raise InvalidParameterException('InvalidData: Smallint(%s) out of range' % val) def do_cast(self, value, data_type): return int(value) # Integer class Integer(DataType): _bounds = (-2147483648, 2147483647) def can_implicit_cast(self, other): if isinstance(other, (Tinyint, Smallint, Bigint, Float, Double, String, Timestamp)): return True return super(Integer, self).can_implicit_cast(other) def validate_value(self, val): if val is None: return True smallest, largest = self._bounds if smallest <= val <= largest: return True raise InvalidParameterException('InvalidData: Integer(%s) out of range' % val) def do_cast(self, value, data_type): return int(value) # Bigint class Bigint(DataType): _bounds = (-9223372036854775808, 9223372036854775807) def can_implicit_cast(self, other): if isinstance(other, (Tinyint, Smallint, Integer, Float, Double, String, Timestamp)): return True return super(Bigint, self).can_implicit_cast(other) def validate_value(self, val): if val is None: return True smallest, largest = self._bounds if smallest <= val <= largest: return True raise InvalidParameterException('InvalidData: Bigint(%s) out of range' % val) def do_cast(self, value, data_type): if six.PY2: return long(value) return int(value) # Float class Float(DataType): def can_implicit_cast(self, other): if isinstance(other, (Tinyint, Smallint, Integer, Double, Bigint, String)): return True return super(Float, self).can_implicit_cast(other) def do_cast(self, value, data_type): return float(value) # Double class Double(DataType): def can_implicit_cast(self, other): if isinstance(other, (Tinyint, Smallint, Integer, Float, Bigint, String)): return True return super(Double, self).can_implicit_cast(other) def do_cast(self, value, data_type): return float(value) # Decimal class Decimal(DataType): def can_implicit_cast(self, other): if isinstance(other, (Tinyint, Smallint, Integer, Float, Double, Bigint, String)): return True return super(Decimal, self).can_implicit_cast(other) def do_cast(self, value, data_type): return decimal.Decimal(value) # String class String(DataType): def can_implicit_cast(self, other): if isinstance(other, (Tinyint, Smallint, Integer, Bigint, Float, Double, Timestamp)): return True return super(String, self).can_implicit_cast(other) def validate_value(self, val): return True def do_cast(self, value, data_type): return utils.to_text(value) # Timestamp class Timestamp(DataType): _ticks_bound = (-62135798400000000, 253402271999000000) def can_implicit_cast(self, other): if isinstance(other, String): return True return super(Timestamp, self).can_implicit_cast(other) def validate_value(self, val): if val is None: return True smallest, largest = self._ticks_bound if smallest <= val <= largest: return True raise InvalidParameterException('InvalidData: Timestamp(%s) out of range' % val) def do_cast(self, value, data_type): if six.PY2: return long(value) return int(value) # Boolean class Boolean(DataType): def can_implicit_cast(self, other): if isinstance(other, String): return True return super(Boolean, self).can_implicit_cast(other) def do_cast(self, value, data_type): if isinstance(data_type, String): if 'true' == value.lower(): return True elif 'false' == value.lower(): return False raise ValueError('can not cast to [%s] bool' % value) ##################################################################### # above is 10 type defined to verify field value ##################################################################### float_builtins = (float,) bool_builtins = (bool,) integer_builtins = six.integer_types if isinstance(six.integer_types, tuple) else (six.integer_types,) string_builtins = six.string_types if isinstance(six.string_types, tuple) else (six.string_types,) decimal_builtins = (decimal.Decimal,) try: import numpy as np integer_builtins += (np.integer, ) float_builtins += (np.float16, np.float32, np.float64, ) if np.__version__ < "1.20.0": float_builtins += (np.float, ) except ImportError: pass tinyint_type = Tinyint() smallint_type = Smallint() integer_type = Integer() bigint_type = Bigint() float_type = Float() double_type = Double() string_type = String() timestamp_type = Timestamp() boolean_type = Boolean() decimal_type = Decimal() _datahub_types_dict = { FieldType.TINYINT: tinyint_type, FieldType.SMALLINT: smallint_type, FieldType.INTEGER: integer_type, FieldType.BIGINT: bigint_type, FieldType.FLOAT: float_type, FieldType.DOUBLE: double_type, FieldType.STRING: string_type, FieldType.TIMESTAMP: timestamp_type, FieldType.BOOLEAN: boolean_type, FieldType.DECIMAL: decimal_type } _builtin_types_dict = { tinyint_type: integer_builtins, smallint_type: integer_builtins, integer_type: integer_builtins, bigint_type: integer_builtins, float_type: float_builtins, double_type: float_builtins, string_type: string_builtins, timestamp_type: integer_builtins, boolean_type: bool_builtins, decimal_type: decimal_builtins } def infer_builtin_type(value): for datahub_type, builtin_types in six.iteritems(_builtin_types_dict): if isinstance(value, builtin_types): return datahub_type def _validate_builtin_value(value, data_type): if value is None: return None if isinstance(value, (bytearray, six.binary_type)): value = value.decode('utf-8') builtin_types = _builtin_types_dict[data_type] if type(value) in builtin_types: return value inferred_data_type = infer_builtin_type(value) if inferred_data_type is None: raise InvalidParameterException('Unknown value type,' ' cannot infer from value: %s, type: %s' % (value, type(value))) return data_type.cast_value(value, inferred_data_type) def validate_value(value, field): if field.allow_null and field.type != FieldType.STRING and (value == '' or value == b''): return None datahub_type = _datahub_types_dict[field.type] result = _validate_builtin_value(value, datahub_type) datahub_type.validate_value(result) return result