core/maxframe/dataframe/datasource/from_tensor.py (360 lines of code) (raw):

#!/usr/bin/env python # -*- coding: utf-8 -*- # Copyright 1999-2025 Alibaba Group Holding Ltd. # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. from collections import OrderedDict from typing import Any, Dict, List, Union import numpy as np import pandas as pd from ... import opcodes from ...core import ENTITY_TYPE, OutputType from ...serialization.serializables import AnyField, KeyField from ...tensor.core import Tensor from ...tensor.datasource import tensor as astensor from ...typing_ import EntityType, TileableType from ..core import INDEX_TYPE, SERIES_TYPE from ..operators import DataFrameOperator, DataFrameOperatorMixin from ..utils import parse_index class DataFrameFromTensor(DataFrameOperator, DataFrameOperatorMixin): """ Represents data from maxframe tensor """ _op_type_ = opcodes.DATAFRAME_FROM_TENSOR input = AnyField("input") index = AnyField("index") columns = AnyField("columns") def __init__(self, *args, **kwargs): kwargs["_output_types"] = [OutputType.dataframe] super().__init__(*args, **kwargs) def _set_inputs(self, inputs: List[EntityType]): super()._set_inputs(inputs) inputs_iter = iter(self._inputs) if self.input is not None: if not isinstance(self.input, dict): self.input = next(inputs_iter) else: # check each value for input new_input = OrderedDict() for k, v in self.input.items(): if isinstance(v, ENTITY_TYPE): new_input[k] = next(inputs_iter) else: new_input[k] = v self.input = new_input if isinstance(self.index, ENTITY_TYPE): self.index = next(inputs_iter) def __call__( self, input_tensor: Tensor, index: Union[TileableType, pd.Index], columns: pd.Index, dtypes: pd.Series, ): if isinstance(input_tensor, dict): return self._call_input_1d_tileables(input_tensor, index, columns, dtypes) elif input_tensor is not None: return self._call_input_tensor(input_tensor, index, columns, dtypes) else: return self._call_tensor_none(index, columns, dtypes) def _process_index( self, index: Union[TileableType, pd.Index], inputs: List[EntityType] ): if not isinstance(index, pd.Index): if isinstance(index, INDEX_TYPE): index_value = index.index_value inputs.append(index) elif isinstance(index, ENTITY_TYPE): index = astensor(index) if index.ndim != 1: raise ValueError(f"index should be 1-d, got {index.ndim}-d") index_value = parse_index( pd.Index([], dtype=index.dtype), index, type(self).__name__ ) inputs.append(index) else: index = pd.Index(index) index_value = parse_index(index) else: index_value = parse_index(index) return index_value def _call_input_1d_tileables( self, input_1d_tileables: Dict[Any, TileableType], index: Union[TileableType, pd.Index], columns: pd.Index, dtypes: pd.Series, ): tileables = [] shape = None for tileable in input_1d_tileables.values(): tileable_shape = astensor(tileable).shape if len(tileable_shape) > 0: if shape is None: shape = tileable_shape elif shape != tileable_shape: raise ValueError("input 1-d tensors should have same shape") if isinstance(tileable, ENTITY_TYPE): tileables.append(tileable) if index is not None: tileable_size = tileables[0].shape[0] if hasattr(index, "shape"): index_size = index.shape[0] else: index_size = len(index) if ( not pd.isna(tileable_size) and not pd.isna(index_size) and tileable_size != index_size ): raise ValueError( f"index {index} should have the same shape " f"with tensor: {tileable_size}" ) index_value = self._process_index(index, tileables) else: self.index = index = pd.RangeIndex(0, tileables[0].shape[0]) index_value = parse_index(index) if columns is not None: if len(input_1d_tileables) != len(columns): raise ValueError( f"columns {columns} should have size {len(input_1d_tileables)}" ) if not isinstance(columns, pd.Index): if isinstance(columns, ENTITY_TYPE): raise NotImplementedError("The columns value cannot be a tileable") columns = pd.Index(columns) columns_value = parse_index(columns, store_data=True) else: columns_value = parse_index( pd.RangeIndex(0, len(input_1d_tileables)), store_data=True ) shape = (shape[0], len(input_1d_tileables)) return self.new_dataframe( tileables, shape, dtypes=dtypes, index_value=index_value, columns_value=columns_value, ) def _call_input_tensor( self, input_tensor: Tensor, index: Union[TileableType, pd.Index], columns: pd.Index, dtypes: pd.Series, ): if input_tensor.ndim not in {1, 2}: raise ValueError("Must pass 1-d or 2-d input") inputs = [input_tensor] if index is not None: if input_tensor.shape[0] != len(index): raise ValueError( f"index {index} should have the same shape with tensor: {input_tensor.shape[0]}" ) index_value = self._process_index(index, inputs) elif isinstance(input_tensor, SERIES_TYPE): index_value = input_tensor.index_value else: stop = input_tensor.shape[0] stop = -1 if np.isnan(stop) else stop index = self.index = pd.RangeIndex(start=0, stop=stop) index_value = parse_index(index) if columns is not None: if not ( input_tensor.ndim == 1 and len(columns) == 1 or input_tensor.shape[1] == len(columns) ): raise ValueError( f"columns {columns} should have the same shape with tensor: {input_tensor.shape[1]}" ) if not isinstance(columns, pd.Index): if isinstance(columns, ENTITY_TYPE): raise NotImplementedError("The columns value cannot be a tileable") columns = pd.Index(columns) columns_value = parse_index(columns, store_data=True) else: if input_tensor.ndim == 1: # convert to 1-d DataFrame columns_value = parse_index( pd.RangeIndex(start=0, stop=1), store_data=True ) else: columns_value = parse_index( pd.RangeIndex(start=0, stop=input_tensor.shape[1]), store_data=True ) if input_tensor.ndim == 1: shape = (input_tensor.shape[0], 1) else: shape = input_tensor.shape return self.new_dataframe( inputs, shape, dtypes=dtypes, index_value=index_value, columns_value=columns_value, ) def _call_tensor_none( self, index: Union[TileableType, pd.Index], columns: pd.Index, dtypes: pd.Series ): inputs = [] shape = [] if index is not None: index_value = self._process_index(index, inputs) shape.append(index.shape[0]) else: index = self.index = pd.Index([], dtype=object) index_value = parse_index(index) shape.append(0) if columns is not None: if not isinstance(columns, pd.Index): if isinstance(columns, ENTITY_TYPE): raise NotImplementedError("The columns value cannot be a tileable") columns = pd.Index(columns) columns_value = parse_index(columns, store_data=True) shape.append(columns.shape[0]) else: columns_value = parse_index(pd.Index([], dtype=object), store_data=True) shape.append(0) return self.new_dataframe( inputs, shape=tuple(shape), dtypes=dtypes, index_value=index_value, columns_value=columns_value, ) def dataframe_from_tensor( tensor: Tensor, index: Union[TileableType, pd.Index] = None, columns: Union[pd.Index, list] = None, gpu: bool = None, sparse: bool = False, ): if tensor is not None: if tensor.ndim > 2 or tensor.ndim <= 0: raise TypeError( f"Not support create DataFrame from {tensor.ndim} dims tensor" ) try: col_num = tensor.shape[1] except IndexError: col_num = 1 gpu = tensor.op.gpu if gpu is None else gpu dtypes = pd.Series([tensor.dtype] * col_num, index=columns) if columns is None: columns = dtypes.index else: gpu = None if columns is not None: dtypes = pd.Series([], index=columns) else: dtypes = pd.Series([], index=pd.Index([], dtype=object)) if index is not None and not isinstance(index, ENTITY_TYPE): index = pd.Index(index) op = DataFrameFromTensor( input=tensor, index=index, columns=columns, gpu=gpu, sparse=sparse ) return op(tensor, index, columns, dtypes) def dataframe_from_1d_tileables( d: Dict[Any, TileableType], index: Union[TileableType, pd.Index, list] = None, columns: Union[pd.Index, list] = None, gpu: bool = None, sparse: bool = False, ): data = dict() for k, v in d.items(): if isinstance(v, (list, tuple)) and any( isinstance(sv, ENTITY_TYPE) for sv in v ): data[k] = astensor(v) else: data[k] = v d = data if columns is not None: tileables = [d.get(c) for c in columns] else: columns = list(d.keys()) tileables = list(d.values()) gpu = ( next((t.op.gpu for t in tileables if hasattr(t, "op")), False) if gpu is None else gpu ) dtypes = pd.Series( [t.dtype if hasattr(t, "dtype") else pd.Series(t).dtype for t in tileables], index=columns, ) if index is not None and not isinstance(index, ENTITY_TYPE): index = pd.Index(index) op = DataFrameFromTensor( input=d, index=index, columns=columns, gpu=gpu, sparse=sparse ) return op(d, index, columns, dtypes) class SeriesFromTensor(DataFrameOperator, DataFrameOperatorMixin): _op_type_ = opcodes.SERIES_FROM_TENSOR input = KeyField("input") index = AnyField("index") def _set_inputs(self, inputs: List[EntityType]): super()._set_inputs(inputs) if self.input is not None: self.input = self.inputs[0] if self.index is not None and hasattr(self.index, "key"): self.index = self.inputs[-1] def __call__( self, input_tensor: Tensor, index: Union[TileableType, pd.Index], dtype: np.dtype, name: Any, ): inputs = [input_tensor] if input_tensor is not None else [] if index is not None: if not isinstance(index, pd.Index): if isinstance(index, INDEX_TYPE): self.index = index index_value = index.index_value inputs.append(index) elif isinstance(index, ENTITY_TYPE): self.index = index index = astensor(index) if index.ndim != 1: raise ValueError(f"index should be 1-d, got {index.ndim}-d") index_value = parse_index( pd.Index([], dtype=index.dtype), index, type(self).__name__ ) inputs.append(index) else: self.index = index = pd.Index(index) index_value = parse_index(index) else: self.index = index index_value = parse_index(index) elif input_tensor is not None: if pd.isna(input_tensor.shape[0]): pd_index = pd.RangeIndex(-1) else: pd_index = pd.RangeIndex(start=0, stop=input_tensor.shape[0]) index_value = parse_index(pd_index) self.index = pd_index else: self.index = index = pd.Index([], dtype=object) index_value = parse_index(index) if input_tensor is not None: shape = input_tensor.shape elif index is not None: shape = index.shape else: shape = (0,) return self.new_series( inputs, shape=shape, dtype=dtype, index_value=index_value, name=name ) def series_from_tensor( tensor: Tensor, index: Union[TileableType, pd.Index, list] = None, name: Any = None, dtype: np.dtype = None, gpu: bool = None, sparse: bool = False, ): if tensor is not None: if tensor.ndim > 1 or tensor.ndim <= 0: raise TypeError(f"Not support create Series from {tensor.ndim} dims tensor") gpu = tensor.op.gpu if gpu is None else gpu dtype = dtype or tensor.dtype else: gpu = None dtype = dtype or np.dtype(float) op = SeriesFromTensor(input=tensor, gpu=gpu, sparse=sparse) return op(tensor, index, dtype, name)