odps/df/backends/frame.py (428 lines of code) (raw):

#!/usr/bin/env python
# -*- coding: utf-8 -*-
# Copyright 1999-2022 Alibaba Group Holding Ltd.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import functools
import itertools
import operator

try:
    import numpy as np
    import pandas as pd

    has_pandas = True
except (ImportError, ValueError):
    has_pandas = False

from ...compat import u, six, izip as zip, Version
from ...config import options
from ...console import (
    get_console_size,
    in_interactive_session,
    in_ipython_frontend,
    in_qtconsole,
)
from ...models import TableSchema
from ...types import Partition
from ...tunnel.io.types import odps_type_to_arrow_type
from ...utils import to_str, to_text, deprecated
from . import formatter as fmt


class ResultFrame(six.Iterator):
    """
    In-memory tabular result.

    Rows are held either in a pandas DataFrame (when pandas is importable and
    ``pandas=True``) or in a plain list of row value sequences. Column
    metadata is kept in ``columns`` and exposed through ``names``, ``types``
    and ``schema``.
    """

    class ResultRecord(list):
        """A single row: a list of values that can also be indexed by column name."""

        def __init__(self, columns, values):
            self._columns = columns
            # column name -> position; on duplicate names the last position wins
            self._column_id_by_name = {
                col.name: idx for idx, col in enumerate(columns)
            }
            super(ResultFrame.ResultRecord, self).__init__(values)

        def __getitem__(self, item):
            # string keys are translated to positions, everything else is
            # handled by ``list`` (ints, slices)
            if isinstance(item, six.string_types):
                item = self._column_id_by_name[item]
            return list.__getitem__(self, item)

        def iteritems(self):
            """Yield ``(column name, value)`` pairs in column order."""
            for col, val in zip(self._columns, self):
                yield col.name, val

        def keys(self):
            """Return the column names in order."""
            return [c.name for c in self._columns]

    def __init__(self, data, columns=None, schema=None, index=None, pandas=True):
        """
        :param data: a pandas DataFrame or an iterable of rows; a row exposing
            a ``values`` attribute is replaced by that attribute
        :param columns: column objects (each with ``name`` and ``type``)
        :param schema: used for its ``columns`` when ``columns`` is not given
        :param index: optional row labels
        :param pandas: store data as a pandas DataFrame when pandas is available
        :raises ValueError: when neither ``columns`` nor ``schema`` is given
        """
        if columns is None and schema is None:
            raise ValueError('Either columns or schema should be provided')

        if columns is None and schema is not None:
            columns = schema.columns
        self._columns = columns
        self._names = [to_text(c.name) for c in self._columns]
        self._types = [c.type for c in self._columns]
        self._index = index

        if has_pandas and pandas:
            if isinstance(data, pd.DataFrame):
                self._values = self._reset_pd_axes(data)
            else:
                self._values = pd.DataFrame(
                    [self._get_values(r) for r in data],
                    columns=self._names,
                    index=index,
                )
                self._cast_pd_types()
            self._index = self._values.index
            self._pandas = True
        else:
            if self._index is None:
                # no labels supplied: number the rows while collecting them
                self._index = []
                self._values = []
                for i, r in zip(itertools.count(0), data):
                    self._values.append(self._get_values(r))
                    self._index.append(i)
            else:
                self._values = list(self._get_values(r) for r in data)
            self._pandas = False

        self._cursor = -1

    def _cast_pd_types(self):
        """
        Cast the columns of the backing DataFrame according to
        ``options.tunnel.pd_cast_mode`` (``"arrow"`` or ``"numpy"``); any
        other mode leaves the frame untouched.
        """
        from .odpssql.types import df_type_to_odps_type
        from .pd.types import df_type_to_np_type

        assert isinstance(self._values, pd.DataFrame)

        if options.tunnel.pd_cast_mode == "arrow":
            dest_types = [
                pd.ArrowDtype(odps_type_to_arrow_type(df_type_to_odps_type(tp)))
                for tp in self._types
            ]
            self._values = self._values.astype(dict(zip(self._names, dest_types)))
        elif options.tunnel.pd_cast_mode == "numpy":
            dest_types = [df_type_to_np_type(tp) for tp in self._types]
            dest_df = self._values.copy()
            for (col_name, src_type), dest_type in zip(
                self._values.dtypes.items(), dest_types
            ):
                # only object columns whose target is a float / integer type
                # are cast; integer targets are widened to float
                if src_type == np.dtype("O") and dest_type.kind in "fiu":
                    dest_type = np.dtype(float) if dest_type.kind in "iu" else dest_type
                    dest_df[col_name] = dest_df[col_name].astype(dest_type)
            self._values = dest_df

    def _reset_pd_axes(self, data):
        """
        Return ``data`` relabelled with this frame's column names and index.

        When ``self._index`` is None and the row index is not a RangeIndex
        starting at 0, the index is reset. The DataFrame passed in is never
        relabelled in place: it is returned untouched when no change is
        needed, otherwise a copy is modified.
        """
        if Version(pd.__version__) < Version("0.23.0"):
            data = data.values
            return pd.DataFrame(
                [self._get_values(r) for r in data],
                columns=self._names,
                index=self._index,
            )

        # NOTE: axis labels are assigned through attributes instead of
        # ``set_axis(..., inplace=...)`` because pandas 2.0 removed the
        # ``inplace`` argument of ``set_axis``.
        ret_data = data
        if list(data.columns) != self._names:
            ret_data = ret_data.copy()
            ret_data.columns = self._names

        if self._index is not None and list(data.index) != list(self._index):
            if ret_data is data:
                ret_data = ret_data.copy()
            ret_data.index = self._index
        elif self._index is None and (
            not isinstance(data.index, pd.RangeIndex) or data.index.start != 0
        ):
            if ret_data is data:
                ret_data = ret_data.reset_index(drop=True)
            else:
                # already working on a private copy, safe to modify in place
                ret_data.reset_index(drop=True, inplace=True)
        return ret_data

    def _get_values(self, r):
        """Return ``r.values`` when the row has such an attribute, else ``r`` itself."""
        if hasattr(r, 'values'):
            return r.values
        return r

    @property
    def columns(self):
        return self._columns

    @property
    def names(self):
        return self._names

    @property
    def types(self):
        return self._types

    @property
    def schema(self):
        """Build a TableSchema, separating Partition columns from the others."""
        return TableSchema(
            columns=[col for col in self._columns if not isinstance(col, Partition)],
            partitions=[col for col in self._columns if isinstance(col, Partition)],
        )

    @property
    def index(self):
        return self._index

    @property
    def values(self):
        return self._values

    def get_column_data(self, item):
        """
        Return the data of the column named ``item``, or None when no column
        has that name.

        With pandas storage a Series (or the index level values, when the name
        is an index level) is returned; otherwise a list with one value per row.
        """
        item = to_text(item)
        if item not in self.names:
            return None

        if self._pandas:
            if item in self.values.index.names:
                return self.values.index.get_level_values(item)
            else:
                return self.values[item]
        else:
            # position of the first column carrying that name
            col_id = self.names.index(item)
            return [r[col_id] for r in self.values]

    if has_pandas:

        def to_pandas(self, wrap=False):
            """
            Return the backing values, or wrap them in a ``DataFrame`` of the
            parent package when ``wrap`` is true.
            """
            from .. import DataFrame

            if wrap:
                return DataFrame(self.values)
            else:
                return self.values

    def __getattr__(self, item):
        """
        Attribute-style access to column data.

        :raises AttributeError: when ``item`` is not a column name
        """
        # ``__getattr__`` only runs after normal lookup failed. When the
        # instance has not been initialised yet (e.g. copy / pickle probing
        # for special methods), looking up ``self.names`` would re-enter this
        # method endlessly, so bail out early.
        if "_names" not in self.__dict__:
            raise AttributeError(item)

        col = self.get_column_data(item)

        if col is None:
            raise AttributeError("ResultFrame has no column '{0}'".format(item))
        return col

    def __getitem__(self, item):
        """
        Positional access.

        With pandas storage every supported key (int, slice, 2-tuple) is
        forwarded to ``DataFrame.iloc``. Otherwise an int yields a
        ResultRecord, a slice yields a ResultFrame, and a ``(rows, cols)``
        tuple yields a cell value, a ResultRecord, a list of values or a
        ResultFrame depending on whether each part is an int or a slice.
        Any other key type yields None.
        """
        if isinstance(item, six.integer_types):
            if self._pandas:
                return self._values.iloc[item]
            else:
                return self.ResultRecord(self._columns, self._values[item])
        elif isinstance(item, slice):
            if self._pandas:
                return self._values.iloc[item]
            else:
                return ResultFrame(
                    self._values[item],
                    columns=self._columns,
                    index=self._index[item],
                    pandas=self._pandas,
                )
        elif isinstance(item, tuple) and len(item) == 2:
            if self._pandas:
                return self._values.iloc[item]

            row_sel, col_sel = item
            if isinstance(row_sel, six.integer_types):
                # a single row: self[row_sel] would be a ResultRecord, which
                # has no ``_values``, so pick from the raw row directly
                row = self._values[row_sel]
                if isinstance(col_sel, slice):
                    return self.ResultRecord(self._columns[col_sel], row[col_sel])
                return row[col_sel]

            frame = self[row_sel]
            values = [r[col_sel] for r in frame._values]
            if isinstance(col_sel, slice):
                return ResultFrame(
                    values,
                    columns=self._columns[col_sel],
                    index=frame._index,
                    pandas=self._pandas,
                )
            else:
                return values

    @deprecated("Direct call of next(ResultFrame) is deprecated")
    def __next__(self):
        self._cursor += 1
        try:
            if self._pandas:
                return self._values.iloc[self._cursor]
            else:
                return self.ResultRecord(self._columns, self._values[self._cursor])
        except IndexError:
            raise StopIteration

    def __iter__(self):
        """Yield rows one by one via ``self[i]`` until an IndexError is raised."""
        cursor = 0
        while True:
            try:
                yield self[cursor]
                cursor += 1
            except IndexError:
                return

    def concat(self, *frames, **kwargs):
        """
        Concatenate this frame with other frames.

        :param frames: frames to append; ``concat(frame, axis)`` with a
            positional integer axis is also accepted
        :param axis: 0 (default) stacks rows, any other value joins columns
        :raises TypeError: on unexpected keyword arguments
        :raises ValueError: without pandas storage, when columns (axis 0) or
            indexes (other axes) of the frames differ
        """
        if len(frames) == 2 and isinstance(frames[1], int):
            frames, axis = frames[:1], frames[1]
        else:
            axis = kwargs.pop("axis", 0)

        if kwargs:
            raise TypeError("Cannot accept arguments %s" % ",".join(kwargs))

        if len(frames) == 0:
            return self

        if self._pandas:
            try:
                from pandas import concat
            except ImportError:
                from pandas.tools.merge import concat

            values = (self._values,) + tuple(frame._values for frame in frames)
            pd_data = concat(values, axis=axis)
            if axis == 0:
                columns = self._columns
            else:
                columns = functools.reduce(
                    operator.add,
                    (self._columns,) + tuple(frame._columns for frame in frames),
                )
            return ResultFrame(pd_data, columns=columns, pandas=True)
        else:
            if axis == 0:
                if any(self._columns != frame._columns for frame in frames):
                    raise ValueError('Cannot concat two frame of different columns')
                values = functools.reduce(
                    operator.add,
                    (self._values,) + tuple(frame._values for frame in frames),
                )
                indices = functools.reduce(
                    operator.add,
                    (self._index,) + tuple(frame._index for frame in frames),
                )
                return ResultFrame(
                    values, columns=self._columns, index=indices, pandas=self._pandas
                )
            else:
                if any(self._index != frame._index for frame in frames):
                    raise ValueError('Cannot concat two frames of different indexes')
                sub_tuple = (self._values,) + tuple(frame._values for frame in frames)
                # join the rows of all frames position by position
                values = [
                    functools.reduce(operator.add, vals) for vals in zip(*sub_tuple)
                ]
                columns = functools.reduce(
                    operator.add,
                    (self._columns,) + tuple(frame._columns for frame in frames),
                )
                return ResultFrame(
                    values, columns, index=self._index, pandas=self._pandas
                )

    @property
    def dtypes(self):
        return [it.type for it in self._columns]

    def __len__(self):
        return len(self._values)

    def _repr_fits_vertical_(self):
        """
        Check length against max_rows.
        """
        max_rows = options.display.max_rows
        return len(self) <= max_rows

    def _repr_fits_horizontal_(self, ignore_width=False):
        """
        Check if full repr fits in horizontal boundaries imposed by the display
        options width and max_columns. In case of non-interactive session, no
        boundaries apply.

        ignore_width is here so ipnb+HTML output can behave the way
        users expect. display.max_columns remains in effect.
        GH3541, GH3573
        """
        width, _ = get_console_size()
        max_columns = options.display.max_columns
        nb_columns = len(self.columns)

        # exceed max columns
        if (max_columns and nb_columns > max_columns) or (
            (not ignore_width) and width and nb_columns > (width // 2)
        ):
            return False

        if (
            ignore_width  # used by repr_html under IPython notebook
            # scripts ignore terminal dims
            or not in_interactive_session()
        ):
            return True

        if options.display.width is not None or in_ipython_frontend():
            # check at least the column row for excessive width
            max_rows = 1
        else:
            max_rows = options.display.max_rows

        # when auto-detecting, so width=None and not in ipython front end
        # check whether repr fits horizontal by actually checking
        # the width of the rendered repr
        if max_rows is None:  # unlimited rows
            return True

        buf = six.StringIO()

        # only care about the stuff we'll actually print out
        # and to_string on entire frame may be expensive
        d = self[: min(max_rows, len(self))]

        d.to_string(buf=buf)
        value = buf.getvalue()
        repr_width = max(len(line) for line in value.split('\n'))

        return repr_width < width

    def __repr__(self):
        return to_str(self.__unicode__())

    def __unicode__(self):
        """
        Return a string representation for a particular DataFrame
        """
        if self._pandas:
            return to_text(repr(self._values))

        buf = six.StringIO(u(""))

        max_rows = options.display.max_rows
        max_cols = options.display.max_columns
        show_dimensions = options.display.show_dimensions
        if options.display.expand_frame_repr:
            width, _ = get_console_size()
        else:
            width = None

        self.to_string(
            buf=buf,
            max_rows=max_rows,
            max_cols=max_cols,
            line_width=width,
            show_dimensions=show_dimensions,
        )

        return to_str(buf.getvalue())

    def _repr_html_(self):
        """
        Return a html representation for a particular DataFrame.
        Mainly for IPython notebook.
        """
        # qtconsole doesn't report its line width, and also
        # behaves badly when outputting an HTML table
        # that doesn't fit the window, so disable it.
        # XXX: In IPython 3.x and above, the Qt console will not attempt to
        # display HTML, so this check can be removed when support for IPython 2.x
        # is no longer needed.
        if self._pandas and options.display.notebook_widget:
            from .. import DataFrame
            from ..ui import show_df_widget

            show_df_widget(DataFrame(self._values, schema=self.schema))

        if self._pandas:
            return self._values._repr_html_()

        if in_qtconsole():
            # 'HTML output is disabled in QtConsole'
            return None

        if options.display.notebook_repr_html:
            max_rows = options.display.max_rows
            max_cols = options.display.max_columns
            show_dimensions = options.display.show_dimensions

            return self.to_html(
                max_rows=max_rows,
                max_cols=max_cols,
                show_dimensions=show_dimensions,
                notebook=True,
            )
        else:
            return None

    def to_string(
        self,
        buf=None,
        columns=None,
        col_space=None,
        header=True,
        index=True,
        na_rep='NaN',
        formatters=None,
        float_format=None,
        sparsify=None,
        index_names=True,
        justify=None,
        line_width=None,
        max_rows=None,
        max_cols=None,
        show_dimensions=False,
    ):
        """
        Render a DataFrame to a console-friendly tabular output.

        The text is written to ``buf`` when one is given (and None is
        returned); otherwise the rendered string is returned.
        """
        formatter = fmt.ResultFrameFormatter(
            self,
            buf=buf,
            columns=columns,
            col_space=col_space,
            na_rep=na_rep,
            formatters=formatters,
            float_format=float_format,
            sparsify=sparsify,
            justify=justify,
            index_names=index_names,
            header=header,
            index=index,
            line_width=line_width,
            max_rows=max_rows,
            max_cols=max_cols,
            show_dimensions=show_dimensions,
        )
        formatter.to_string()

        if buf is None:
            result = formatter.buf.getvalue()
            return result

    def to_html(
        self,
        buf=None,
        columns=None,
        col_space=None,
        header=True,
        index=True,
        na_rep='NaN',
        formatters=None,
        float_format=None,
        sparsify=None,
        index_names=True,
        justify=None,
        bold_rows=True,
        classes=None,
        escape=True,
        max_rows=None,
        max_cols=None,
        show_dimensions=False,
        notebook=False,
    ):
        """
        Render a DataFrame as an HTML table.

        The markup is written to ``buf`` when one is given (and None is
        returned); otherwise the rendered string is returned.

        `to_html`-specific options:

        bold_rows : boolean, default True
            Make the row labels bold in the output
        classes : str or list or tuple, default None
            CSS class(es) to apply to the resulting html table
        escape : boolean, default True
            Convert the characters <, >, and & to HTML-safe sequences.
        max_rows : int, optional
            Maximum number of rows to show before truncating. If None, show
            all.
        max_cols : int, optional
            Maximum number of columns to show before truncating. If None, show
            all.

        """
        formatter = fmt.ResultFrameFormatter(
            self,
            buf=buf,
            columns=columns,
            col_space=col_space,
            na_rep=na_rep,
            formatters=formatters,
            float_format=float_format,
            sparsify=sparsify,
            justify=justify,
            index_names=index_names,
            header=header,
            index=index,
            bold_rows=bold_rows,
            escape=escape,
            max_rows=max_rows,
            max_cols=max_cols,
            show_dimensions=show_dimensions,
        )
        formatter.to_html(classes=classes, notebook=notebook)

        if buf is None:
            return formatter.buf.getvalue()