odps/df/ui.py (140 lines of code) (raw):
# -*- coding: utf-8 -*-
# Copyright 1999-2022 Alibaba Group Holding Ltd.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from __future__ import print_function
import math
import decimal
import os
from . import DataFrame, Scalar
from ..ui.common import build_trait
from ..console import in_ipython_frontend, is_widgets_available
from ..compat import six, long_type
from ..utils import to_text, init_progress_ui
# Page size for table fetches: when the frame holds more rows than this,
# _handle_fetch_table returns one page of at most this many rows per request.
MAX_TABLE_FETCH_SIZE = 10
# ipywidgets, IPython and traitlets are optional dependencies. The widget
# helpers and classes are defined in the ``else`` branch, i.e. only when all
# three imports succeed.
try:
import ipywidgets as widgets
from IPython.display import display
from traitlets import Unicode, Dict, Int
except ImportError:
# Fallback: bind the names that the ``else`` branch would define to None.
# show_df_widget tests DFViewWidget for truthiness before using it.
DFViewWidget = None
DFViewMixin = None
_rv = None
_rv_list = None
_rv_table = None
else:
def _rv(v):
    """Convert a single cell value for the rendered result dicts.

    ``decimal.Decimal`` values become ``float``; ``float``/``int``/``long``
    values are returned unchanged; everything else goes through ``to_text``.
    """
    if isinstance(v, decimal.Decimal):
        return float(v)
    if isinstance(v, (float, int, long_type)):
        return v
    return to_text(v)
def _rv_list(l):
    """Return a new list holding ``_rv(item)`` for every item of *l*."""
    converted = []
    for item in l:
        converted.append(_rv(item))
    return converted
def _rv_table(tb):
    """Return a list of rows, each row of *tb* converted with ``_rv_list``."""
    rows = []
    for row in tb:
        rows.append(_rv_list(row))
    return rows
class DFViewMixin(object):
    """Front-end message handlers shared by DataFrame view widgets.

    The host class supplies ``self.df``. Handlers publish their results by
    assigning plain dicts to attributes of ``self``: ``table_records`` for
    table pages, and the attribute named by the request's ``target`` field
    for aggregated graph data.
    """

    @property
    def mock_ui(self):
        """Return the mock progress UI, created once and cached on the class."""
        cls = type(self)
        if not hasattr(cls, '_mock_ui'):
            cls._mock_ui = init_progress_ui(mock=True)
        return cls._mock_ui

    @staticmethod
    def _as_name_list(value):
        """Normalize a request field to a list of names.

        ``None`` or an empty value gives ``[]``, a single string gives a
        one-element list, and any other iterable is copied into a new list.
        """
        if not value:
            return []
        if isinstance(value, six.string_types):
            return [value]
        return list(value)

    @staticmethod
    def _unique_names(names):
        """Drop duplicates from *names* while keeping first-seen order."""
        seen = set()
        unique = []
        for name in names:
            if name not in seen:
                seen.add(name)
                unique.append(name)
        return unique

    @staticmethod
    def _render_results(df, **kw):
        """Build the table payload for *df*.

        :param df: object exposing ``columns`` and ``values`` (a pandas frame
            in the callers visible in this file)
        :param kw: extra entries copied verbatim into the payload
        :return: dict with ``columns`` (text names), ``data`` (converted rows)
            and the extra entries
        """
        return dict(columns=[to_text(c) for c in df.columns], data=_rv_table(df.values), **kw)

    @staticmethod
    def _render_graph(pd_df, group_cols, key_cols, **kw):
        """Build the graph payload from an already fetched pandas frame.

        :param pd_df: pandas frame holding group, key and value columns
        :param group_cols: columns to split the frame by; falsy for no split
        :param key_cols: columns whose distinct, sorted combinations are
            reported under ``keys``; falsy for none
        :param kw: extra entries copied verbatim into the payload
        :return: dict with ``groups`` (one converted key list per sub-frame,
            or ``None``), ``data`` (one columnar dict per sub-frame),
            ``keys`` (sorted distinct key rows, or ``None``) and the extras
        """
        def _to_columnar_dict(frame):
            # One converted value list per column, plus the row count.
            vdict = {_rv(c): _rv_list(frame[c].values) for c in frame.columns}
            vdict['$count'] = len(frame)
            return vdict

        if key_cols:
            distinct_df = pd_df[list(key_cols)].drop_duplicates()
            key_dfs = _rv_table(distinct_df.sort_values(key_cols).values)
        else:
            key_dfs = None

        groups, sub_dfs = [], []
        if group_cols:
            for gp, cols in pd_df.groupby(group_cols):
                # The group key may be a scalar or a sequence; always emit a list.
                g = [gp] if not isinstance(gp, (list, tuple, set, slice)) else gp
                groups.append(_rv_list(list(g)))
                sub_dfs.append(_to_columnar_dict(cols.drop(group_cols, axis=1)))
        else:
            groups = None
            sub_dfs.append(_to_columnar_dict(pd_df))
        return dict(groups=groups, data=sub_dfs, keys=key_dfs, **kw)

    def _handle_fetch_table(self, content, _):
        """Publish one page of the table to ``self.table_records``.

        :param content: request dict; ``page`` (0-based, default 0) is only
            consulted when the frame has more than ``MAX_TABLE_FETCH_SIZE`` rows
        :param _: message buffers, unused

        The payload carries ``columns``, ``data``, ``page``, ``size`` (total
        row count) and ``pages`` (total page count, always an ``int``).
        """
        # A PyODPS DataFrame is materialized on every request; anything else
        # is used as-is and must support len(), slicing, .columns and .values.
        df = self.df.to_pandas(ui=self.mock_ui) if isinstance(self.df, DataFrame) else self.df
        total = len(df)

        start_pos, stop_pos, page = 0, total, 0
        if total > MAX_TABLE_FETCH_SIZE:
            # ``or 0`` also covers an explicit ``page: None`` in the request.
            page = content.get('page') or 0
            start_pos = page * MAX_TABLE_FETCH_SIZE
            stop_pos = min(start_pos + MAX_TABLE_FETCH_SIZE, total)

        result_dict = self._render_results(df[start_pos:stop_pos], page=page, size=total)
        # math.ceil returns a float on Python 2; keep the page count integral.
        result_dict['pages'] = int(math.ceil(total * 1.0 / MAX_TABLE_FETCH_SIZE))
        self.table_records = result_dict

    def _handle_aggregate_graph(self, content, _):
        """Compute graph data and store it on the attribute named by ``target``.

        :param content: request dict with

            * ``values`` - either a dict mapping a field name to the
              aggregation method name(s) to call on it, or the plain
              column name(s) to select; a falsy value makes this a no-op
            * ``groups`` / ``keys`` - column name(s); a single string or a
              list is accepted for both
            * ``target`` - name of the attribute that receives the payload
        :param _: message buffers, unused
        """
        values = content.get('values')
        if not values:
            # Nothing to plot: leave the target attribute untouched.
            return

        groups = self._as_name_list(content.get('groups'))
        keys = self._as_name_list(content.get('keys'))

        if isinstance(values, dict):
            # Ordered de-duplication keeps the generated query stable between runs.
            group_by_keys = self._unique_names(groups + keys)

            def _gen_agg_func(df):
                # Maps '<field>__<method>' to the aggregation expression.
                # NOTE(review): method names come straight from the request
                # and are resolved with getattr on the column expression.
                agg_funcs = dict()
                for field, methods in six.iteritems(values):
                    for method in self._as_name_list(methods):
                        func_key = u'{}__{}'.format(field, method)
                        agg_funcs[func_key] = getattr(df[field], method)()
                return agg_funcs

            if group_by_keys:
                # String-typed grouping columns are passed through to_text
                # before grouping; all other columns are selected unchanged.
                string_keys = [col.name for col in self.df.schema
                               if col.name in group_by_keys and col.type.name == 'string']
                other_keys = [col.name for col in self.df.schema
                              if col.name not in string_keys]
                col_list = [self.df[col].map(lambda v: to_text(v)) for col in string_keys] + \
                    [self.df[col] for col in other_keys]
                trans_df = self.df.select(col_list)
                pd_results = trans_df.groupby(group_by_keys).agg(**_gen_agg_func(trans_df)) \
                    .to_pandas(ui=self.mock_ui)
            else:
                # No grouping columns: group by a constant helper column so the
                # aggregation covers the whole frame, then drop that column.
                group_col = '__group_col__'
                augment_df = self.df[Scalar(1).rename(group_col), self.df]
                pd_results = augment_df.groupby([group_col]).agg(**_gen_agg_func(augment_df)) \
                    .exclude(group_col).to_pandas(ui=self.mock_ui)
        else:
            # Plain column selection, no aggregation.
            columns = self._unique_names(groups + keys + self._as_name_list(values))
            pd_results = self.df.select(columns).to_pandas(ui=self.mock_ui)

        result_dict = self._render_graph(pd_results, groups, keys)
        setattr(self, content.get('target'), result_dict)
class DFViewWidget(DFViewMixin, widgets.DOMWidget):
    """DOM widget showing a DataFrame through the ``DFView`` front-end view.

    Front-end messages are routed by their ``action`` field to the handlers
    inherited from ``DFViewMixin``.
    """

    # Front-end view binding.
    _view_name = build_trait(Unicode, 'DFView', sync=True)
    _view_module = build_trait(Unicode, 'pyodps/df-view', sync=True)

    # Signal traits; each is flipped between 0 and 1 by _handle_msgs.
    start_sign = build_trait(Int, False, sync=True)
    error_sign = build_trait(Int, False, sync=True)

    # Payload traits. table_records is filled by _handle_fetch_table; the
    # chart traits are presumably the ``target`` names used by
    # _handle_aggregate_graph - confirm against the front-end code.
    table_records = build_trait(Dict, sync=True)
    bar_chart_records = build_trait(Dict, sync=True)
    pie_chart_records = build_trait(Dict, sync=True)
    line_chart_records = build_trait(Dict, sync=True)
    scatter_chart_records = build_trait(Dict, sync=True)

    def __init__(self, df, **kwargs):
        """Create the widget for *df* and register the message callback.

        :param df: object to display; stored as ``self.df``
        :param kwargs: forwarded to the parent widget constructor
        """
        super(DFViewWidget, self).__init__(**kwargs)
        self.df = df
        self.on_msg(self._handle_msgs)

    def _dispatch_action(self, content, buffers):
        """Run the handler selected by ``content['action']``, if any."""
        action = content.get('action', '')
        if action == 'start_widget':
            self.start_sign = (self.start_sign + 1) % 2
            return
        if action == 'fetch_table':
            self._handle_fetch_table(content, buffers)
            return
        if action == 'aggregate_graph':
            self._handle_aggregate_graph(content, buffers)

    def _handle_msgs(self, _, content, buffers):
        """Message callback: dispatch the request and flag any failure.

        If anything raises while handling the message, ``error_sign`` is
        flipped and the original exception keeps propagating.
        """
        succeeded = False
        try:
            self._dispatch_action(content, buffers)
            succeeded = True
        finally:
            if not succeeded:
                self.error_sign = (self.error_sign + 1) % 2
def show_df_widget(df, **kwargs):
    """Create a ``DFViewWidget`` for *df* and display it when possible.

    Nothing happens outside an IPython front end or when ``DFViewWidget`` is
    unavailable (``None``). The widget is created first and only displayed
    if ``is_widgets_available()`` reports true.

    :param df: object handed to ``DFViewWidget``
    :param kwargs: forwarded to the ``DFViewWidget`` constructor
    """
    if not in_ipython_frontend() or not DFViewWidget:
        return
    view = DFViewWidget(df, **kwargs)
    if is_widgets_available():
        display(view)