odps/df/types.py (296 lines of code) (raw):

#!/usr/bin/env python # -*- coding: utf-8 -*- # Copyright 1999-2022 Alibaba Group Holding Ltd. # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. import sys from collections import OrderedDict from datetime import datetime as _datetime, date as _date from decimal import Decimal as _Decimal from ..compat import six from ..lib.xnamedtuple import xnamedtuple from ..models import TableSchema, Column from ..config import options from ..types import DataType, Array, Map, Struct as _Struct, parse_composite_types class Primitive(DataType): __slots__ = () @property def CLASS_NAME(self): return self.__class__.__name__ def cast_value(self, value, data_type): self._can_cast_or_throw(value, data_type) return value class Integer(Primitive): __slots__ = () def can_implicit_cast(self, other): if isinstance(other, six.string_types): other = validate_data_type(other) if isinstance(other, Integer) and other._n_bytes <= self._n_bytes: return True return False def validate_value(self, val, max_field_size=None): if val is None and self.nullable: return True return self._bounds[0] <= val <= self._bounds[1] class Float(Primitive): __slots__ = () def can_implicit_cast(self, other): if isinstance(other, six.string_types): other = validate_data_type(other) if isinstance(other, (Integer, Float)): return True return False def cast_value(self, value, data_type): self._can_cast_or_throw(value, data_type) return float(value) class Int8(Integer): __slots__ = () _n_bytes = 1 _bounds = (-128, 127) class Int16(Integer): __slots__ = () _n_bytes = 2 _bounds = (-32768, 32767) class Int32(Integer): __slots__ = () _n_bytes = 4 _bounds = (-2147483648, 2147483647) class Int64(Integer): __slots__ = () _n_bytes = 8 _bounds = (-9223372036854775808, 9223372036854775807) class Float32(Float): __slots__ = () _n_bytes = 4 class Float64(Float): __slots__ = () _n_bytes = 8 class Datetime(Primitive): __slots__ = () def can_implicit_cast(self, other): if isinstance(other, six.string_types): other = validate_data_type(other) if isinstance(other, (Datetime, String, Integer)): return True return False class Date(Primitive): __slots__ = () def can_implicit_cast(self, other): if isinstance(other, six.string_types): other = validate_data_type(other) if isinstance(other, (Timestamp, Datetime, String)): return True return False class Timestamp(Primitive): __slots__ = () def can_implicit_cast(self, other): if isinstance(other, six.string_types): other = validate_data_type(other) if isinstance(other, (Timestamp, Datetime, String)): return True return False class Json(Primitive): __slots__ = () class Boolean(Primitive): __slots__ = () class Decimal(Primitive): __slots__ = () def can_implicit_cast(self, other): if isinstance(other, (Decimal, Integer)): return True return False class String(Primitive): __slots__ = () class Binary(Primitive): __slots__ = () class List(Array): __slots__ = () CLASS_NAME = "List" def __init__(self, value_type, nullable=True): DataType.__init__(self, nullable=nullable) self.value_type = validate_data_type(value_type) def _equals(self, other): if isinstance(other, six.string_types): other = validate_data_type(other) return DataType._equals(self, other) and \ self.value_type == other.value_type def can_implicit_cast(self, other): if isinstance(other, six.string_types): other = validate_data_type(other) return isinstance(other, List) and \ self.value_type == other.value_type and \ self.nullable == other.nullable class Dict(Map): __slots__ = () CLASS_NAME = "Dict" def __init__(self, key_type, value_type, nullable=True): DataType.__init__(self, nullable=nullable) self.key_type = validate_data_type(key_type) self.value_type = validate_data_type(value_type) def _equals(self, other): if isinstance(other, six.string_types): other = validate_data_type(other) return DataType._equals(self, other) and \ self.key_type == other.key_type and \ self.value_type == other.value_type def can_implicit_cast(self, other): if isinstance(other, six.string_types): other = validate_data_type(other) return isinstance(other, Dict) and \ self.key_type == other.key_type and \ self.value_type == other.value_type and \ self.nullable == other.nullable class Struct(_Struct): __slots__ = () CLASS_NAME = "Struct" def __init__(self, field_types, nullable=True): DataType.__init__(self, nullable=nullable) self.field_types = OrderedDict() if isinstance(field_types, dict): field_types = six.iteritems(field_types) for k, v in field_types: self.field_types[k] = validate_data_type(v) self.namedtuple_type = xnamedtuple( "StructNamedTuple", list(self.field_types.keys()) ) self._struct_as_dict = options.struct_as_dict if self._struct_as_dict: self._use_ordered_dict = options.struct_as_ordered_dict if self._use_ordered_dict is None: self._use_ordered_dict = sys.version_info[:2] <= (3, 6) else: self._use_ordered_dict = False def _equals(self, other): if isinstance(other, six.string_types): other = validate_data_type(other) return isinstance(other, Struct) and \ len(self.field_types) == len(other.field_types) and \ all(self.field_types[k] == other.field_types[k] for k in six.iterkeys(self.field_types)) def can_implicit_cast(self, other): if isinstance(other, six.string_types): other = validate_data_type(other) return isinstance(other, Struct) and self == other and \ self.nullable == other.nullable int8 = Int8() int16 = Int16() int32 = Int32() int64 = Int64() float32 = Float32() float64 = Float64() boolean = Boolean() string = String() decimal = Decimal() datetime = Datetime() date = Date() timestamp = Timestamp() binary = Binary() json = Json() _data_types = dict( (t.name, t) for t in (int8, int16, int32, int64, float32, float64, boolean, string, decimal, datetime, binary, date, timestamp) ) _composite_handlers = dict( list=List, dict=Dict, struct=Struct, ) def validate_data_type(data_type): if isinstance(data_type, DataType): return data_type if isinstance(data_type, type): data_type = data_type.__name__ if isinstance(data_type, six.string_types): data_type = data_type.lower() if data_type == 'int': data_type = 'int64' elif data_type == 'float': data_type = 'float64' elif data_type == 'bool': data_type = 'boolean' if data_type in _data_types: return _data_types[data_type] composite_type = parse_composite_types(data_type, _composite_handlers) if composite_type: return composite_type raise ValueError('Invalid data type: %s' % repr(data_type)) def validate_value_type(value, data_type=None): try: from pandas import Timestamp except ImportError: Timestamp = None if data_type is not None: data_type.validate_value(value) return data_type inferred_value_type = None if isinstance(value, bool): inferred_value_type = boolean elif isinstance(value, six.integer_types): for t in (int8, int16, int32, int64): if t.validate_value(value): inferred_value_type = t break if inferred_value_type is None: raise ValueError('Integer value too large: %s' % value) elif isinstance(value, float): inferred_value_type = float64 elif isinstance(value, six.string_types): inferred_value_type = string elif isinstance(value, _Decimal): inferred_value_type = decimal elif isinstance(value, _datetime): inferred_value_type = datetime elif isinstance(value, _date): inferred_value_type = date elif Timestamp is not None and isinstance(value, Timestamp): inferred_value_type = timestamp else: raise ValueError('Unknown value: %s, type: %s' % (value, type(value))) return inferred_value_type _number_types = OrderedDict.fromkeys([ int8, int16, int32, int64, float32, float64, decimal]) def number_types(): return _number_types.keys() def is_number(data_type): if not isinstance(data_type, DataType): data_type = validate_data_type(data_type) if data_type in _number_types: return True return False class Unknown(DataType): __slots__ = 'type', # the type of the column and identify that it's a dynamic field _singleton = False CLASS_NAME = 'Unknown' def __init__(self, nullable=True, type=None): super(Unknown, self).__init__(nullable=nullable) self.type = type def _equals(self, other): # ``Unknown`` type is not equal to other types return False def can_implicit_cast(self, other): # ``Unknown`` can cast to other types return True class DynamicSchema(TableSchema): def __init__(self, *args, **kwargs): self.default_type = kwargs.pop('default_type', None) super(DynamicSchema, self).__init__(*args, **kwargs) def __contains__(self, item): # We do not know the actual columns, # just return True return True def __eq__(self, other): return False def __getitem__(self, item): if isinstance(item, six.string_types): try: return super(DynamicSchema, self).__getitem__(item) except ValueError: return Column(name=item, type=Unknown(type=self.default_type)) return super(DynamicSchema, self).__getitem__(item) def get_column(self, name): try: return super(DynamicSchema, self).get_column() except ValueError: return Column(name=name, type=Unknown(type=self.default_type)) @classmethod def from_schema(cls, schema, default_type=None): if isinstance(schema, DynamicSchema): if default_type == schema.default_type: return schema default_type = default_type or schema.default_type return DynamicSchema(columns=schema._columns, default_type=default_type) return DynamicSchema(columns=schema._columns, default_type=default_type)