#!/usr/bin/env python
# -*- coding: utf-8 -*-
# Copyright 1999-2025 Alibaba Group Holding Ltd.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import decimal as _builtin_decimal
import json as _json
import sys
import warnings
from collections import OrderedDict
from datetime import date as _date
from datetime import datetime as _datetime
from datetime import timedelta as _timedelta

from . import compat, utils
from .compat import DECIMAL_TYPES, Monthdelta
from .compat import decimal as _decimal
from .compat import east_asian_len, six
from .config import options
from .lib.xnamedtuple import xnamedtuple

# pandas is optional here: keep the type of ``pandas.NA`` when it can be
# imported, otherwise record None.
try:
    from pandas import NA as _pd_na

    pd_na_type = type(_pd_na)
except (ImportError, ValueError):
    pd_na_type = None

# Module-level copies of the corresponding options, read once at import time.
force_py = options.force_py
force_c = options.force_c

# When truthy, Datetime.can_implicit_cast also accepts Bigint as a source type.
_date_allow_int_conversion = False


class Column(object):
    """
    Represents a column in a table schema.

    :param str name: column name
    :param str typo: column type. Can also use `type` as keyword.
    :param str comment: comment of the column, None by default
    :param str label: deprecated, a DeprecationWarning is issued when set
    :param bool nullable: is column nullable, True by default
    :param str generate_expression: source text of the expression the column
        is generated from, None by default

    :Example:

    >>> col = Column("col1", "bigint")
    >>> print(col.name)
    col1
    >>> print(col.type)
    bigint
    """

    def __init__(
        self,
        name=None,
        typo=None,
        comment=None,
        label=None,
        nullable=True,
        generate_expression=None,
        **kw
    ):
        self.name = utils.to_str(name)
        # ``type`` is accepted as an alias of ``typo`` through **kw
        self.type = validate_data_type(
            typo if typo is not None else kw.pop("type", None)
        )
        self.comment = comment
        if label:
            warnings.warn("label is deprecated.", DeprecationWarning)
        self.label = label
        self.nullable = nullable

        # raw expression text; parsed lazily by the generate_expression property
        self._generate_expression = generate_expression
        self._parsed_generate_expression = None

        if kw:
            raise TypeError("Arguments not supported for Column: %s" % list(kw))

    def __repr__(self):
        not_null_str = ", not null" if not self.nullable else ""
        return "<column {0}, type {1}{2}>".format(
            utils.to_str(self.name), self.type.name.lower(), not_null_str
        )

    def __hash__(self):
        # hashes the same attributes that __eq__ compares, plus the class
        return hash(
            (
                type(self),
                self.name,
                self.type,
                self.comment,
                self.label,
                self.nullable,
                self._generate_expression,
            )
        )

    def __eq__(self, other):
        # attribute-wise comparison; attributes missing on ``other`` count as None
        return self is other or all(
            getattr(self, attr, None) == getattr(other, attr, None)
            for attr in (
                "name",
                "type",
                "comment",
                "label",
                "nullable",
                "_generate_expression",
            )
        )

    @property
    def generate_expression(self):
        """
        Parsed form of the generate expression, or None when there is none.

        The expression is parsed on first access and cached. If parsing
        raises SyntaxError or ValueError, the raw expression is cached and
        returned instead.
        """
        from .expressions import parse as parse_expression

        if not self._generate_expression:
            return None
        if not self._parsed_generate_expression:
            try:
                self._parsed_generate_expression = parse_expression(
                    self._generate_expression
                )
            except (SyntaxError, ValueError):
                self._parsed_generate_expression = self._generate_expression
        return self._parsed_generate_expression

    def to_sql_clause(self, with_column_comments=True):
        """
        Render the column as one line of a column definition list.

        :param bool with_column_comments: append a ``COMMENT '...'`` suffix
            when the column has a comment
        :return: the SQL fragment, indented by two spaces
        """
        sio = six.StringIO()
        if self.generate_expression:
            sio.write(
                u"  %s AS `%s`" % (utils.to_text(self.generate_expression), self.name)
            )
        else:
            sio.write(
                u"  `%s` %s" % (utils.to_text(self.name), utils.to_text(self.type))
            )
            if not self.nullable and not options.sql.ignore_fields_not_null:
                sio.write(u" NOT NULL")
        if with_column_comments and self.comment:
            comment_str = utils.escape_odps_string(utils.to_text(self.comment))
            sio.write(u" COMMENT '%s'" % comment_str)
        return sio.getvalue()

    def replace(
        self,
        name=None,
        type=None,
        comment=None,
        label=None,
        nullable=None,
        generate_expression=None,
    ):
        """
        Build a new :class:`Column` with some attributes replaced.

        Arguments left falsy keep the value of the current column. The
        exception is ``nullable``, where only None means "keep", so an
        explicit False is honored.

        :return: a new :class:`Column` instance
        """
        return Column(
            name=name or self.name,
            typo=type or self.type,
            comment=comment or self.comment,
            label=label or self.label,
            # a truthiness test would discard an explicit nullable=False
            nullable=self.nullable if nullable is None else nullable,
            generate_expression=generate_expression or self._generate_expression,
        )


class Partition(Column):
    """
    A partition column of a table schema.

    Accepts the same arguments as :class:`Column` and differs only in how
    it is displayed.

    :param str name: column name
    :param str typo: column type. Can also use `type` as keyword.
    :param str comment: comment of the column, None by default
    :param bool nullable: is column nullable, True by default

    :Example:

    >>> col = Partition("col1", "bigint")
    >>> print(col.name)
    col1
    >>> print(col.type)
    bigint
    """

    def __repr__(self):
        display_name = utils.to_str(self.name)
        type_name = self.type.name.lower()
        return "<partition {0}, type {1}>".format(display_name, type_name)


class _CallableList(list):
    """Make sure keys and values properties also callable"""

    def __call__(self):
        return self


class PartitionSpec(object):
    """
    Ordered mapping of partition keys to values.

    :param spec: another :class:`PartitionSpec` (its mapping is copied), a
        dict, a string such as ``"part1=v1,part2=v2"``, or None for an
        empty spec
    :raises ValueError: when a string spec is malformed, has an empty key
        or value, or repeats a key
    :raises TypeError: when ``spec`` is of any other type
    """

    def __init__(self, spec=None):
        self.kv = OrderedDict()

        if isinstance(spec, PartitionSpec):
            self.kv = spec.kv.copy()
        elif isinstance(spec, dict):
            self.kv = OrderedDict(spec)
        elif isinstance(spec, six.string_types):
            splits = spec.split(",")
            for sp in splits:
                kv = sp.split("=")
                if len(kv) != 2:
                    raise ValueError(
                        "Invalid partition spec: a partition spec should "
                        'look like "part1=v1,part2=v2"'
                    )

                # values may be wrapped in single or double quotes
                k, v = kv[0].strip(), kv[1].strip().strip('\'"')

                if len(k) == 0 or len(v) == 0:
                    raise ValueError("Invalid partition spec")
                if k in self.kv:
                    raise ValueError(
                        "Invalid partition spec: found duplicate partition key " + k
                    )

                self.kv[k] = v
        elif spec is not None:
            raise TypeError("Cannot accept spec %r" % spec)

    def __setitem__(self, key, value):
        self.kv[key] = value

    def __getitem__(self, key):
        return self.kv[key]

    def __len__(self):
        return len(self.kv)

    @property
    def is_empty(self):
        """True when the spec holds no partition keys."""
        return len(self) == 0

    @property
    def keys(self):
        """Partition keys in order; usable as ``spec.keys`` or ``spec.keys()``."""
        return _CallableList(self.kv.keys())

    @property
    def values(self):
        """Partition values in order; usable as ``spec.values`` or ``spec.values()``."""
        return _CallableList(self.kv.values())

    def items(self):
        """Iterate over ``(key, value)`` pairs in order."""
        for k, v in self.kv.items():
            yield k, v

    def __contains__(self, key):
        return key in self.kv

    def __str__(self):
        return ",".join("%s='%s'" % (k, v) for k, v in self.kv.items())

    def __repr__(self):
        return "<PartitionSpec %s>" % str(self)

    def __hash__(self):
        return hash(str(self))

    def __eq__(self, other):
        if not isinstance(other, PartitionSpec):
            try:
                other = PartitionSpec(other)
            except (TypeError, ValueError):
                # an operand that is not a valid spec is unequal, not an error
                return False

        return str(self) == str(other)


class Schema(object):
    """
    Ordered list of column names with their data types.

    Names are looked up case-insensitively, and two names differing only in
    case are rejected as duplicates.

    :param names: iterable of column names
    :param types: iterable of column types, each passed through
        ``validate_data_type``
    :raises ValueError: when names collide after lower-casing
    """

    def __init__(self, names, types):
        self._init(names, types)

    def _init(self, names, types):
        if not isinstance(names, list):
            names = list(names)
        self.names = names
        self.types = [validate_data_type(t) for t in types]

        lower_names = [utils.to_lower_str(n) for n in self.names]
        self._name_indexes = {n: i for i, n in enumerate(lower_names)}

        # fewer index entries than names means some lower-cased names collided
        if len(self._name_indexes) < len(self.names):
            duplicates = [n for n in self._name_indexes if lower_names.count(n) > 1]
            raise ValueError("Duplicate column names: %s" % ", ".join(duplicates))

        self._snapshot = None

    def __repr__(self):
        return self._repr()

    def __len__(self):
        return len(self.names)

    def __contains__(self, name):
        return utils.to_lower_str(name) in self._name_indexes

    def _repr(self):
        buf = six.StringIO()
        # NOTE(review): _to_printable is not defined on this class in the code
        # visible here - confirm it is provided elsewhere.
        names = [self._to_printable(n) for n in self.names]
        # max() raises on an empty sequence, so fall back to zero width
        space = 2 * max([len(it) for it in names] or [0])
        for name, tp in zip(names, self.types):
            buf.write("\n{0}{1}".format(name.ljust(space), repr(tp)))

        return "Schema {{{0}\n}}".format(utils.indent(buf.getvalue(), 2))

    def __hash__(self):
        return hash((type(self), tuple(self.names), tuple(self.types)))

    def __eq__(self, other):
        if not isinstance(other, Schema):
            return False
        # types must be compared against the other schema, not against self
        return self.names == other.names and self.types == other.types

    def get_type(self, name):
        """Return the type of column ``name`` (case-insensitive); KeyError if absent."""
        return self.types[self._name_indexes[utils.to_lower_str(name)]]

    def append(self, name, typo):
        """Return a new :class:`Schema` with one more column at the end."""
        names = self.names + [name]
        types = self.types + [validate_data_type(typo)]
        return Schema(names, types)

    def extend(self, schema):
        """Return a new :class:`Schema` with the columns of ``schema`` appended."""
        names = self.names + schema.names
        types = self.types + schema.types
        return Schema(names, types)


class OdpsSchema(Schema):
    """
    Table schema made of ordinary columns plus optional partition columns.

    The inherited ``names`` / ``types`` / ``_name_indexes`` describe the
    ordinary columns only. Partition columns are indexed by a separate
    :class:`Schema` kept in ``_partition_schema``.

    :param columns: list of :class:`Column` objects, may be None or empty
    :param partitions: list of :class:`Partition` objects, may be None or empty
    """

    def __init__(self, columns=None, partitions=None):
        self._columns = columns
        self._partitions = partitions

        if self._columns:
            super(OdpsSchema, self).__init__(
                *compat.lzip(*[(c.name, c.type) for c in self._columns])
            )
        else:
            super(OdpsSchema, self).__init__([], [])

        if self._partitions:
            self._partition_schema = Schema(
                *compat.lzip(*[(c.name, c.type) for c in self._partitions])
            )
        else:
            self._partition_schema = Schema([], [])

    def __len__(self):
        # ordinary columns plus partition columns
        return super(OdpsSchema, self).__len__() + len(self._partition_schema)

    def __setattr__(self, key, value):
        # Assigning _columns / _partitions directly also builds the derived
        # lookup structures. For _columns this happens only while names and
        # types are still unset or empty.
        if (
            key == "_columns"
            and value
            and not getattr(self, "names", None)
            and not getattr(self, "types", None)
        ):
            names = [c.name for c in value]
            types = [c.type for c in value]
            self._init(names, types)
        elif key == "_partitions" and value:
            self._partition_schema = Schema(
                *compat.lzip(*[(c.name, c.type) for c in value])
            )
        object.__setattr__(self, key, value)

    def __contains__(self, name):
        return (
            super(OdpsSchema, self).__contains__(name)
            or utils.to_str(name) in self._partition_schema
        )

    def __eq__(self, other):
        if not isinstance(other, OdpsSchema):
            return False

        return (
            super(OdpsSchema, self).__eq__(other)
            and self._partition_schema == other._partition_schema
        )

    def __hash__(self):
        return hash(
            (type(self), tuple(self.names), tuple(self.types), self._partition_schema)
        )

    def __getitem__(self, item):
        """
        Look up columns.

        * integer: ordinary columns come first, partition columns follow
        * string: case-insensitive name of a column or partition
        * list / tuple: list of the results for each element
        * anything else (e.g. a slice): applied to ``self.columns``

        :raises IndexError: integer index beyond the last partition
        :raises ValueError: unknown column name
        """
        if isinstance(item, six.integer_types):
            n_columns = len(self._name_indexes)
            if item < n_columns:
                return self._columns[item]
            elif item < len(self):
                return self._partitions[item - n_columns]
            else:
                raise IndexError("Index out of range")
        elif isinstance(item, six.string_types):
            lower_item = utils.to_lower_str(item)
            if lower_item in self._name_indexes:
                idx = self._name_indexes[lower_item]
                return self[idx]
            elif item in self._partition_schema:
                idx = self._partition_schema._name_indexes[lower_item]
                n_columns = len(self._name_indexes)
                return self[n_columns + idx]
            else:
                raise ValueError("Unknown column name: %s" % item)
        elif isinstance(item, (list, tuple)):
            return [self[it] for it in item]
        else:
            return self.columns[item]

    def _repr(self, strip=True):
        """
        Render the schema as an aligned, multi-line text table.

        :param bool strip: remove trailing whitespace from every row
        """

        def _strip(line):
            return line.rstrip() if strip else line

        buf = six.StringIO()

        all_columns = self.columns
        name_dict = dict(
            [(col.name, utils.str_to_printable(col.name)) for col in all_columns]
        )
        name_display_lens = dict(
            [
                (k, east_asian_len(utils.to_text(v), encoding=options.display.encoding))
                for k, v in six.iteritems(name_dict)
            ]
        )
        # max() raises on an empty sequence, so an empty schema uses width 0
        max_name_len = max(list(six.itervalues(name_display_lens)) or [0])
        name_space = max_name_len + min(16, max_name_len)
        max_type_len = max([len(repr(col.type)) for col in all_columns] or [0])
        type_space = max_type_len + min(16, max_type_len)
        has_not_null = any(not col.nullable for col in all_columns)

        not_empty = lambda field: field is not None and len(field.strip()) > 0

        buf.write("odps.Schema {\n")
        cols_strs = []
        for col in self._columns or []:
            # pad by display width so wide characters stay aligned
            pad_spaces = name_space - name_display_lens[col.name]
            not_null = "not null" if not col.nullable else " " * 8
            row = "{0}{1}{2}{3}".format(
                utils.to_str(name_dict[col.name] + " " * pad_spaces),
                repr(col.type).ljust(type_space),
                not_null + " " * 4 if has_not_null else "",
                "# {0}".format(utils.to_str(col.comment))
                if not_empty(col.comment)
                else "",
            )
            cols_strs.append(_strip(row))
        buf.write(utils.indent("\n".join(cols_strs), 2))
        buf.write("\n")
        buf.write("}\n")

        if self._partitions:
            buf.write("Partitions {\n")

            partition_strs = []
            for partition in self._partitions:
                row = "{0}{1}{2}".format(
                    utils.to_str(name_dict[partition.name].ljust(name_space)),
                    repr(partition.type).ljust(type_space),
                    "# {0}".format(utils.to_str(partition.comment))
                    if not_empty(partition.comment)
                    else "",
                )
                partition_strs.append(_strip(row))
            buf.write(utils.indent("\n".join(partition_strs), 2))
            buf.write("\n")
            buf.write("}\n")

        return buf.getvalue()

    def build_snapshot(self):
        """
        Build and cache a ``SchemaSnapshot`` of this schema.

        Nothing is built when ``options.force_py`` is set. With no columns
        the method returns None. An ImportError while importing the
        snapshot implementation is ignored.

        :return: the value of ``self._snapshot`` (may be None)
        """
        if not options.force_py:
            if not self._columns:
                return None

            try:
                from .src.types_c import SchemaSnapshot

                self._snapshot = SchemaSnapshot(self)
            except ImportError:
                pass
        return self._snapshot

    @property
    def simple_columns(self):
        """
        List of columns as a list of :class:`~odps.types.Column`.
        Partition columns are excluded.
        """
        return self._columns

    @property
    def columns(self):
        """List of columns and partition columns as a list of :class:`~odps.types.Column`."""
        # either part may be None when the schema was built without it
        return (self._columns or []) + (self._partitions or [])

    @property
    def partitions(self):
        """List of partition columns as a list of :class:`~odps.types.Partition`."""
        try:
            return self._partitions
        except AttributeError:
            return []

    @utils.deprecated("use simple_columns property instead")
    def get_columns(self):
        return self._columns

    @utils.deprecated("use partitions property instead")
    def get_partitions(self):
        return self._partitions

    def get_column(self, name):
        """
        Return the ordinary column called ``name`` (case-insensitive).

        :raises ValueError: when no such column exists
        """
        index = self._name_indexes.get(utils.to_lower_str(name))
        if index is None:
            raise ValueError("Column %s does not exists" % name)
        return self._columns[index]

    def get_partition(self, name):
        """
        Return the partition column called ``name`` (case-insensitive).

        :raises ValueError: when no such partition exists
        """
        index = self._partition_schema._name_indexes.get(utils.to_lower_str(name))
        if index is None:
            raise ValueError("Partition %s does not exists" % name)
        return self._partitions[index]

    def is_partition(self, name):
        """Tell whether ``name`` (a name, or an object with ``.name``) is a partition column."""
        try:
            name = name.name
        except AttributeError:
            pass
        return utils.to_lower_str(name) in self._partition_schema._name_indexes

    def get_type(self, name):
        """
        Return the type of the column or partition called ``name``.

        :raises ValueError: when the name is unknown
        """
        lower_name = utils.to_lower_str(name)
        if lower_name in self._name_indexes:
            return super(OdpsSchema, self).get_type(name)
        elif lower_name in self._partition_schema:
            return self._partition_schema.get_type(name)
        raise ValueError("Column does not exist: %s" % name)

    def update(self, columns, partitions):
        """Replace columns and partitions in place and rebuild the lookup tables."""
        self._columns = columns
        self._partitions = partitions

        names = [c.name for c in self._columns]
        types = [c.type for c in self._columns]

        self._init(names, types)
        if self._partitions:
            self._partition_schema = Schema(
                *compat.lzip(*[(c.name, c.type) for c in self._partitions])
            )
        else:
            self._partition_schema = Schema([], [])

    def extend(self, schema):
        """
        Return a new schema of the same class with ``schema`` appended.

        :param schema: a :class:`Schema`, or an object exposing
            ``simple_columns`` and ``partitions``
        """
        if isinstance(schema, Schema):
            # NOTE(review): OdpsSchema is itself a Schema subclass, so an
            # OdpsSchema argument takes this branch and only its names and
            # types are carried over - confirm this is intended.
            ext_cols = [Column(n, tp) for n, tp in zip(schema.names, schema.types)]
            ext_parts = []
        else:
            ext_cols = schema.simple_columns or []
            ext_parts = schema.partitions or []
        # either side may have been built with columns / partitions left as None
        return type(self)(
            columns=(self.simple_columns or []) + ext_cols,
            partitions=(self.partitions or []) + ext_parts,
        )

    def to_ignorecase_schema(self):
        """
        Return a schema of the same class whose names are lower-cased.

        Only name, type, comment and label of each column are carried over.
        """
        cols = [
            Column(col.name.lower(), col.type, col.comment, col.label)
            for col in self._columns or []
        ]
        parts = None
        if self._partitions:
            parts = [
                Partition(part.name.lower(), part.type, part.comment, part.label)
                for part in self._partitions
            ]

        return type(self)(columns=cols, partitions=parts)

    @classmethod
    def from_lists(cls, names, types, partition_names=None, partition_types=None):
        """
        Create a schema from lists of column names and types.

        :param names: List of column names.
        :param types: List of column types.
        :param partition_names: List of partition names.
        :param partition_types: List of partition types.

        :Example:

        >>> schema = TableSchema.from_lists(['id', 'name'], ['bigint', 'string'])
        >>> print(schema.columns)
        [<column id, type bigint>, <column name, type string>]
        """
        columns = [Column(name=name, typo=typo) for name, typo in zip(names, types)]
        if partition_names is not None and partition_types is not None:
            partitions = [
                Partition(name=name, typo=typo)
                for name, typo in zip(partition_names, partition_types)
            ]
        else:
            partitions = None
        return cls(columns=columns, partitions=partitions)

    @classmethod
    def from_dict(cls, fields_dict, partitions_dict=None):
        """
        Create a schema from ``{name: type}`` mappings.

        :param fields_dict: mapping of column names to types
        :param partitions_dict: optional mapping of partition names to types
        """
        fields = compat.lkeys(fields_dict)
        fields_types = compat.lvalues(fields_dict)
        partitions = (
            compat.lkeys(partitions_dict) if partitions_dict is not None else None
        )
        partitions_types = (
            compat.lvalues(partitions_dict) if partitions_dict is not None else None
        )

        return cls.from_lists(
            fields,
            fields_types,
            partition_names=partitions,
            partition_types=partitions_types,
        )

    def get_table_ddl(self, table_name="table_name", with_comments=True):
        """
        Generate a CREATE TABLE statement for this schema.

        :param str table_name: name of the table in the statement
        :param bool with_comments: include column comments
        """
        from .models.table import Table

        return Table.gen_create_table_sql(
            table_name, self, with_column_comments=with_comments
        )


class RecordMeta(type):
    """
    Metaclass that registers every class it creates in ``record_types`` and
    makes ``isinstance(obj, cls)`` test for ``RecordReprMixin`` instead.
    """

    # registry of all classes created through this metaclass
    record_types = set()

    def __new__(mcs, name, bases, dct):
        created = super(RecordMeta, mcs).__new__(mcs, name, bases, dct)
        mcs.record_types.add(created)
        return created

    def __instancecheck__(cls, instance):
        return isinstance(instance, RecordReprMixin)


def is_record(obj):
    """Tell whether the exact class of ``obj`` was created through RecordMeta."""
    obj_type = type(obj)
    return obj_type in RecordMeta.record_types


class BaseRecord(object):
    """
    Pure-Python container for the values of one row.

    :param columns: list of columns, or a :class:`Schema` (then treated as
        ``schema``)
    :param schema: schema supplying ``columns`` and the name index when
        ``columns`` is not given
    :param values: optional initial values, validated against column types
    :param max_field_size: forwarded to ``validate_value`` for every value set
    :raises ValueError: when neither ``columns`` nor ``schema`` is given, or
        when ``values`` has an unexpected length
    """

    # set __slots__ to save memory in the situation that records' size may be quite large
    __slots__ = "_values", "_columns", "_name_indexes", "_max_field_size"

    def __init__(self, columns=None, schema=None, values=None, max_field_size=None):
        # fail early with a clear error instead of an AttributeError on None
        if columns is None and schema is None:
            raise ValueError("Either columns or schema should be provided")

        if isinstance(columns, Schema):
            schema, columns = columns, None
        if columns is not None:
            self._columns = columns
            self._name_indexes = {
                col.name.lower(): i for i, col in enumerate(self._columns)
            }
        else:
            self._columns = schema.columns
            self._name_indexes = schema._name_indexes

        self._max_field_size = max_field_size

        if self._columns is None:
            raise ValueError("Either columns or schema should be provided")

        self._values = [None] * len(self._columns)
        if values is not None:
            self._sets(values)

    def _mode(self):
        return "py"

    def _exclude_partition_columns(self):
        return [col for col in self._columns if not isinstance(col, Partition)]

    def _get(self, i):
        return self._values[i]

    def _set(self, i, value):
        # every stored value goes through validate_value for its column type
        data_type = self._columns[i].type
        val = validate_value(value, data_type, max_field_size=self._max_field_size)
        self._values[i] = val

    get = _get  # to keep compatible
    set = _set  # to keep compatible

    def _sets(self, values):
        # values may cover all columns, or all columns except partitions
        if len(values) != len(self._columns) and len(values) != len(
            self._exclude_partition_columns()
        ):
            raise ValueError(
                "The values set to records are against the schema, "
                "expect len %s, got len %s" % (len(self._columns), len(values))
            )
        for i, value in enumerate(values):
            self._set(i, value)

    def __getitem__(self, item):
        if isinstance(item, six.string_types):
            return self.get_by_name(item)
        elif isinstance(item, (list, tuple)):
            return [self[it] for it in item]
        return self._values[item]

    def __setitem__(self, key, value):
        if isinstance(key, six.string_types):
            self.set_by_name(key, value)
        else:
            self._set(key, value)

    def __getattr__(self, item):
        # only reached when normal lookup fails; column names act as attributes
        if item == "_name_indexes":
            return object.__getattribute__(self, item)
        if hasattr(self, "_name_indexes") and item in self._name_indexes:
            return self.get_by_name(item)
        return object.__getattribute__(self, item)

    def __setattr__(self, key, value):
        # assigning to an attribute named after a column sets that column
        if hasattr(self, "_name_indexes") and key in self._name_indexes:
            self.set_by_name(key, value)
        else:
            object.__setattr__(self, key, value)

    def get_by_name(self, name):
        """Return the value of the column called ``name`` (case-insensitive)."""
        i = self._name_indexes[utils.to_lower_str(name)]
        return self._values[i]

    def set_by_name(self, name, value):
        """Validate and store ``value`` in the column called ``name``."""
        i = self._name_indexes[utils.to_lower_str(name)]
        self._set(i, value)

    def __len__(self):
        return len(self._columns)

    def __contains__(self, item):
        return utils.to_lower_str(item) in self._name_indexes

    def __iter__(self):
        # yields (column name, value) pairs in column order
        for i, col in enumerate(self._columns):
            yield (col.name, self[i])

    @property
    def values(self):
        """The underlying list of values, in column order."""
        return self._values

    @property
    def n_columns(self):
        """Number of columns of the record."""
        return len(self._columns)

    def get_columns_count(self):  # compatible
        return self.n_columns


class RecordReprMixin(object):
    """
    Mixin supplying ``__repr__``, ``__hash__`` and ``__eq__`` for records.

    It reads ``self._columns`` and ``self._values`` of the class it is
    mixed into.
    """

    def __repr__(self):
        buf = six.StringIO()

        buf.write("odps.Record {\n")

        # max() raises on an empty sequence, so a record without columns uses width 0
        space = 2 * max([len(it.name) for it in self._columns] or [0])
        content = "\n".join(
            [
                "{0}{1}".format(col.name.ljust(space), repr(value))
                for col, value in zip(self._columns, self._values)
            ]
        )
        buf.write(utils.indent(content, 2))

        buf.write("\n}")

        return buf.getvalue()

    def __hash__(self):
        return hash((type(self), tuple(self._columns), tuple(self._values)))

    def __eq__(self, other):
        # only objects whose class is a registered record type can be equal
        if not is_record(other):
            return False

        return self._columns == other._columns and self._values == other._values


class Record(six.with_metaclass(RecordMeta, RecordReprMixin, BaseRecord)):
    """
    A record generally means the data of a single line in a table. It can be
    created from a schema, or by :meth:`odps.models.Table.new_record` or by
    :meth:`odps.tunnel.TableUploadSession.new_record`.

    Hints on getting or setting different types of data can be
    seen :ref:`here <record-type>`.

    Fields can be read by position, by column name, by slice, or by a
    tuple / list of positions or names. Slices and tuple / list lookups
    return lists. Iterating a record yields ``(column name, value)`` pairs.

    :Example:

    >>> schema = TableSchema.from_lists(['name', 'id'], ['string', 'string'])
    >>> record = Record(schema=schema, values=['test', 'test2'])
    >>> record[0] = 'test'
    >>> record[0]
    'test'
    >>> record['name']
    'test'
    >>> record[0:2]
    ['test', 'test2']
    >>> record[0, 1]
    ['test', 'test2']
    >>> record['name', 'id']
    ['test', 'test2']
    >>> for field in record:
    ...     print(field)
    ('name', u'test')
    ('id', u'test2')
    >>> len(record)
    2
    >>> 'name' in record
    True
    """


class DataType(object):
    """
    Base class of all data types in MaxCompute.

    Classes with ``_singleton`` set to True share one instance per class:
    constructing the class again returns that same object.

    :param bool nullable: whether values of the type may be None, True by default
    """

    _singleton = True
    _type_id = -1
    __slots__ = ("nullable",)

    def __new__(cls, *args, **kwargs):
        if not cls._singleton:
            return object.__new__(cls)
        # Check the class' own namespace only. hasattr() would also find an
        # instance cached on a parent class and hand out an object of the
        # wrong type for the subclass.
        if "_instance" not in cls.__dict__:
            cls._instance = object.__new__(cls)
            cls._hash = hash(cls)
        return cls._instance

    def __init__(self, nullable=True):
        # NOTE: __init__ runs on every construction, so for singleton types
        # this overwrites ``nullable`` on the shared instance.
        self.nullable = nullable

    def __call__(self, nullable=True):
        return self._factory(nullable=nullable)

    def _factory(self, nullable=True):
        return type(self)(nullable=nullable)

    def __ne__(self, other):
        return not (self == other)

    def __eq__(self, other):
        # operands that cannot be interpreted as a data type are simply unequal
        try:
            return self._equals(other)
        except (TypeError, ValueError):
            return False

    def _equals(self, other):
        if self is other:
            return True

        other = validate_data_type(other)

        if self.nullable != other.nullable:
            return False
        if type(self) == type(other):
            return True
        return isinstance(other, type(self))

    def __hash__(self):
        # _hash is stored on the class when a singleton instance is created
        return self._hash

    @property
    def name(self):
        """Lower-cased class name of the type."""
        return type(self).__name__.lower()

    def __repr__(self):
        if self.nullable:
            return self.name
        return "{0}[non-nullable]".format(self.name)

    def __str__(self):
        return self.name.upper()

    def can_implicit_cast(self, other):
        """
        Tell whether values of type ``other`` may be cast to this type
        implicitly.

        :param other: a data type, or a string passed through
            ``validate_data_type``
        """
        if isinstance(other, six.string_types):
            other = validate_data_type(other)

        return isinstance(self, type(other))

    def can_explicit_cast(self, other):
        """Same as :meth:`can_implicit_cast` unless overridden."""
        return self.can_implicit_cast(other)

    def validate_value(self, val, max_field_size=None):
        # directly return True means without checking
        return True

    def _can_cast_or_throw(self, value, data_type):
        """Raise ValueError unless ``data_type`` can be implicitly cast to this type."""
        if not self.can_implicit_cast(data_type):
            raise ValueError(
                "Cannot cast value(%s) from type(%s) to type(%s)"
                % (value, data_type, self)
            )

    def cast_value(self, value, data_type):
        """Convert ``value`` of type ``data_type`` to this type; subclasses implement it."""
        raise NotImplementedError


class OdpsPrimitive(DataType):
    """Common base class of the primitive data types; adds no members of its own."""

    __slots__ = ()


class BaseInteger(OdpsPrimitive):
    """
    Common behavior of the integer types.

    Subclasses set ``_bounds`` to the inclusive ``(min, max)`` range and
    ``_store_bytes`` to the storage width in bytes.
    """

    __slots__ = ()

    _type_id = 0
    _bounds = None
    _store_bytes = None

    def can_implicit_cast(self, other):
        """
        Floats, strings and decimals always qualify. Another integer type
        qualifies when it is not wider than this one.
        """
        if isinstance(other, six.string_types):
            other = validate_data_type(other)

        if isinstance(other, (BaseFloat, String, Decimal)):
            return True
        if isinstance(other, BaseInteger):
            return self._store_bytes >= other._store_bytes
        return super(BaseInteger, self).can_implicit_cast(other)

    def validate_value(self, val, max_field_size=None):
        """
        Check that ``val`` lies within the bounds of the type.

        :return: True when valid (None is valid for nullable types)
        :raises ValueError: when ``val`` is out of range
        """
        if val is None and self.nullable:
            return True
        smallest, largest = self._bounds
        if smallest <= val <= largest:
            return True
        # report the actual type; the text for Bigint is unchanged
        raise ValueError(
            "InvalidData: %s(%s) out of range" % (type(self).__name__, val)
        )

    def cast_value(self, value, data_type):
        """Convert ``value`` to ``int`` after checking the cast is allowed."""
        self._can_cast_or_throw(value, data_type)

        return int(value)


_primitive_doc_template = """
Represents {cls_name} type in MaxCompute.

:Note:
This class may not be used directly. Use its singleton instance (``odps.types.{cls_attr}``) instead.
{odps2_note}
"""
_primitive_odps2_note = """
Need to set ``options.sql.use_odps2_extension = True`` to enable full functionality.
"""


def _primitive_doc(cls=None, is_odps2=True):
    def wrapper(cls_internal):
        cls_name = cls_attr = cls_internal().name
        if cls_name in ("int", "float"):
            cls_attr += "_"
        odps2_note = _primitive_odps2_note if is_odps2 else ""
        docstr = _primitive_doc_template.format(
            cls_name=cls_name, cls_attr=cls_attr, odps2_note=odps2_note
        )
        try:
            cls_internal.__doc__ = docstr
        except AttributeError:
            pass
        return cls_internal

    if cls is None:
        return wrapper
    return wrapper(cls)


@_primitive_doc
class Tinyint(BaseInteger):
    # inclusive (min, max) accepted by validate_value: signed 8-bit range
    _bounds = (-128, 127)
    # storage width in bytes, compared in can_implicit_cast
    _store_bytes = 1


@_primitive_doc
class Smallint(BaseInteger):
    # inclusive (min, max) accepted by validate_value: signed 16-bit range
    _bounds = (-32768, 32767)
    # storage width in bytes, compared in can_implicit_cast
    _store_bytes = 2


@_primitive_doc
class Int(BaseInteger):
    # inclusive (min, max) accepted by validate_value: signed 32-bit range
    _bounds = (-2147483648, 2147483647)
    # storage width in bytes, compared in can_implicit_cast
    _store_bytes = 4


@_primitive_doc(is_odps2=False)
class Bigint(BaseInteger):
    # inclusive (min, max) accepted by validate_value: signed 64-bit range
    _bounds = (-9223372036854775808, 9223372036854775807)
    # storage width in bytes, compared in can_implicit_cast
    _store_bytes = 8


class BaseFloat(OdpsPrimitive):
    """
    Shared behavior of the floating point types.

    Subclasses set ``_store_bytes`` to their storage width in bytes.
    """

    __slots__ = ()
    _store_bytes = None

    def can_implicit_cast(self, other):
        """
        Integers, strings and decimals always qualify. Another float type
        qualifies when it is not wider than this one.
        """
        source_type = other
        if isinstance(source_type, six.string_types):
            source_type = validate_data_type(source_type)

        if isinstance(source_type, (BaseInteger, String, Decimal)):
            return True
        if not isinstance(source_type, BaseFloat):
            return super(BaseFloat, self).can_implicit_cast(source_type)
        return self._store_bytes >= source_type._store_bytes

    def cast_value(self, value, data_type):
        """Convert ``value`` to ``float`` after checking the cast is allowed."""
        self._can_cast_or_throw(value, data_type)
        converted = float(value)
        return converted


@_primitive_doc
class Float(BaseFloat):
    # storage width in bytes, compared in BaseFloat.can_implicit_cast
    _store_bytes = 4
    # numeric id of the type; its consumers are outside this view - TODO confirm meaning
    _type_id = 6


@_primitive_doc(is_odps2=False)
class Double(BaseFloat):
    # storage width in bytes, compared in BaseFloat.can_implicit_cast
    _store_bytes = 8
    # numeric id of the type; its consumers are outside this view - TODO confirm meaning
    _type_id = 1


def _check_string_byte_size(val, max_size):
    """
    Check the byte length of a string against ``max_size``.

    :param val: binary or text string
    :param max_size: maximal number of bytes allowed
    :return: tuple ``(fits, byte_length)``. For text that passes the cheap
        estimate, ``byte_length`` is that estimate (4 * number of
        characters), not the encoded length.
    """
    if isinstance(val, six.binary_type):
        size = len(val)
        return size <= max_size, size

    # Cheap estimate first; presumably 4 bytes per character is an upper
    # bound for the encoding used by utils.to_binary - TODO confirm.
    size = 4 * len(val)
    if size > max_size:
        # encode only when necessary
        size = len(utils.to_binary(val))
    return size <= max_size, size


@_primitive_doc(is_odps2=False)
class String(OdpsPrimitive):
    # The class docstring is generated by the _primitive_doc decorator.
    __slots__ = ()

    _type_id = 2
    # default byte limit applied by validate_value when none is passed
    _max_length = 8 * 1024 * 1024  # 8M

    def can_implicit_cast(self, other):
        # integers, floats, datetimes, decimals, binaries and JSON values
        # can all be turned into strings
        if isinstance(other, six.string_types):
            other = validate_data_type(other)

        if isinstance(other, (BaseInteger, BaseFloat, Datetime, Decimal, Binary, Json)):
            return True
        return super(String, self).can_implicit_cast(other)

    def validate_value(self, val, max_field_size=None):
        # Returns True when ``val`` fits into ``max_field_size`` bytes
        # (``_max_length`` when not given); raises ValueError otherwise.
        if val is None and self.nullable:
            return True
        max_field_size = max_field_size or self._max_length
        valid, byt_len = _check_string_byte_size(val, max_field_size)
        if valid:
            return True
        raise ValueError(
            "InvalidData: Byte length of string(%s) is more than %sM."
            % (byt_len, max_field_size / (1024**2))
        )

    def cast_value(self, value, data_type):
        # Datetime values are formatted as "YYYY-mm-dd HH:MM:SS". Anything
        # else becomes binary or text depending on
        # options.tunnel.string_as_binary.
        self._can_cast_or_throw(value, data_type)

        if isinstance(data_type, Datetime):
            return value.strftime("%Y-%m-%d %H:%M:%S")
        if options.tunnel.string_as_binary:
            val = utils.to_binary(value)
        else:
            val = utils.to_text(value)
        return val


@_primitive_doc(is_odps2=False)
class Datetime(OdpsPrimitive):
    _type_id = 3
    __slots__ = ()

    def can_implicit_cast(self, other):
        """
        Tell whether values of type ``other`` can be implicitly cast to datetime.

        :param other: source data type, either an instance or a type string
        :return: True for timestamp, datetime, date and string sources, and for
            bigint as well when ``_date_allow_int_conversion`` is set; any other
            source is left to the parent class
        """
        source = other
        if isinstance(source, six.string_types):
            source = validate_data_type(source)

        accepted = [BaseTimestamp, Datetime, Date, String]
        if _date_allow_int_conversion:
            accepted.append(Bigint)

        if not isinstance(source, tuple(accepted)):
            return super(Datetime, self).can_implicit_cast(source)
        return True

    def cast_value(self, value, data_type):
        """
        Convert ``value``, whose type is ``data_type``, into a datetime value.

        :param value: value to convert
        :param data_type: data type of ``value``
        :return: the converted value; sources without a dedicated conversion
            are returned unchanged
        """
        self._can_cast_or_throw(value, data_type)

        if isinstance(data_type, String):
            # strings need to look like ``2024-01-31 12:34:56``
            return _datetime.strptime(value, "%Y-%m-%d %H:%M:%S")
        if isinstance(data_type, Date):
            # a date turns into midnight of that day
            return _datetime(value.year, value.month, value.day)
        if isinstance(data_type, BaseTimestamp):
            return value.to_pydatetime()
        if _date_allow_int_conversion and isinstance(data_type, Bigint):
            return utils.to_datetime(value)
        return value


@_primitive_doc
class Date(OdpsPrimitive):
    _type_id = 11
    __slots__ = ()

    def can_implicit_cast(self, other):
        """
        Tell whether values of type ``other`` can be implicitly cast to date.

        :param other: source data type, either an instance or a type string
        :return: True for timestamp, datetime and string sources, and for bigint
            as well when ``_date_allow_int_conversion`` is set; any other source
            is left to the parent class
        """
        source = other
        if isinstance(source, six.string_types):
            source = validate_data_type(source)

        accepted = [BaseTimestamp, Datetime, String]
        if _date_allow_int_conversion:
            accepted.append(Bigint)

        if not isinstance(source, tuple(accepted)):
            return super(Date, self).can_implicit_cast(source)
        return True

    def cast_value(self, value, data_type):
        """
        Convert ``value``, whose type is ``data_type``, into a date value.

        :param value: value to convert
        :param data_type: data type of ``value``
        :return: the converted value; sources without a dedicated conversion
            are returned unchanged
        """
        self._can_cast_or_throw(value, data_type)

        if isinstance(data_type, String):
            # strings need to look like ``2024-01-31``
            return _datetime.strptime(value, "%Y-%m-%d").date()
        if isinstance(data_type, Datetime):
            return value.date()
        if isinstance(data_type, BaseTimestamp):
            return value.to_pydatetime().date()
        if _date_allow_int_conversion and isinstance(data_type, Bigint):
            return utils.to_date(value)
        return value


@_primitive_doc(is_odps2=False)
class Boolean(OdpsPrimitive):
    _type_id = 4
    __slots__ = ()

    def cast_value(self, value, data_type):
        """
        Check that ``data_type`` may be cast to boolean and hand back ``value``.

        :param value: value to convert
        :param data_type: data type of ``value``
        :return: ``value`` itself, no conversion is applied
        """
        self._can_cast_or_throw(value, data_type)
        # values are passed through untouched once the cast is known to be legal
        return value


@_primitive_doc
class Binary(OdpsPrimitive):
    # Values may take up to ``_max_length`` bytes unless validate_value() is
    # given another limit.
    _type_id = 7
    _max_length = 8 * 1024 * 1024  # 8M

    __slots__ = ()

    def can_implicit_cast(self, other):
        """
        Tell whether values of type ``other`` can be implicitly cast to binary.

        :param other: source data type, either an instance or a type string
        :return: True for integer, floating point, datetime, decimal and string
            sources, otherwise whatever the parent class decides
        """
        source = other
        if isinstance(source, six.string_types):
            source = validate_data_type(source)

        accepted = (BaseInteger, BaseFloat, Datetime, Decimal, String)
        if not isinstance(source, accepted):
            return super(Binary, self).can_implicit_cast(source)
        return True

    def validate_value(self, val, max_field_size=None):
        """
        Make sure the byte length of ``val`` stays within the size limit.

        :param val: value to check
        :param max_field_size: limit in bytes, ``_max_length`` is used when
            omitted or zero
        :return: True when the value is acceptable
        :raises ValueError: when the value takes more bytes than allowed
        """
        if self.nullable and val is None:
            return True

        limit = max_field_size or self._max_length
        fits, byte_size = _check_string_byte_size(val, limit)
        if not fits:
            raise ValueError(
                "InvalidData: Byte length of binary(%s) is more than %sM.'"
                % (byte_size, limit / (1024**2))
            )
        return True

    def cast_value(self, value, data_type):
        """
        Convert ``value``, whose type is ``data_type``, into a binary value.

        :param value: value to convert
        :param data_type: data type of ``value``
        :return: the formatted time for datetime sources, otherwise the result
            of ``utils.to_binary``
        """
        self._can_cast_or_throw(value, data_type)

        if not isinstance(data_type, Datetime):
            return utils.to_binary(value)
        return value.strftime("%Y-%m-%d %H:%M:%S")


class BaseTimestamp(OdpsPrimitive):
    """
    Common base of the timestamp types. Casting values requires pandas.
    """

    _type_id = 8
    __slots__ = ()

    def can_implicit_cast(self, other):
        """
        Tell whether values of type ``other`` can be implicitly cast to timestamp.

        :param other: source data type, either an instance or a type string
        :return: True for timestamp, datetime and string sources, otherwise
            whatever the parent class decides
        """
        source = other
        if isinstance(source, six.string_types):
            source = validate_data_type(source)

        if not isinstance(source, (BaseTimestamp, Datetime, String)):
            return super(BaseTimestamp, self).can_implicit_cast(source)
        return True

    def cast_value(self, value, data_type):
        """
        Convert ``value``, whose type is ``data_type``, into a timestamp value.

        :param value: value to convert
        :param data_type: data type of ``value``
        :return: ``pd.to_datetime(value)`` for string sources,
            ``pd.Timestamp(value)`` for datetime sources, otherwise ``value``
        :raises ImportError: when pandas cannot be imported
        """
        self._can_cast_or_throw(value, data_type)
        try:
            import pandas as pd
        except (ImportError, ValueError):
            raise ImportError("To use TIMESTAMP in pyodps, you need to install pandas.")

        if isinstance(data_type, String):
            return pd.to_datetime(value)
        if isinstance(data_type, Datetime):
            return pd.Timestamp(value)
        return value


@_primitive_doc
class Timestamp(BaseTimestamp):
    # Same type id as the one declared on BaseTimestamp.
    _type_id = 8


@_primitive_doc
class TimestampNTZ(BaseTimestamp):
    _type_id = 13

    @property
    def name(self):
        """Type name, spelled with an underscore unlike the class name."""
        type_name = "timestamp_ntz"
        return type_name


@_primitive_doc
class IntervalDayTime(OdpsPrimitive):
    _type_id = 9
    __slots__ = ()

    @property
    def name(self):
        """Type name, spelled with underscores unlike the class name."""
        return "interval_day_time"

    def can_implicit_cast(self, other):
        """
        Tell whether values of type ``other`` can be implicitly cast to this type.

        :param other: source data type, either an instance or a type string
        :return: True for timestamp, datetime and string sources, otherwise
            whatever the parent class decides
        """
        source = other
        if isinstance(source, six.string_types):
            source = validate_data_type(source)

        if not isinstance(source, (BaseTimestamp, Datetime, String)):
            return super(IntervalDayTime, self).can_implicit_cast(source)
        return True

    def cast_value(self, value, data_type):
        """
        Convert ``value``, whose type is ``data_type``, into a ``pd.Timedelta``.

        :param value: value to convert
        :param data_type: data type of ``value``
        :return: a ``pd.Timedelta`` for floats (taken as seconds) and
            ``timedelta`` instances, otherwise ``value`` itself
        :raises ImportError: when pandas cannot be imported
        """
        self._can_cast_or_throw(value, data_type)
        try:
            import pandas as pd
        except (ImportError, ValueError):
            raise ImportError(
                "To use INTERVAL_DAY_TIME in pyodps, you need to install pandas."
            )

        if isinstance(value, float):
            return pd.Timedelta(seconds=value)
        if isinstance(value, _timedelta):
            return pd.Timedelta(value)
        return value


@_primitive_doc
class IntervalYearMonth(OdpsPrimitive):
    _type_id = 10
    __slots__ = ()

    @property
    def name(self):
        """Type name, spelled with underscores unlike the class name."""
        return "interval_year_month"

    def can_implicit_cast(self, other):
        """
        Tell whether values of type ``other`` can be implicitly cast to this type.

        :param other: source data type, either an instance or a type string
        :return: True for string and integer sources, otherwise whatever the
            parent class decides
        """
        source = other
        if isinstance(source, six.string_types):
            source = validate_data_type(source)

        if not isinstance(source, (String, BaseInteger)):
            return super(IntervalYearMonth, self).can_implicit_cast(source)
        return True

    def cast_value(self, value, data_type):
        """
        Convert ``value``, whose type is ``data_type``, into a ``Monthdelta``.

        :param value: value to convert
        :param data_type: data type of ``value``
        :return: ``Monthdelta(value)`` for integers and strings, otherwise
            ``value`` itself
        """
        self._can_cast_or_throw(value, data_type)

        convertible = (int, compat.long_type, six.string_types)
        if not isinstance(value, convertible):
            return value
        return Monthdelta(value)


class CompositeDataType(DataType):
    """
    Base class of data types that are configured with arguments.

    Subclasses are expected to override both methods below.
    """

    _singleton = False

    @classmethod
    def parse_composite(cls, args):
        """
        Create a type instance from the arguments parsed out of a type string.

        :param args: arguments found between the brackets of the type string
        """
        raise NotImplementedError()

    def cast_composite_values(self, value):
        """
        Validate ``value`` and convert it into the representation of this type.

        :param value: value to convert
        """
        raise NotImplementedError()


class SizeLimitedString(String, CompositeDataType):
    """
    Base class of string types that carry a maximal length.

    :param int size_limit: maximal length of values, which must not exceed
        ``_max_length``
    :param bool nullable: whether None is accepted as a value, True by default
    """

    _singleton = False
    __slots__ = "nullable", "size_limit", "_hash"
    _max_length = 65535

    def __init__(self, size_limit, nullable=True):
        super(SizeLimitedString, self).__init__(nullable=nullable)
        if size_limit > self._max_length:
            raise ValueError(
                "InvalidData: Length of varchar(%d) is larger than %d."
                % (size_limit, self._max_length)
            )
        self.size_limit = size_limit

    @property
    def name(self):
        """Lower-cased class name followed by the size limit in parentheses."""
        return "{0}({1})".format(type(self).__name__.lower(), self.size_limit)

    def _equals(self, other):
        """
        Compare with another type, which may be given as a type string.

        :return: True when ``DataType._equals`` holds and size limits are equal
        """
        if isinstance(other, six.string_types):
            other = validate_data_type(other)

        return DataType._equals(self, other) and self.size_limit == other.size_limit

    def __hash__(self):
        # the hash is computed on first use and cached on the instance
        if not hasattr(self, "_hash"):
            self._hash = hash((type(self), self.nullable, self.size_limit))
        return self._hash

    def validate_value(self, val, max_field_size=None):
        """
        Make sure the length of ``val`` does not exceed ``size_limit``.

        :param val: value to check, text or bytes
        :param max_field_size: accepted for signature compatibility, not used
        :return: True when the value is acceptable
        :raises ValueError: when the value is longer than ``size_limit``
        """
        if val is None and self.nullable:
            return True
        if len(val) <= self.size_limit:
            # binary size >= unicode size
            return True
        elif isinstance(val, six.binary_type):
            # the byte count is too large, yet the character count may still fit
            val = val.decode("utf-8")
            if len(val) <= self.size_limit:
                return True
        raise ValueError(
            "InvalidData: Length of string(%d) is more than %s.'"
            % (len(val), self.size_limit)
        )

    @classmethod
    def parse_composite(cls, args):
        """
        Create an instance from the single length argument of a type string.

        :param args: arguments parsed from the type string, e.g. ``["10"]``
        :return: instance of ``cls`` with the given size limit
        :raises ValueError: when the number of arguments is not one, when the
            argument is not an integer, or when the limit is rejected by the
            constructor
        """
        if len(args) != 1:
            raise ValueError(
                "%s() only accept one length argument." % cls.__name__.upper()
            )
        try:
            # int() raises ValueError for non-numeric text and TypeError for
            # objects that are not convertible at all; both mean a bad argument
            size_limit = int(args[0])
        except (TypeError, ValueError):
            raise ValueError(
                "%s() only accept an integer length argument." % cls.__name__.upper()
            )
        # kept outside of the try block so that errors raised by the constructor
        # retain their own message
        return cls(size_limit)

    def cast_composite_values(self, value):
        """
        Validate ``value`` against the size limit, then cast it.

        :param value: value to convert
        :return: result of ``cast_value`` with this type as the source type
        """
        self.validate_value(value)
        return self.cast_value(value, self)


class Varchar(SizeLimitedString):
    """
    Represents varchar type with size limit in MaxCompute.

    :param int size_limit: The size limit of varchar type.

    :Example:

    >>> varchar_type = Varchar(65535)
    >>> print(varchar_type)
    varchar(65535)
    >>> print(varchar_type.size_limit)
    65535

    :Note:

    Need to set ``options.sql.use_odps2_extension = True`` to enable full functionality.
    """

    def can_implicit_cast(self, other):
        """
        Tell whether values of type ``other`` can be implicitly cast to varchar.

        :param other: source data type, either an instance or a type string
        :return: True for integer, floating point, datetime, decimal, string
            and binary sources, False for unrelated types
        """
        source = other
        if isinstance(source, six.string_types):
            source = validate_data_type(source)

        always_accepted = (BaseInteger, BaseFloat, Datetime, Decimal, String, Binary)
        if isinstance(source, always_accepted):
            return True
        if not isinstance(source, (Char, Varchar)):
            return False
        # NOTE(review): Char and Varchar derive from String through
        # SizeLimitedString, hence they are already accepted above and this
        # comparison is never reached for them -- confirm whether intended.
        return (
            self.size_limit >= source.size_limit
            and self.nullable == source.nullable
        )


class Char(SizeLimitedString):
    """
    Represents char type with size limit in MaxCompute.

    :param int size_limit: The size limit of char type.

    :Example:

    >>> char_type = Char(65535)
    >>> print(char_type)
    char(65535)
    >>> print(char_type.size_limit)
    65535

    :Note:

    Need to set ``options.sql.use_odps2_extension = True`` to enable full functionality.
    """

    def can_implicit_cast(self, other):
        """
        Tell whether values of type ``other`` can be implicitly cast to char.

        :param other: source data type, either an instance or a type string
        :return: True for integer, floating point, datetime, decimal, string
            and binary sources, False for unrelated types
        """
        source = other
        if isinstance(source, six.string_types):
            source = validate_data_type(source)

        always_accepted = (BaseInteger, BaseFloat, Datetime, Decimal, String, Binary)
        if isinstance(source, always_accepted):
            return True
        if not isinstance(source, (Char, Varchar)):
            return False
        # NOTE(review): Char and Varchar derive from String through
        # SizeLimitedString, hence they are already accepted above and this
        # comparison is never reached for them -- confirm whether intended.
        return (
            self.size_limit >= source.size_limit
            and self.nullable == source.nullable
        )


class Decimal(CompositeDataType):
    """
    Represents decimal type with size limit in MaxCompute.

    :param int precision: The precision (or total digits) of decimal type.
    :param int scale: The decimal scale (or decimal digits) of decimal type.

    :Example:

    >>> decimal_type = Decimal(18, 6)
    >>> print(decimal_type)
    decimal(18, 6)
    >>> print(decimal_type.precision, decimal_type.scale)
    18 6

    :Note:

    Need to set ``options.sql.use_odps2_extension = True`` to enable full functionality
    when you are setting precision or scale.
    """

    __slots__ = "nullable", "precision", "scale", "_hash"
    _type_id = 5

    _has_other_decimal_type = len(DECIMAL_TYPES) > 1

    # used when precision or scale is not given
    _default_precision = 54
    _default_scale = 18
    _decimal_ctx = _decimal.Context(prec=_default_precision)

    def __init__(self, precision=None, scale=None, nullable=True):
        super(Decimal, self).__init__(nullable=nullable)
        if precision is None and scale is not None:
            raise ValueError(
                "InvalidData: Scale should be provided along with precision."
            )
        if precision is not None and precision < 1:
            raise ValueError("InvalidData: Decimal precision < 1")
        if precision is not None and scale is not None and scale > precision:
            raise ValueError(
                "InvalidData: Decimal precision must be larger than or equal to scale"
            )
        self.precision = precision
        self.scale = scale
        # quantum used by validate_value() to round values to the scale
        self._scale_decimal = _decimal.Decimal(
            "1e%d" % -(scale if scale is not None else self._default_scale)
        )

    @property
    def name(self):
        """Lower-cased class name plus precision and scale when they are set."""
        if self.precision is None:
            return type(self).__name__.lower()
        elif self.scale is None:
            return "{0}({1})".format(type(self).__name__.lower(), self.precision)
        else:
            return "{0}({1},{2})".format(
                type(self).__name__.lower(), self.precision, self.scale
            )

    def __hash__(self):
        # the hash is computed on first use and cached on the instance
        if not hasattr(self, "_hash"):
            self._hash = hash((type(self), self.nullable, self.precision, self.scale))
        return self._hash

    def _equals(self, other):
        """
        Compare with another type, which may be given as a type string.

        :return: True when ``DataType._equals`` holds and both precision and
            scale are equal
        """
        if isinstance(other, six.string_types):
            other = validate_data_type(other)

        return (
            DataType._equals(self, other)
            and self.precision == other.precision
            and self.scale == other.scale
        )

    def can_implicit_cast(self, other):
        """
        Tell whether values of type ``other`` can be implicitly cast to decimal.

        :param other: source data type, either an instance or a type string
        :return: True for integer, floating point and string sources, otherwise
            whatever the parent class decides
        """
        if isinstance(other, six.string_types):
            other = validate_data_type(other)

        if isinstance(other, (BaseInteger, BaseFloat, String)):
            return True
        return super(Decimal, self).can_implicit_cast(other)

    def validate_value(self, val, max_field_size=None):
        """
        Make sure the integer part of ``val`` has no more digits than allowed.

        The value is rounded half-up to the scale of this type first, then the
        digits in front of the decimal point are counted and compared with the
        precision (``_default_precision`` when no precision is set).

        :param val: decimal value to check
        :param max_field_size: accepted for signature compatibility, not used
        :return: True when the value is acceptable
        :raises ValueError: when the integer part has too many digits
        """
        if val is None and self.nullable:
            return True

        if (
            self._has_other_decimal_type
            and not isinstance(val, _decimal.Decimal)
            and isinstance(val, DECIMAL_TYPES)
        ):
            val = _decimal.Decimal(str(val))

        precision = (
            self.precision if self.precision is not None else self._default_precision
        )
        scaled_val = val.quantize(
            self._scale_decimal, _decimal.ROUND_HALF_UP, self._decimal_ctx
        )
        # Count digits from the coefficient instead of the string form: the
        # string form includes the minus sign of negative values and has no
        # decimal point when the scale is zero, both of which skew the count.
        _sign, digits, exponent = scaled_val.as_tuple()
        if isinstance(exponent, six.integer_types):
            int_len = max(len(digits) + exponent, 0)
        else:
            # NaN carries a non-numeric exponent and has no integer digits
            int_len = 0
        if int_len > precision:
            raise ValueError(
                "decimal value %s overflow, max integer digit number is %s."
                % (val, precision)
            )
        return True

    def cast_value(self, value, data_type):
        """
        Convert ``value``, whose type is ``data_type``, into a builtin Decimal.

        :param value: value to convert
        :param data_type: data type of ``value``
        :return: ``decimal.Decimal`` instance built from ``value``
        """
        self._can_cast_or_throw(value, data_type)

        if (
            self._has_other_decimal_type
            and not isinstance(value, _decimal.Decimal)
            and isinstance(value, DECIMAL_TYPES)
        ):
            value = _decimal.Decimal(str(value))
        if six.PY3 and isinstance(value, six.binary_type):
            # the Decimal constructor does not take bytes under Python 3
            value = value.decode("utf-8")
        return _builtin_decimal.Decimal(value)

    @classmethod
    def parse_composite(cls, args):
        """
        Create an instance from the arguments of a type string.

        :param args: up to two arguments, precision and scale
        :return: instance of ``cls``
        :raises ValueError: when more than two arguments are given, when an
            argument is not an integer, or when the constructor rejects them
        """
        if len(args) > 2:
            raise ValueError(
                "%s() accepts no more than two arguments." % cls.__name__.upper()
            )
        try:
            # int() raises ValueError for non-numeric text and TypeError for
            # objects that are not convertible at all; both mean a bad argument
            int_args = [int(v) for v in args]
        except (TypeError, ValueError):
            raise ValueError(
                "%s() only accept integers as arguments." % cls.__name__.upper()
            )
        # kept outside of the try block so that errors raised by the constructor
        # retain their own message
        return cls(*int_args)

    def cast_composite_values(self, value):
        """
        Convert ``value`` into a builtin Decimal unless it already is one.

        :param value: value to convert
        :return: None for nullable types given None, otherwise a
            ``decimal.Decimal``
        """
        if value is None and self.nullable:
            return value
        if type(value) is not _builtin_decimal.Decimal and not isinstance(
            value, _builtin_decimal.Decimal
        ):
            value = self.cast_value(value, infer_primitive_data_type(value))
        return value


class Array(CompositeDataType):
    """
    Represents array type in MaxCompute.

    :param value_type: type of elements in the array

    :Example:

    >>> from odps import types as odps_types
    >>>
    >>> array_type = odps_types.Array(odps_types.bigint)
    >>> print(array_type)
    array<bigint>
    >>> print(array_type.value_type)
    bigint

    :Note:

    Need to set ``options.sql.use_odps2_extension = True`` to enable full functionality.
    """

    __slots__ = "nullable", "value_type", "_hash"
    _type_id = 101

    def __init__(self, value_type, nullable=True):
        super(Array, self).__init__(nullable=nullable)
        # type strings are accepted and turned into type instances
        value_type = validate_data_type(value_type)
        self.value_type = value_type

    @property
    def name(self):
        """Lower-cased class name with the element type in angle brackets."""
        return "{0}<{1}>".format(type(self).__name__.lower(), self.value_type.name)

    def _equals(self, other):
        """
        Compare with another type, which may be given as a type string.

        :return: True when ``DataType._equals`` holds and element types are equal
        """
        if isinstance(other, six.string_types):
            other = validate_data_type(other)

        return DataType._equals(self, other) and self.value_type == other.value_type

    def __hash__(self):
        # the hash is computed on first use and cached on the instance
        if not hasattr(self, "_hash"):
            self._hash = hash((type(self), self.nullable, hash(self.value_type)))
        return self._hash

    def can_implicit_cast(self, other):
        """
        Tell whether values of type ``other`` can be implicitly cast to this type.

        :param other: source data type, either an instance or a type string
        :return: True only for arrays with equal element type and nullability
        """
        if isinstance(other, six.string_types):
            other = validate_data_type(other)

        return (
            isinstance(other, Array)
            and self.value_type == other.value_type
            and self.nullable == other.nullable
        )

    def cast_value(self, value, data_type):
        """
        Check that ``data_type`` may be cast to this type and hand back ``value``.
        """
        self._can_cast_or_throw(value, data_type)
        return value

    @classmethod
    def parse_composite(cls, args):
        """
        Create an instance from the single element type of a type string.

        :param args: arguments parsed from the type string
        :raises ValueError: when the number of arguments is not one
        """
        if len(args) != 1:
            raise ValueError(
                "%s<> should be supplied with exactly one type." % cls.__name__.upper()
            )
        return cls(args[0])

    def cast_composite_values(self, value):
        """
        Validate every element of ``value`` against the element type.

        :param value: list of elements, or None for nullable types
        :return: new list holding the validated elements
        :raises ValueError: when ``value`` is not a list
        """
        if value is None and self.nullable:
            return value
        if not isinstance(value, list):
            # wrap into a tuple: a bare tuple value would otherwise be unpacked
            # as formatting arguments and raise TypeError
            raise ValueError(
                "Array data type requires `list`, instead of %s" % (value,)
            )
        element_data_type = self.value_type
        return [validate_value(element, element_data_type) for element in value]


class Map(CompositeDataType):
    """
    Represents map type in MaxCompute.

    :param key_type: type of keys in the map
    :param value_type: type of values in the map

    :Example:

    >>> from odps import types as odps_types
    >>>
    >>> map_type = odps_types.Map(odps_types.string, odps_types.Array(odps_types.bigint))
    >>> print(map_type)
    map<string, array<bigint>>
    >>> print(map_type.key_type)
    string
    >>> print(map_type.value_type)
    array<bigint>

    :Note:

    Need to set ``options.sql.use_odps2_extension = True`` to enable full functionality.
    """

    __slots__ = "nullable", "key_type", "value_type", "_hash", "_use_ordered_dict"
    _type_id = 102

    def __init__(self, key_type, value_type, nullable=True):
        super(Map, self).__init__(nullable=nullable)
        # type strings are accepted and turned into type instances
        key_type = validate_data_type(key_type)
        value_type = validate_data_type(value_type)
        self.key_type = key_type
        self.value_type = value_type
        self._use_ordered_dict = options.map_as_ordered_dict
        if self._use_ordered_dict is None:
            # no explicit configuration: use OrderedDict up to Python 3.6
            self._use_ordered_dict = sys.version_info[:2] <= (3, 6)

    @property
    def name(self):
        """Lower-cased class name with key and value types in angle brackets."""
        return "{0}<{1},{2}>".format(
            type(self).__name__.lower(), self.key_type.name, self.value_type.name
        )

    def _equals(self, other):
        """
        Compare with another type, which may be given as a type string.

        :return: True when ``DataType._equals`` holds and both key and value
            types are equal
        """
        if isinstance(other, six.string_types):
            other = validate_data_type(other)

        return (
            DataType._equals(self, other)
            and self.key_type == other.key_type
            and self.value_type == other.value_type
        )

    def __hash__(self):
        # the hash is computed on first use and cached on the instance
        if not hasattr(self, "_hash"):
            self._hash = hash(
                (type(self), self.nullable, hash(self.key_type), hash(self.value_type))
            )
        return self._hash

    def can_implicit_cast(self, other):
        """
        Tell whether values of type ``other`` can be implicitly cast to this type.

        :param other: source data type, either an instance or a type string
        :return: True only for maps with equal key type, value type and
            nullability
        """
        if isinstance(other, six.string_types):
            other = validate_data_type(other)

        return (
            isinstance(other, Map)
            and self.key_type == other.key_type
            and self.value_type == other.value_type
            and self.nullable == other.nullable
        )

    def cast_value(self, value, data_type):
        """
        Check that ``data_type`` may be cast to this type and hand back ``value``.
        """
        self._can_cast_or_throw(value, data_type)
        return value

    @classmethod
    def parse_composite(cls, args):
        """
        Create an instance from the key and value types of a type string.

        :param args: arguments parsed from the type string
        :raises ValueError: when the number of arguments is not two
        """
        if len(args) != 2:
            raise ValueError(
                "%s<> should be supplied with exactly two types." % cls.__name__.upper()
            )
        return cls(*args)

    def cast_composite_values(self, value):
        """
        Validate every key and value of ``value`` against the declared types.

        :param value: dict to validate, or None for nullable types
        :return: new ``dict`` or ``OrderedDict`` holding the validated pairs
        :raises ValueError: when ``value`` is not a dict
        """
        if value is None and self.nullable:
            return value
        if not isinstance(value, dict):
            # wrap into a tuple: a bare tuple value would otherwise be unpacked
            # as formatting arguments and raise TypeError
            raise ValueError(
                "Map data type requires `dict`, instead of %s" % (value,)
            )
        key_data_type = self.key_type
        value_data_type = self.value_type

        dict_type = OrderedDict if self._use_ordered_dict else dict
        return dict_type(
            (validate_value(k, key_data_type), validate_value(v, value_data_type))
            for k, v in six.iteritems(value)
        )


class Struct(CompositeDataType):
    """
    Represents struct type in MaxCompute.

    :param field_types: types of every field, can be a list of (field_name, field_type) tuples
        or a dict with field names as keys and field types as values.

    :Example:

    >>> from odps import types as odps_types
    >>>
    >>> struct_type = odps_types.Struct([("a", "bigint"), ("b", "array<string>")])
    >>> print(struct_type)
    struct<`a`:bigint, `b`:array<string>>
    >>> print(struct_type.field_types)
    OrderedDict([("a", "bigint"), ("b", "array<string>")])
    >>> print(struct_type.field_types["b"])
    array<string>

    :Note:

    Need to set ``options.sql.use_odps2_extension = True`` to enable full functionality.
    """

    __slots__ = "nullable", "field_types", "_hash"
    _type_id = 103

    def __init__(self, field_types, nullable=True):
        super(Struct, self).__init__(nullable=nullable)

        if isinstance(field_types, dict):
            pairs = six.iteritems(field_types)
        else:
            pairs = field_types
        self.field_types = OrderedDict()
        for field_name, field_type in pairs:
            # type strings are accepted and turned into type instances
            self.field_types[field_name] = validate_data_type(field_type)

        # tuple class used to represent values unless dicts are asked for
        self.namedtuple_type = xnamedtuple("StructNamedTuple", list(self.field_types))

        self._struct_as_dict = options.struct_as_dict
        if not self._struct_as_dict:
            self._use_ordered_dict = False
            return

        self._use_ordered_dict = options.struct_as_ordered_dict
        if self._use_ordered_dict is None:
            # no explicit configuration: use OrderedDict up to Python 3.6
            self._use_ordered_dict = sys.version_info[:2] <= (3, 6)
        warnings.warn(
            "Representing struct values as dicts is now deprecated. Try config "
            "`options.struct_as_dict=False` and return structs as named tuples "
            "instead.",
            DeprecationWarning,
        )

    @property
    def name(self):
        """Lower-cased class name with back-quoted fields in angle brackets."""
        field_defs = [
            "`%s`:%s" % (field_name, field_type.name)
            for field_name, field_type in six.iteritems(self.field_types)
        ]
        return "{0}<{1}>".format(type(self).__name__.lower(), ",".join(field_defs))

    def _equals(self, other):
        """
        Compare with another type, which may be given as a type string.

        :return: True when ``other`` is a struct with the same number of fields
            and every field of this struct has an equal type in ``other``
        """
        if isinstance(other, six.string_types):
            other = validate_data_type(other)

        if not isinstance(other, Struct):
            return False
        if len(self.field_types) != len(other.field_types):
            return False
        return all(
            field_type == other.field_types.get(field_name)
            for field_name, field_type in six.iteritems(self.field_types)
        )

    def __hash__(self):
        # the hash is computed on first use and cached on the instance
        if hasattr(self, "_hash"):
            return self._hash
        field_hashes = tuple(
            (hash(field_name), hash(field_type))
            for field_name, field_type in six.iteritems(self.field_types)
        )
        self._hash = hash((type(self), self.nullable, hash(field_hashes)))
        return self._hash

    def can_implicit_cast(self, other):
        """
        Tell whether values of type ``other`` can be implicitly cast to this type.

        :param other: source data type, either an instance or a type string
        :return: True only for equal structs with equal nullability
        """
        if isinstance(other, six.string_types):
            other = validate_data_type(other)

        if not isinstance(other, Struct):
            return False
        return self == other and self.nullable == other.nullable

    def cast_value(self, value, data_type):
        """
        Check that ``data_type`` may be cast to this type and hand back ``value``.
        """
        self._can_cast_or_throw(value, data_type)
        return value

    @classmethod
    def parse_composite(cls, args):
        """
        Create an instance from the field definitions of a type string.

        :param args: field definitions, each either a (name, type) tuple or a
            ``name:type`` string
        :raises ValueError: when a string definition lacks the colon
        """
        for arg in args:
            if not isinstance(arg, tuple) and ":" not in arg:
                raise ValueError(
                    "Every field defined in STRUCT should be given a name."
                )

        # converted lazily while the constructor consumes the fields
        return cls(
            arg if isinstance(arg, tuple) else tuple(_split_struct_kv(arg))
            for arg in args
        )

    def cast_composite_values(self, value):
        """
        Validate the fields of ``value`` against the declared field types.

        :param value: tuple or dict holding the fields, or None for nullable
            types
        :return: a named tuple, or a dict when ``options.struct_as_dict`` was
            set at the time this type was created
        :raises ValueError: when ``value`` is neither a tuple nor a dict
        """
        if value is None and self.nullable:
            return value

        if not self._struct_as_dict:
            if isinstance(value, tuple):
                # fields are matched by position
                by_position = [
                    validate_value(item, field_type)
                    for item, field_type in zip(value, self.field_types.values())
                ]
                return self.namedtuple_type(*by_position)
            if isinstance(value, dict):
                # fields are matched by name, absent ones are read as None
                by_name = [
                    validate_value(value.get(field_name), field_type)
                    for field_name, field_type in self.field_types.items()
                ]
                return self.namedtuple_type(*by_name)
        else:
            dict_hook = OrderedDict if self._use_ordered_dict else dict
            if isinstance(value, tuple):
                # named tuples bring their own field names, plain tuples are
                # matched against the declared fields by position
                names = getattr(value, "_fields", None) or self.field_types.keys()
                value = dict_hook(compat.izip(names, value))
            if isinstance(value, dict):
                return dict_hook(
                    (
                        validate_value(field_name, string),
                        validate_value(value[field_name], field_type),
                    )
                    for field_name, field_type in six.iteritems(self.field_types)
                )

        raise ValueError(
            "Struct data type requires `tuple` or `dict`, instead of %s" % type(value)
        )


@_primitive_doc
class Json(DataType):
    _type_id = 12

    _max_length = 8 * 1024 * 1024  # 8M

    def can_implicit_cast(self, other):
        """
        Tell whether values of type ``other`` can be implicitly cast to json.

        :param other: source data type, either an instance or a type string
        :return: True for string and binary sources, otherwise whatever the
            parent class decides
        """
        if isinstance(other, six.string_types):
            other = validate_data_type(other)

        if isinstance(other, (String, Binary)):
            return True
        return super(Json, self).can_implicit_cast(other)

    def validate_value(self, val, max_field_size=None):
        """
        Make sure ``val`` has a type accepted as json and is not too long.

        :param val: value to check; strings, lists, dicts, integers and floats
            are accepted
        :param max_field_size: maximal length allowed for sized values,
            ``_max_length`` is used when omitted or zero
        :return: True when the value is acceptable
        :raises ValueError: when the type of the value is not accepted or a
            sized value is longer than allowed
        """
        if val is None and self.nullable:
            return True
        # The type is checked before the length: integers and floats are
        # accepted but have no len(), so measuring first raised TypeError.
        if not isinstance(
            val, (six.string_types, list, dict, six.integer_types, float)
        ):
            raise ValueError("InvalidData: cannot accept %r as json" % (val,))
        max_field_size = max_field_size or self._max_length
        if isinstance(val, (six.string_types, list, dict)):
            val_len = len(val)
            if val_len > max_field_size:
                # report the length rather than dumping the oversized value
                raise ValueError(
                    "InvalidData: Length of string(%s) is more than %sM.'"
                    % (val_len, max_field_size / (1024**2))
                )
        return True

    def cast_value(self, value, data_type):
        """
        Convert ``value``, whose type is ``data_type``, into a json value.

        :param value: value to convert
        :param data_type: data type of ``value``
        :return: the parsed document for string sources, otherwise ``value``
        """
        self._can_cast_or_throw(value, data_type)

        if isinstance(data_type, String):
            return _json.loads(utils.to_text(value))
        return value


# Module-level instances of the primitive types. Several names coincide with
# builtins or standard library modules, which is why this module imports
# ``json``, ``date`` and ``datetime`` under underscore-prefixed aliases.

# integer types
tinyint = Tinyint()
smallint = Smallint()
int_ = Int()
bigint = Bigint()

# floating point types
float_ = Float()
double = Double()

# string, binary, boolean and json types
string = String()
binary = Binary()
boolean = Boolean()
json = Json()

# date and time types
datetime = Datetime()
date = Date()
timestamp = Timestamp()
timestamp_ntz = TimestampNTZ()

# interval types
interval_day_time = IntervalDayTime()
interval_year_month = IntervalYearMonth()

# Maps type names to the primitive type instances; validate_data_type() looks
# stripped, lower-cased type strings up here.
_odps_primitive_data_types = {
    primitive.name: primitive
    for primitive in (
        tinyint,
        smallint,
        int_,
        bigint,
        float_,
        double,
        string,
        datetime,
        date,
        boolean,
        binary,
        timestamp,
        timestamp_ntz,
        interval_day_time,
        interval_year_month,
        json,
    )
}


# Default handlers of parse_composite_types(): maps the lower-cased name of a
# composite type to the class whose parse_composite() builds the instance.
_composite_handlers = {
    "varchar": Varchar,
    "char": Char,
    "decimal": Decimal,
    "array": Array,
    "map": Map,
    "struct": Struct,
}


def _split_struct_kv(kv_str):
    """
    Split a ``name:type`` definition at its first colon outside back-quotes.

    :param str kv_str: text to split, the ``name:`` part is optional
    :return: list holding the stripped type alone, or the field name with
        back-quotes removed followed by the stripped type
    :raises ValueError: when the split yields neither one nor two parts
    """
    pieces = utils.split_backquoted(kv_str, ":", 1)
    if not 1 <= len(pieces) <= 2:
        raise ValueError("Invalid type string: %s" % kv_str)

    pieces[-1] = pieces[-1].strip()
    if len(pieces) == 2:
        pieces[0] = utils.strip_backquotes(pieces[0])
    return pieces


def parse_composite_types(type_str, handlers=None):
    """
    Parse a composite type string such as ``map<string,array<bigint>>``.

    The string is scanned once. Text in front of an opening bracket is pushed
    onto a token stack, and every closing bracket folds the tokens collected
    since the matching opening bracket into one composite type. Brackets inside
    back-quoted names are ignored.

    :param str type_str: type string to parse
    :param dict handlers: maps lower-cased type names to classes providing
        ``parse_composite``; ``_composite_handlers`` is used when omitted
    :return: the parsed type, or a (field name, type) tuple when the outermost
        type is prefixed with ``name:``
    :raises ValueError: when brackets are unbalanced or a composite type is not
        supported
    """
    handlers = handlers or _composite_handlers

    def _create_composite_type(typ, *args):
        # the type may be prefixed with a field name when nested in a struct
        parts = _split_struct_kv(typ)
        typ = parts[-1]
        if typ not in handlers:
            raise ValueError("Composite type %s not supported." % typ.upper())
        ctype = handlers[typ].parse_composite(args)

        if len(parts) == 1:
            return ctype
        else:
            return parts[0], ctype

    token_stack = []
    # positions in token_stack where the currently open composite types start
    bracket_stack = []
    token_start = 0
    type_str = type_str.strip()
    quoted = False

    for idx, ch in enumerate(type_str):
        if ch == "`":
            quoted = not quoted
        elif not quoted:
            if ch == "<" or ch == "(":
                bracket_stack.append(len(token_stack))
                token = type_str[token_start:idx].strip()
                token_stack.append(token)
                token_start = idx + 1
            elif ch == ">" or ch == ")":
                if not bracket_stack:
                    # report malformed input as ValueError like all other
                    # parse errors, not as IndexError from the pop() below
                    raise ValueError(
                        "Unbalanced bracket %r in type string: %s" % (ch, type_str)
                    )
                token = type_str[token_start:idx].strip()
                if token:
                    token_stack.append(token)
                bracket_pos = bracket_stack.pop()
                ctype = _create_composite_type(*token_stack[bracket_pos:])
                token_stack = token_stack[:bracket_pos]
                token_stack.append(ctype)
                token_start = idx + 1
            elif ch == ",":
                token = type_str[token_start:idx].strip()
                if token:
                    token_stack.append(token)
                token_start = idx + 1
    if bracket_stack:
        # otherwise the bare name in front of the unclosed bracket would be
        # returned as if it were a parsed type
        raise ValueError("Unclosed bracket in type string: %s" % type_str)
    if len(token_stack) != 1:
        return _create_composite_type(type_str)
    return token_stack[0]


def validate_data_type(data_type):
    """
    Parse data type instance from string in MaxCompute DDL.

    Data type instances are returned as they are. Strings are stripped and
    lower-cased, then resolved as primitive type names first and as composite
    types afterwards.

    :param data_type: data type instance or type string
    :return: data type instance
    :raises ValueError: when the argument cannot be resolved into a data type

    :Example:

    >>> field_type = validate_data_type("array<int>")
    >>> print(field_type)
    array<int>
    >>> print(field_type.value_type)
    int
    """
    if isinstance(data_type, DataType):
        return data_type

    if not isinstance(data_type, six.string_types):
        raise ValueError("Invalid data type: %s" % repr(data_type))

    data_type = data_type.strip().lower()
    try:
        return _odps_primitive_data_types[data_type]
    except KeyError:
        # not a primitive type name, thus a composite type is the last resort
        pass

    try:
        return parse_composite_types(data_type)
    except ValueError as ex:
        composite_err_msg = str(ex)

    raise ValueError(
        "Invalid data type: %s. %s" % (repr(data_type), composite_err_msg)
    )


# Python-side classes accepted as integer and floating point values. The numpy
# scalar classes are added when numpy can be imported.
integer_builtins = six.integer_types
float_builtins = (float,)
try:
    import numpy as np

    integer_builtins += (np.integer,)
    # ``np.float_`` was an alias of ``np.float64`` and has been removed in
    # NumPy 2.0, where accessing it raises AttributeError. That error is not
    # covered by the handler below and would break importing this module.
    float_builtins += (np.float64,)
except ImportError:
    pass

# Python classes accepted as values of every primitive type. The order of the
# entries matters: infer_primitive_data_type() returns the first type whose
# classes match the value.
_odps_primitive_to_builtin_types = OrderedDict(
    [
        (bigint, integer_builtins),
        (tinyint, integer_builtins),
        (smallint, integer_builtins),
        (int_, integer_builtins),
        (double, float_builtins),
        (float_, float_builtins),
        (string, (six.text_type, six.binary_type)),
        (binary, six.binary_type),
        (datetime, _datetime),
        (boolean, bool),
        (interval_year_month, Monthdelta),
        (date, _date),
        (json, (list, dict, six.string_types, six.integer_types, float)),
    ]
)
# classes of the types above, used by validate_value() to pick the code path
_odps_primitive_clses = {type(dt) for dt in _odps_primitive_to_builtin_types}


integer_types = (tinyint, smallint, int_, bigint)


def infer_primitive_data_type(value):
    """
    Guess the primitive ODPS type of a Python value.

    Entries of ``_odps_primitive_to_builtin_types`` are checked in insertion
    order and the first type whose builtin types match ``value`` is returned.

    :param value: Python value to inspect
    :return: the matching ODPS type instance, or None when nothing matches
    """
    candidates = (
        odps_type
        for odps_type, py_types in six.iteritems(_odps_primitive_to_builtin_types)
        if isinstance(value, py_types)
    )
    return next(candidates, None)


# Set to True by validate_value() once _patch_pd_types() has been invoked,
# so the patch is attempted only on the first call.
_pd_type_patched = False


def _patch_pd_types():
    """
    Register pandas-backed value types for timestamp and interval types.

    When ``timestamp``, ``timestamp_ntz`` or ``interval_day_time`` is missing
    from ``_odps_primitive_to_builtin_types``, pandas is imported and the
    three types are mapped to ``pd.Timestamp`` / ``pd.Timedelta``. Their
    classes are also added to ``_odps_primitive_clses``. Nothing is changed
    if pandas cannot be imported.
    """
    pandas_backed = (timestamp, timestamp_ntz, interval_day_time)
    if all(tp in _odps_primitive_to_builtin_types for tp in pandas_backed):
        # everything already registered
        return

    try:
        import pandas as pd

        pd_type_map = {
            timestamp: pd.Timestamp,
            timestamp_ntz: pd.Timestamp,
            interval_day_time: pd.Timedelta,
        }
        _odps_primitive_to_builtin_types.update(pd_type_map)
        _odps_primitive_clses.update(type(tp) for tp in pd_type_map)
    except (ImportError, ValueError):
        pass


def _cast_primitive_value(value, data_type):
    """
    Convert a Python value into one acceptable for a primitive ODPS type.

    :param value: value to convert; None and pandas NA both yield None
    :param data_type: target primitive type, a key of
        ``_odps_primitive_to_builtin_types``
    :return: the value itself when it already has an accepted Python type,
        otherwise the result of ``data_type.cast_value``
    :raises ValueError: if no ODPS type can be inferred from the value
    """
    if value is None or type(value) is pd_na_type:
        return None

    # normalize text / bytes according to options.tunnel.string_as_binary
    # before comparing against the accepted builtin types
    use_binary = options.tunnel.string_as_binary
    if use_binary and isinstance(value, six.text_type):
        value = value.encode("utf-8")
    elif not use_binary and isinstance(value, (bytearray, six.binary_type)):
        value = value.decode("utf-8")

    if isinstance(value, _odps_primitive_to_builtin_types[data_type]):
        return value

    source_type = infer_primitive_data_type(value)
    if source_type is not None:
        return data_type.cast_value(value, source_type)

    raise ValueError(
        "Unknown value type, cannot infer from value: %s, type: %s"
        % (value, type(value))
    )


def validate_value(value, data_type, max_field_size=None):
    """
    Cast a Python value to the given data type and validate the result.

    On the first call, ``_patch_pd_types`` is invoked once to register the
    pandas-backed timestamp and interval types.

    :param value: value to cast and validate
    :param data_type: target data type instance
    :param max_field_size: forwarded to ``data_type.validate_value``
    :return: the cast value
    :raises ImportError: if ``data_type`` is a timestamp or day-time interval
        type that has not been registered as primitive, which happens when
        pandas could not be imported
    :raises ValueError: if ``data_type`` is not primitive and casting with
        ``cast_composite_values`` raises AttributeError
    """
    global _pd_type_patched

    if not _pd_type_patched:
        _patch_pd_types()
        _pd_type_patched = True

    if type(data_type) in _odps_primitive_clses:
        res = _cast_primitive_value(value, data_type)
    else:
        if isinstance(data_type, (BaseTimestamp, IntervalDayTime)):
            # format the message explicitly: ImportError does not interpolate
            # extra positional arguments into its first argument
            raise ImportError(
                "To use %s in pyodps, you need to install pandas."
                % data_type.name.upper()
            )

        # Any AttributeError raised during the call, including a missing
        # ``cast_composite_values`` method, is reported as an unknown data
        # type. The error is raised after leaving the ``except`` clause.
        failed = False
        try:
            res = data_type.cast_composite_values(value)
        except AttributeError:
            failed = True
        if failed:
            raise ValueError("Unknown data type: %s" % data_type)

    data_type.validate_value(res, max_field_size=max_field_size)
    return res
