# Copyright 1999-2025 Alibaba Group Holding Ltd.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import itertools
import operator
import re
from copy import copy as copy_obj
from numbers import Integral
from typing import Sequence, Type

import numpy as np
import pandas as pd
from pandas._libs import lib
from pandas.api.extensions import (
    ExtensionArray,
    ExtensionDtype,
    register_extension_dtype,
)
from pandas.api.indexers import check_array_indexer
from pandas.api.types import (
    is_array_like,
    is_list_like,
    is_scalar,
    is_string_dtype,
    pandas_dtype,
)
from pandas.arrays import StringArray as StringArrayBase
from pandas.compat import set_function_name
from pandas.core import ops
from pandas.core.algorithms import take

try:
    from pandas._libs.arrays import NDArrayBacked
except ImportError:
    NDArrayBacked = None
try:
    import pyarrow as pa

    pa_null = pa.NULL
except ImportError:  # pragma: no cover
    pa = None
    pa_null = None
try:
    import pyarrow.compute as pc
except ImportError:  # pragma: no cover
    pc = None

from ..config import options
from ..core import is_kernel_mode
from ..utils import pd_release_version, tokenize

_use_bool_any_all = pd_release_version[:2] >= (1, 3)
_use_extension_index = pd_release_version[:2] >= (1, 4)
_object_engine_for_string_array = pd_release_version[:2] >= (1, 5)

if _object_engine_for_string_array:
    StringArrayBase = type(StringArrayBase)(
        "StringArrayBase", StringArrayBase.__bases__, dict(StringArrayBase.__dict__)
    )


class ArrowDtype(ExtensionDtype):
    @property
    def arrow_type(self):  # pragma: no cover
        raise NotImplementedError

    def __from_arrow__(self, array):
        return self.construct_array_type()(array)


@register_extension_dtype
class ArrowStringDtype(ArrowDtype):
    """
    Extension dtype for arrow string data.

    .. warning::

       ArrowStringDtype is considered experimental. The implementation and
       parts of the API may change without warning.

       In particular, ArrowStringDtype.na_value may change to no longer be
       ``numpy.nan``.

    Attributes
    ----------
    None

    Methods
    -------
    None

    Examples
    --------
    >>> import maxframe.dataframe as md
    >>> md.ArrowStringDtype()
    ArrowStringDtype
    """

    type = str
    kind = "U"
    name = "Arrow[string]"
    na_value = pa_null

    @classmethod
    def construct_from_string(cls, string):
        if string == cls.name:
            return cls()
        else:
            raise TypeError(f"Cannot construct a '{cls}' from '{string}'")

    @classmethod
    def construct_array_type(cls) -> "Type[ArrowStringArray]":
        return ArrowStringArray

    @property
    def arrow_type(self):
        return pa.string()


@register_extension_dtype
class ArrowStringDtypeAlias(ArrowStringDtype):
    name = "arrow_string"  # register an alias name for compatibility


class ArrowListDtypeType(type):
    """
    the type of ArrowListDtype, this metaclass determines subclass ability
    """

    pass


class ArrowListDtype(ArrowDtype):
    _metadata = ("_value_type",)

    def __init__(self, dtype):
        if isinstance(dtype, type(self)):
            dtype = dtype.value_type
        if pa and isinstance(dtype, pa.DataType):
            dtype = dtype.to_pandas_dtype()

        dtype = pandas_dtype(dtype)
        if is_string_dtype(dtype) and not isinstance(dtype, ArrowStringDtype):
            # convert string dtype to arrow string dtype
            dtype = ArrowStringDtype()

        self._value_type = dtype

    @property
    def value_type(self):
        return self._value_type

    @property
    def kind(self):
        return "O"

    @property
    def type(self):
        return ArrowListDtypeType

    @property
    def name(self):
        return f"Arrow[List[{self.value_type.name}]]"

    @property
    def arrow_type(self):
        if isinstance(self._value_type, ArrowDtype):
            arrow_subdtype = self._value_type.arrow_type
        else:
            arrow_subdtype = pa.from_numpy_dtype(self._value_type)
        return pa.list_(arrow_subdtype)

    def __repr__(self) -> str:
        return self.name

    @classmethod
    def construct_array_type(cls) -> "Type[ArrowListArray]":
        return ArrowListArray

    @classmethod
    def construct_from_string(cls, string):
        msg = f"Cannot construct a 'ArrowListDtype' from '{string}'"
        xpr = re.compile(r"Arrow\[List\[(?P<value_type>[^,]*)\]\]$")
        m = xpr.match(string)
        if m:
            value_type = m.groupdict()["value_type"]
            return ArrowListDtype(value_type)
        else:
            raise TypeError(msg)

    @classmethod
    def is_dtype(cls, dtype) -> bool:
        dtype = getattr(dtype, "dtype", dtype)
        if isinstance(dtype, str):
            try:
                cls.construct_from_string(dtype)
            except TypeError:
                return False
            else:
                return True
        else:
            return isinstance(dtype, cls)

    def __hash__(self):
        return super().__hash__()

    def __eq__(self, other):
        if not isinstance(other, ArrowListDtype):
            return False

        value_type = self._value_type
        other_value_type = other._value_type

        try:
            return value_type == other_value_type
        except TypeError:
            # cannot compare numpy dtype and extension dtype
            return other_value_type == value_type


class ArrowArray(ExtensionArray):
    _arrow_type = None

    def __init__(self, values, dtype: ArrowDtype = None, copy=False):
        pandas_only = self._pandas_only()

        if pa is not None and not pandas_only:
            self._init_by_arrow(values, dtype=dtype, copy=copy)
        elif not is_kernel_mode():
            # not in kernel mode, allow to use numpy handle data
            # just for infer dtypes purpose
            self._init_by_numpy(values, dtype=dtype, copy=copy)
        else:
            raise ImportError("Cannot create ArrowArray when `pyarrow` not installed")

        # for test purpose
        self._force_use_pandas = pandas_only

    def _init_by_arrow(self, values, dtype: ArrowDtype = None, copy=False):
        if isinstance(values, (pd.Index, pd.Series)):
            # for pandas Index and Series,
            # convert to PandasArray
            values = values.array

        if isinstance(values, type(self)):
            arrow_array = values._arrow_array
        elif isinstance(values, ExtensionArray):
            # if come from pandas object like index,
            # convert to pandas StringArray first,
            # validation will be done in construct
            arrow_array = pa.chunked_array([pa.array(values, from_pandas=True)])
        elif isinstance(values, pa.ChunkedArray):
            arrow_array = values
        elif isinstance(values, pa.Array):
            arrow_array = pa.chunked_array([values])
        elif len(values) == 0:  # pragma: no cover
            arrow_array = pa.chunked_array([pa.array([], type=dtype.arrow_type)])
        else:
            arrow_array = pa.chunked_array([pa.array(values, type=dtype.arrow_type)])

        if copy:
            arrow_array = copy_obj(arrow_array)

        self._use_arrow = True
        self._arrow_array = arrow_array

        if NDArrayBacked is not None and isinstance(self, NDArrayBacked):
            NDArrayBacked.__init__(self, np.array([]), dtype)
        else:
            self._dtype = dtype

    def _init_by_numpy(self, values, dtype: ArrowDtype = None, copy=False):
        self._use_arrow = False

        ndarray = np.array(values, copy=copy)
        if NDArrayBacked is not None and isinstance(self, NDArrayBacked):
            NDArrayBacked.__init__(self, ndarray, dtype)
        else:
            self._dtype = dtype
            self._ndarray = np.array(values, copy=copy)

    @classmethod
    def _pandas_only(cls):
        return options.dataframe.arrow_array.pandas_only

    def __repr__(self):
        return f"{type(self).__name__}({repr(self._array)})"

    @property
    def _array(self):
        return self._arrow_array if self._use_arrow else self._ndarray

    @property
    def dtype(self) -> "Type[ArrowDtype]":
        return self._dtype

    @property
    def nbytes(self) -> int:
        if self._use_arrow:
            return sum(
                x.size
                for chunk in self._arrow_array.chunks
                for x in chunk.buffers()
                if x is not None
            )
        else:
            return self._ndarray.nbytes

    @property
    def shape(self):
        if self._use_arrow:
            return (self._arrow_array.length(),)
        else:
            return self._ndarray.shape

    def memory_usage(self, deep=True) -> int:
        if self._use_arrow:
            return self.nbytes
        else:
            return pd.Series(self._ndarray).memory_usage(index=False, deep=deep)

    @classmethod
    def _to_arrow_array(cls, scalars):
        return pa.array(scalars)

    @classmethod
    def _from_sequence(cls, scalars, dtype=None, copy=False):
        if pa is None or cls._pandas_only():
            # pyarrow not installed, just return numpy
            ret = np.empty(len(scalars), dtype=object)
            ret[:] = scalars
            return cls(ret)

        if pa_null is not None and isinstance(scalars, type(pa_null)):
            scalars = []
        elif not hasattr(scalars, "dtype"):
            ret = np.empty(len(scalars), dtype=object)
            for i, s in enumerate(scalars):
                ret[i] = s
            scalars = ret
        elif isinstance(scalars, cls):
            if copy:
                scalars = scalars.copy()
            return scalars
        arrow_array = pa.chunked_array([cls._to_arrow_array(scalars)])
        return cls(arrow_array, dtype=dtype, copy=copy)

    @classmethod
    def _from_sequence_of_strings(cls, strings, dtype=None, copy=False):
        return cls._from_sequence(strings, dtype=dtype, copy=copy)

    @staticmethod
    def _can_process_slice_via_arrow(slc):
        if not isinstance(slc, slice):
            return False
        if slc.step is not None and slc.step != 1:
            return False
        if slc.start is not None and not isinstance(
            slc.start, Integral
        ):  # pragma: no cover
            return False
        if slc.stop is not None and not isinstance(
            slc.stop, Integral
        ):  # pragma: no cover
            return False
        return True

    def _values_for_factorize(self):
        arr = self.to_numpy()
        mask = self.isna()
        arr[mask] = -1
        return arr, -1

    def _values_for_argsort(self):
        return self.to_numpy()

    @classmethod
    def _from_factorized(cls, values, original):
        return cls(values)

    @staticmethod
    def _process_pos(pos, length, is_start):
        if pos is None:
            return 0 if is_start else length
        return pos + length if pos < 0 else pos

    @classmethod
    def _post_scalar_getitem(cls, lst):
        return lst.to_pandas()[0]

    def __getitem__(self, item):
        cls = type(self)

        if pa is None or self._force_use_pandas:
            # pyarrow not installed
            result = self._ndarray[item]
            if pd.api.types.is_scalar(item):
                return result
            else:
                return type(self)(result)

        has_take = hasattr(self._arrow_array, "take")
        if not self._force_use_pandas and has_take:
            if pd.api.types.is_scalar(item):
                item = item + len(self) if item < 0 else item
                return self._post_scalar_getitem(self._arrow_array.take([item]))
            elif self._can_process_slice_via_arrow(item):
                length = len(self)
                start, stop = item.start, item.stop
                start = self._process_pos(start, length, True)
                stop = self._process_pos(stop, length, False)
                return cls(
                    self._arrow_array.slice(offset=start, length=stop - start),
                    dtype=self._dtype,
                )
            elif hasattr(item, "dtype") and np.issubdtype(item.dtype, np.bool_):
                return cls(
                    self._arrow_array.filter(pa.array(item, from_pandas=True)),
                    dtype=self._dtype,
                )
            elif hasattr(item, "dtype"):
                length = len(self)
                item = np.where(item < 0, item + length, item)
                return cls(self._arrow_array.take(item), dtype=self._dtype)

        array = np.asarray(self._arrow_array.to_pandas())
        return cls(array[item], dtype=self._dtype)

    @classmethod
    def _concat_same_type(cls, to_concat: Sequence["ArrowArray"]) -> "ArrowArray":
        if pa is None or cls._pandas_only():
            # pyarrow not installed
            return cls(np.concatenate([x._array for x in to_concat]))

        chunks = list(
            itertools.chain.from_iterable(x._arrow_array.chunks for x in to_concat)
        )
        if len(chunks) == 0:
            chunks = [pa.array([], type=to_concat[0].dtype.arrow_type)]
        return cls(pa.chunked_array(chunks))

    def __len__(self):
        return len(self._array)

    def __array__(self, dtype=None):
        return self.to_numpy(dtype=dtype)

    def to_numpy(self, dtype=None, copy=False, na_value=lib.no_default):
        if self._use_arrow:
            array = np.asarray(self._arrow_array.to_pandas())
        else:
            array = self._ndarray
        if copy or na_value is not lib.no_default:
            array = array.copy()
        if na_value is not lib.no_default:
            array[self.isna()] = na_value
        return array

    @classmethod
    def _array_fillna(cls, array, value):
        return array.fillna(value)

    def fillna(self, value=None, method=None, limit=None):
        cls = type(self)

        if pa is None or self._force_use_pandas:
            # pyarrow not installed
            return cls(
                pd.Series(self.to_numpy()).fillna(
                    value=value, method=method, limit=limit
                )
            )

        chunks = []
        for chunk_array in self._arrow_array.chunks:
            array = chunk_array.to_pandas()
            if method is None:
                result_array = self._array_fillna(array, value)
            else:
                result_array = array.fillna(value=value, method=method, limit=limit)
            chunks.append(pa.array(result_array, from_pandas=True))
        return cls(pa.chunked_array(chunks), dtype=self._dtype)

    def astype(self, dtype, copy=True):
        dtype = pandas_dtype(dtype)
        if isinstance(dtype, ArrowStringDtype):
            if copy:
                return self.copy()
            return self

        if pa is None or self._force_use_pandas:
            # pyarrow not installed
            if isinstance(dtype, ArrowDtype):
                dtype = dtype.type
            return type(self)(pd.Series(self.to_numpy()).astype(dtype, copy=copy))

        # try to slice 1 record to get the result dtype
        test_array = self._arrow_array.slice(0, 1).to_pandas()
        test_result_array = test_array.astype(dtype).array
        if _use_extension_index:
            test_result_type = type(test_array.astype(dtype).values)
            if test_result_type is np.ndarray:
                test_result_type = np.array
        else:
            test_result_type = type(test_result_array)

        result_array = test_result_type(
            np.full(
                self.shape,
                test_result_array.dtype.na_value,
                dtype=np.asarray(test_result_array).dtype,
            )
        )

        start = 0
        # use chunks to do astype
        for chunk_array in self._arrow_array.chunks:
            result_array[start : start + len(chunk_array)] = (
                chunk_array.to_pandas().astype(dtype).array
            )
            start += len(chunk_array)
        return result_array

    def isna(self):
        if (
            not self._force_use_pandas
            and self._use_arrow
            and hasattr(self._arrow_array, "is_null")
        ):
            return self._arrow_array.is_null().to_pandas().to_numpy()
        elif self._use_arrow:
            return pd.isna(self._arrow_array.to_pandas()).to_numpy()
        else:
            return pd.isna(self._ndarray)

    def take(self, indices, allow_fill=False, fill_value=None):
        if (
            allow_fill is False or (allow_fill and fill_value is self.dtype.na_value)
        ) and len(self) > 0:
            return type(self)(self[indices], dtype=self._dtype)

        if self._use_arrow:
            array = self._arrow_array.to_pandas().to_numpy()
        else:
            array = self._ndarray

        replace = False
        if allow_fill and (fill_value is None or fill_value == self._dtype.na_value):
            fill_value = self.dtype.na_value
            replace = True

        result = take(array, indices, fill_value=fill_value, allow_fill=allow_fill)
        del array
        if replace and pa is not None:
            # pyarrow cannot recognize pa.NULL
            result[result == self.dtype.na_value] = None
        return type(self)(result, dtype=self._dtype)

    def copy(self):
        if self._use_arrow:
            return type(self)(copy_obj(self._arrow_array))
        else:
            return type(self)(self._ndarray.copy())

    def unique(self):
        if self._force_use_pandas or not self._use_arrow or not hasattr(pc, "unique"):
            return type(self)(np.unique(self.to_numpy()), dtype=self._dtype)
        return type(self)(pc.unique(self._arrow_array), dtype=self._dtype)

    def value_counts(self, dropna=False):
        if self._use_arrow:
            series = self._arrow_array.to_pandas()
        else:
            series = pd.Series(self._ndarray)
        return type(self)(series.value_counts(dropna=dropna), dtype=self._dtype)

    if _use_bool_any_all:

        def any(self, axis=0, out=None):
            return self.to_numpy().astype(bool).any(axis=axis, out=out)

        def all(self, axis=0, out=None):
            return self.to_numpy().astype(bool).all(axis=axis, out=out)

    else:

        def any(self, axis=0, out=None):
            return self.to_numpy().any(axis=axis, out=out)

        def all(self, axis=0, out=None):
            return self.to_numpy().all(axis=axis, out=out)

    def __maxframe_tokenize__(self):
        if self._use_arrow:
            return tokenize(
                [
                    memoryview(x)
                    for chunk in self._arrow_array.chunks
                    for x in chunk.buffers()
                    if x is not None
                ]
            )
        else:
            return self._ndarray


class ArrowStringArray(ArrowArray, StringArrayBase):
    def __init__(self, values, dtype=None, copy=False):
        if dtype is not None:
            assert isinstance(dtype, ArrowStringDtype)
        ArrowArray.__init__(self, values, ArrowStringDtype(), copy=copy)

    @classmethod
    def from_scalars(cls, values):
        if pa is None or cls._pandas_only():
            return cls._from_sequence(values)
        else:
            arrow_array = pa.chunked_array([cls._to_arrow_array(values)])
            return cls(arrow_array)

    @classmethod
    def _to_arrow_array(cls, scalars):
        return pa.array(scalars).cast(pa.string())

    def __setitem__(self, key, value):
        if isinstance(value, (pd.Index, pd.Series)):
            value = value.to_numpy()
        if isinstance(value, type(self)):
            value = value.to_numpy()

        key = check_array_indexer(self, key)
        scalar_key = is_scalar(key)
        scalar_value = is_scalar(value)
        if scalar_key and not scalar_value:
            raise ValueError("setting an array element with a sequence.")

        # validate new items
        if scalar_value:
            if pd.isna(value):
                value = None
            elif not isinstance(value, str):
                raise ValueError(
                    f"Cannot set non-string value '{value}' into a ArrowStringArray."
                )
        else:
            if not is_array_like(value):
                value = np.asarray(value, dtype=object)
            if len(value) and not lib.is_string_array(value, skipna=True):
                raise ValueError("Must provide strings.")

        if self._use_arrow:
            string_array = np.asarray(self._arrow_array.to_pandas())
            string_array[key] = value
            self._arrow_array = pa.chunked_array([pa.array(string_array)])
        else:
            self._ndarray[key] = value

    # Override parent because we have different return types.
    @classmethod
    def _create_arithmetic_method(cls, op):
        # Note: this handles both arithmetic and comparison methods.
        def method(self, other):
            is_arithmetic = True if op.__name__ in ops.ARITHMETIC_BINOPS else False
            pandas_only = cls._pandas_only()

            is_other_array = False
            if not is_scalar(other):
                is_other_array = True
                other = np.asarray(other)

            self_is_na = self.isna()
            other_is_na = pd.isna(other)
            mask = self_is_na | other_is_na

            if pa is None or pandas_only:
                if is_arithmetic:
                    ret = np.empty(self.shape, dtype=object)
                else:
                    ret = np.zeros(self.shape, dtype=bool)
                valid = ~mask
                arr = (
                    self._arrow_array.to_pandas().to_numpy()
                    if self._use_arrow
                    else self._ndarray
                )
                o = other[valid] if is_other_array else other
                ret[valid] = op(arr[valid], o)
                if is_arithmetic:
                    return ArrowStringArray(ret)
                else:
                    return pd.arrays.BooleanArray(ret, mask)

            chunks = []
            mask_chunks = []
            start = 0
            for chunk_array in self._arrow_array.chunks:
                chunk_array = np.asarray(chunk_array.to_pandas())
                end = start + len(chunk_array)
                chunk_mask = mask[start:end]
                chunk_valid = ~chunk_mask

                if is_arithmetic:
                    result = np.empty(chunk_array.shape, dtype=object)
                else:
                    result = np.zeros(chunk_array.shape, dtype=bool)

                chunk_other = other
                if is_other_array:
                    chunk_other = other[start:end]
                    chunk_other = chunk_other[chunk_valid]

                # calculate only for both not None
                result[chunk_valid] = op(chunk_array[chunk_valid], chunk_other)

                if is_arithmetic:
                    chunks.append(pa.array(result, type=pa.string(), from_pandas=True))
                else:
                    chunks.append(result)
                    mask_chunks.append(chunk_mask)

            if is_arithmetic:
                return ArrowStringArray(pa.chunked_array(chunks))
            else:
                return pd.arrays.BooleanArray(
                    np.concatenate(chunks), np.concatenate(mask_chunks)
                )

        return set_function_name(method, f"__{op.__name__}__", cls)

    def shift(self, periods: int = 1, fill_value: object = None) -> "ArrowStringArray":
        return ExtensionArray.shift(self, periods=periods, fill_value=fill_value)

    @classmethod
    def _add_arithmetic_ops(cls):
        cls.__add__ = cls._create_arithmetic_method(operator.add)
        cls.__radd__ = cls._create_arithmetic_method(ops.radd)

        cls.__mul__ = cls._create_arithmetic_method(operator.mul)
        cls.__rmul__ = cls._create_arithmetic_method(ops.rmul)

    @classmethod
    def _add_comparison_ops(cls):
        cls.__eq__ = cls._create_comparison_method(operator.eq)
        cls.__ne__ = cls._create_comparison_method(operator.ne)
        cls.__lt__ = cls._create_comparison_method(operator.lt)
        cls.__gt__ = cls._create_comparison_method(operator.gt)
        cls.__le__ = cls._create_comparison_method(operator.le)
        cls.__ge__ = cls._create_comparison_method(operator.ge)

    _create_comparison_method = _create_arithmetic_method


ArrowStringArray._add_arithmetic_ops()
ArrowStringArray._add_comparison_ops()


class ArrowListArray(ArrowArray):
    def __init__(self, values, dtype: ArrowListDtype = None, copy=False):
        if dtype is None:
            if isinstance(values, type(self)):
                dtype = values.dtype
            elif pa is not None:
                if isinstance(values, pa.Array):
                    dtype = ArrowListDtype(values.type.value_type)
                elif isinstance(values, pa.ChunkedArray):
                    dtype = ArrowListDtype(values.type.value_type)
                else:
                    values = pa.array(values)
                    if values.type == pa.null():
                        dtype = ArrowListDtype(pa.string())
                    else:
                        dtype = ArrowListDtype(values.type.value_type)
            else:
                value_type = np.asarray(values[0]).dtype
                dtype = ArrowListDtype(value_type)

        super().__init__(values, dtype=dtype, copy=copy)

    def to_numpy(self, dtype=None, copy=False, na_value=lib.no_default):
        if self._use_arrow:
            s = self._arrow_array.to_pandas()
        else:
            s = pd.Series(self._ndarray)
        s = s.map(lambda x: x.tolist() if hasattr(x, "tolist") else x)
        if copy or na_value is not lib.no_default:
            s = s.copy()
        if na_value is not lib.no_default:
            s[self.isna()] = na_value
        return np.asarray(s)

    @classmethod
    def _post_scalar_getitem(cls, lst):
        return lst[0].as_py()

    def __setitem__(self, key, value):
        if isinstance(value, (pd.Index, pd.Series)):
            value = value.to_numpy()

        key = check_array_indexer(self, key)
        scalar_key = is_scalar(key)

        # validate new items
        if scalar_key:
            if pd.isna(value):
                value = None
            elif not is_list_like(value):
                raise ValueError("Must provide list.")

        if self._use_arrow:
            array = np.asarray(self._arrow_array.to_pandas())
            array[key] = value
            self._arrow_array = pa.chunked_array(
                [pa.array(array, type=self.dtype.arrow_type)]
            )
        else:
            self._ndarray[key] = value

    @classmethod
    def _array_fillna(cls, series, value):
        # cannot fillna directly, because value is a list-like object
        return series.apply(lambda x: x if is_list_like(x) or not pd.isna(x) else value)

    def astype(self, dtype, copy=True):
        msg = f"cannot astype from {self.dtype} to {dtype}"
        dtype = pandas_dtype(dtype)
        if isinstance(dtype, ArrowListDtype):
            if self.dtype == dtype:
                if copy:
                    return self.copy()
                return self
            else:
                if self._use_arrow:
                    try:
                        arrow_array = self._arrow_array.cast(dtype.arrow_type)
                        return ArrowListArray(arrow_array)
                    except (NotImplementedError, pa.ArrowInvalid):
                        raise TypeError(msg)
                else:

                    def f(x):
                        return pd.Series(x).astype(dtype.value_type.type).tolist()

                    try:
                        arr = pd.Series(self._ndarray)
                        ret = arr.map(f).to_numpy()
                        return ArrowStringArray(ret)
                    except ValueError:
                        raise TypeError(msg)

        try:
            return super().astype(dtype, copy=copy)
        except ValueError:
            raise TypeError(msg)
