odps/df/backends/formatter.py (1,025 lines of code) (raw):
#!/usr/bin/env python
# -*- coding: utf-8 -*-
# Copyright 1999-2022 Alibaba Group Holding Ltd.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import itertools
from textwrap import dedent
from ... import compat
from ...config import options
from ...compat import cgi, u, izip, Iterable
from ...console import get_terminal_size
from ...utils import to_text, to_str, indent, require_package
from ...models import Table
from ..types import *
from ..expr.expressions import CollectionExpr, Scalar
from ..utils import is_source_collection, traverse_until_source
def is_integer(val):
return isinstance(val, six.integer_types)
def is_sequence(x):
try:
iter(x)
len(x) # it has a length
return not isinstance(x, six.string_types) and \
not isinstance(x, six.binary_type)
except (TypeError, AttributeError):
return False
def _pprint_seq(seq, _nest_lvl=0, max_seq_items=None, **kwds):
"""
internal. pprinter for iterables. you should probably use pprint_thing()
rather then calling this directly.
bounds length of printed sequence, depending on options
"""
if isinstance(seq, set):
fmt = u("set([%s])")
else:
fmt = u("[%s]") if hasattr(seq, '__setitem__') else u("(%s)")
if max_seq_items is False:
nitems = len(seq)
else:
nitems = max_seq_items or options.display.max_seq_items or len(seq)
s = iter(seq)
r = []
for i in range(min(nitems, len(seq))): # handle sets, no slicing
r.append(pprint_thing(next(s), _nest_lvl + 1, max_seq_items=max_seq_items, **kwds))
body = ", ".join(r)
if nitems < len(seq):
body += ", ..."
elif isinstance(seq, tuple) and len(seq) == 1:
body += ','
return fmt % body
def _pprint_dict(seq, _nest_lvl=0, max_seq_items=None, **kwds):
"""
internal. pprinter for iterables. you should probably use pprint_thing()
rather then calling this directly.
"""
fmt = u("{%s}")
pairs = []
pfmt = u("%s: %s")
if max_seq_items is False:
nitems = len(seq)
else:
nitems = max_seq_items or options.display.max_seq_items or len(seq)
for k, v in list(seq.items())[:nitems]:
pairs.append(pfmt % (pprint_thing(k, _nest_lvl + 1, max_seq_items=max_seq_items, **kwds),
pprint_thing(v, _nest_lvl + 1, max_seq_items=max_seq_items, **kwds)))
if nitems < len(seq):
return fmt % (", ".join(pairs) + ", ...")
else:
return fmt % ", ".join(pairs)
def pprint_thing(thing, _nest_lvl=0, escape_chars=None, default_escapes=False,
quote_strings=False, max_seq_items=None):
"""
This function is the sanctioned way of converting objects
to a unicode representation.
properly handles nested sequences containing unicode strings
(unicode(object) does not)
Parameters
----------
thing : anything to be formatted
_nest_lvl : internal use only. pprint_thing() is mutually-recursive
with pprint_sequence, this argument is used to keep track of the
current nesting level, and limit it.
escape_chars : list or dict, optional
Characters to escape. If a dict is passed the values are the
replacements
default_escapes : bool, default False
Whether the input escape characters replaces or adds to the defaults
max_seq_items : False, int, default None
Pass thru to other pretty printers to limit sequence printing
Returns
-------
result - unicode object on py2, str on py3. Always Unicode.
"""
def as_escaped_unicode(thing, escape_chars=escape_chars):
# Unicode is fine, else we try to decode using utf-8 and 'replace'
# if that's not it either, we have no way of knowing and the user
# should deal with it himself.
try:
result = six.text_type(thing) # we should try this first
except UnicodeDecodeError:
# either utf-8 or we replace errors
result = str(thing).decode('utf-8', "replace")
translate = {'\t': r'\t',
'\n': r'\n',
'\r': r'\r',
}
if isinstance(escape_chars, dict):
if default_escapes:
translate.update(escape_chars)
else:
translate = escape_chars
escape_chars = list(escape_chars.keys())
else:
escape_chars = escape_chars or tuple()
for c in escape_chars:
result = result.replace(c, translate[c])
return six.text_type(result)
if (six.PY3 and hasattr(thing, '__next__')) or hasattr(thing, 'next'):
return six.text_type(thing)
elif (isinstance(thing, dict) and
_nest_lvl < options.display.pprint_nest_depth):
result = _pprint_dict(thing, _nest_lvl, quote_strings=True, max_seq_items=max_seq_items)
elif is_sequence(thing) and _nest_lvl < \
options.display.pprint_nest_depth:
result = _pprint_seq(thing, _nest_lvl, escape_chars=escape_chars,
quote_strings=quote_strings, max_seq_items=max_seq_items)
elif isinstance(thing, six.string_types) and quote_strings:
if six.PY3:
fmt = "'%s'"
else:
fmt = "u'%s'"
result = fmt % as_escaped_unicode(thing)
else:
result = as_escaped_unicode(thing)
return six.text_type(result) # always unicode
def _justify(texts, max_len, mode='right'):
"""
Perform ljust, center, rjust against string or list-like
"""
if mode == 'left':
return [x.ljust(max_len) for x in texts]
elif mode == 'center':
return [x.center(max_len) for x in texts]
else:
return [x.rjust(max_len) for x in texts]
def _join_unicode(lines, sep=''):
try:
return sep.join(lines)
except UnicodeDecodeError:
sep = six.text_type(sep)
return sep.join([x.decode('utf-8') if isinstance(x, str) else x
for x in lines])
def adjoin(space, *lists, **kwargs):
"""
Glues together two sets of strings using the amount of space requested.
The idea is to prettify.
----------
space : int
number of spaces for padding
lists : str
list of str which being joined
strlen : callable
function used to calculate the length of each str. Needed for unicode
handling.
justfunc : callable
function used to justify str. Needed for unicode handling.
"""
strlen = kwargs.pop('strlen', len)
justfunc = kwargs.pop('justfunc', _justify)
out_lines = []
newLists = []
lengths = [max(map(strlen, x)) + space for x in lists[:-1]]
# not the last one
lengths.append(max(map(len, lists[-1])))
maxLen = max(map(len, lists))
for i, lst in enumerate(lists):
nl = justfunc(lst, lengths[i], mode='left')
nl.extend([' ' * lengths[i]] * (maxLen - len(lst)))
newLists.append(nl)
toJoin = zip(*newLists)
for lines in toJoin:
out_lines.append(_join_unicode(lines))
return _join_unicode(out_lines, sep='\n')
def _make_fixed_width(strings, justify='right', minimum=None,
adj=None):
if len(strings) == 0 or justify == 'all':
return strings
if adj is None:
adj = _get_adjustment()
max_len = max([adj.len(x) for x in strings])
if minimum is not None:
max_len = max(minimum, max_len)
conf_max = options.display.max_colwidth
if conf_max is not None and max_len > conf_max:
max_len = conf_max
def just(x):
if conf_max is not None:
if (conf_max > 3) & (adj.len(x) > max_len):
x = x[:max_len - 3] + '...'
return x
strings = [just(x) for x in strings]
result = adj.justify(strings, max_len, mode=justify)
return result
def _binify(cols, line_width):
adjoin_width = 1
bins = []
curr_width = 0
i_last_column = len(cols) - 1
for i, w in enumerate(cols):
w_adjoined = w + adjoin_width
curr_width += w_adjoined
if i_last_column == i:
wrap = curr_width + 1 > line_width and i > 0
else:
wrap = curr_width + 2 > line_width and i > 0
if wrap:
bins.append(i)
curr_width = w_adjoined
bins.append(len(cols))
return bins
class TextAdjustment(object):
def __init__(self):
self.encoding = options.display.encoding
def len(self, text):
return compat.strlen(text, encoding=self.encoding)
def justify(self, texts, max_len, mode='right'):
return _justify(texts, max_len, mode=mode)
def adjoin(self, space, *lists, **kwargs):
return adjoin(space, *lists, strlen=self.len,
justfunc=self.justify, **kwargs)
class EastAsianTextAdjustment(TextAdjustment):
def __init__(self):
super(EastAsianTextAdjustment, self).__init__()
if options.display.unicode.ambiguous_as_wide:
self.ambiguous_width = 2
else:
self.ambiguous_width = 1
def len(self, text):
return compat.east_asian_len(to_text(text), encoding=self.encoding,
ambiguous_width=self.ambiguous_width)
def justify(self, texts, max_len, mode='right'):
# re-calculate padding space per str considering East Asian Width
def _get_pad(t):
return max_len - self.len(t) + len(t)
if mode == 'left':
return [x.ljust(_get_pad(x)) for x in texts]
elif mode == 'center':
return [x.center(_get_pad(x)) for x in texts]
else:
return [x.rjust(_get_pad(x)) for x in texts]
def _get_adjustment():
use_east_asian_width = options.display.unicode.east_asian_width
if use_east_asian_width:
return EastAsianTextAdjustment()
else:
return TextAdjustment()
class TableFormatter(object):
is_truncated = False
show_dimensions = None
@property
def should_show_dimensions(self):
return self.show_dimensions is True or (self.show_dimensions == 'truncate' and
self.is_truncated)
def _get_formatter(self, i):
if isinstance(self.formatters, (list, tuple)):
if is_integer(i):
return self.formatters[i]
else:
return None
else:
if is_integer(i) and i not in self.columns:
i = self.columns[i]
return self.formatters.get(i, None)
class ResultFrameFormatter(TableFormatter):
"""
Render a Expr result
self.to_string() : console-friendly tabular output
self.to_html() : html table
self.to_latex() : LaTeX tabular environment table
"""
def __init__(self, frame, buf=None, columns=None, col_space=None,
header=True, index=True, na_rep='NaN', formatters=None,
justify=None, float_format=None, sparsify=None,
index_names=True, line_width=None, max_rows=None,
max_cols=None, show_dimensions=False, **kwds):
self.frame = frame
self.buf = buf if buf is not None else six.StringIO()
self.show_index_names = index_names
if sparsify is None:
sparsify = options.display.multi_sparse
self.sparsify = sparsify
self.float_format = float_format
self.formatters = formatters if formatters is not None else {}
self.na_rep = na_rep
self.col_space = col_space
self.header = header
self.index = index
self.line_width = line_width
self.max_rows = max_rows
self.max_cols = max_cols
self.max_rows_displayed = min(max_rows or len(self.frame),
len(self.frame))
self.show_dimensions = show_dimensions
if justify is None:
self.justify = options.display.colheader_justify
else:
self.justify = justify
self.kwds = kwds
if columns is not None:
self.columns = columns
self.frame = self.frame[self.columns]
else:
self.columns = frame.columns
self._chk_truncate()
self.adj = _get_adjustment()
def _chk_truncate(self):
'''
Checks whether the frame should be truncated. If so, slices
the frame up.
'''
# Column of which first element is used to determine width of a dot col
self.tr_size_col = -1
# Cut the data to the information actually printed
max_cols = self.max_cols
max_rows = self.max_rows
if max_cols == 0 or max_rows == 0: # assume we are in the terminal (why else = 0)
(w, h) = get_terminal_size()
self.w = w
self.h = h
if self.max_rows == 0:
dot_row = 1
prompt_row = 1
if self.show_dimensions:
show_dimension_rows = 3
n_add_rows = self.header + dot_row + show_dimension_rows + prompt_row
max_rows_adj = self.h - n_add_rows # rows available to fill with actual data
self.max_rows_adj = max_rows_adj
# Format only rows and columns that could potentially fit the screen
if max_cols == 0 and len(self.frame.columns) > w:
max_cols = w
if max_rows == 0 and len(self.frame) > h:
max_rows = h
if not hasattr(self, 'max_rows_adj'):
self.max_rows_adj = max_rows
if not hasattr(self, 'max_cols_adj'):
self.max_cols_adj = max_cols
max_cols_adj = self.max_cols_adj
max_rows_adj = self.max_rows_adj
truncate_h = max_cols_adj and (len(self.columns) > max_cols_adj)
truncate_v = max_rows_adj and (len(self.frame) > max_rows_adj)
frame = self.frame
if truncate_h:
if max_cols_adj == 0:
col_num = len(frame.columns)
elif max_cols_adj == 1:
frame = frame[:, :max_cols]
col_num = max_cols
else:
col_num = (max_cols_adj // 2)
frame = frame[:, :col_num].concat(frame[:, -col_num:], axis=1)
self.tr_col_num = col_num
if truncate_v:
if max_rows_adj == 0:
row_num = len(frame)
if max_rows_adj == 1:
row_num = max_rows
frame = frame[:max_rows, :]
else:
row_num = max_rows_adj // 2
frame = frame[:row_num, :].concat(frame[-row_num:, :])
self.tr_row_num = row_num
self.tr_frame = frame
self.truncate_h = truncate_h
self.truncate_v = truncate_v
self.is_truncated = self.truncate_h or self.truncate_v
def _to_str_columns(self):
"""
Render a DataFrame to a list of columns (as lists of strings).
"""
frame = self.tr_frame
# may include levels names also
str_index = self._get_formatted_index(frame)
str_columns = self._get_formatted_column_labels(frame)
if self.header:
stringified = []
for i, c in enumerate(frame.columns):
cheader = str_columns[i]
max_colwidth = max(self.col_space or 0,
*(self.adj.len(x) for x in cheader))
fmt_values = self._format_col(i)
fmt_values = _make_fixed_width(fmt_values, self.justify,
minimum=max_colwidth,
adj=self.adj)
max_len = max(max([self.adj.len(x) for x in fmt_values]),
max_colwidth)
cheader = self.adj.justify(cheader, max_len, mode=self.justify)
stringified.append(cheader + fmt_values)
else:
stringified = []
for i, c in enumerate(frame):
fmt_values = self._format_col(i)
fmt_values = _make_fixed_width(fmt_values, self.justify,
minimum=(self.col_space or 0),
adj=self.adj)
stringified.append(fmt_values)
strcols = stringified
if self.index:
strcols.insert(0, str_index)
# Add ... to signal truncated
truncate_h = self.truncate_h
truncate_v = self.truncate_v
if truncate_h:
col_num = self.tr_col_num
col_width = self.adj.len(strcols[self.tr_size_col][0]) # infer from column header
strcols.insert(self.tr_col_num + 1, ['...'.center(col_width)] * (len(str_index)))
if truncate_v:
n_header_rows = len(str_index) - len(frame)
row_num = self.tr_row_num
for ix, col in enumerate(strcols):
cwidth = self.adj.len(strcols[ix][row_num]) # infer from above row
is_dot_col = False
if truncate_h:
is_dot_col = ix == col_num + 1
if cwidth > 3 or is_dot_col:
my_str = '...'
else:
my_str = '..'
if ix == 0:
dot_mode = 'left'
elif is_dot_col:
cwidth = self.adj.len(strcols[self.tr_size_col][0])
dot_mode = 'center'
else:
dot_mode = 'right'
dot_str = self.adj.justify([my_str], cwidth, mode=dot_mode)[0]
strcols[ix].insert(row_num + n_header_rows, dot_str)
return strcols
def to_string(self):
"""
Render a DataFrame to a console-friendly tabular output.
"""
frame = self.frame
if len(frame.columns) == 0 or len(frame.index) == 0:
info_line = (u('Empty %s\nColumns: %s\nIndex: %s')
% (type(self.frame).__name__,
pprint_thing(frame.columns),
pprint_thing(frame.index)))
text = info_line
else:
strcols = self._to_str_columns()
if self.line_width is None: # no need to wrap around just print the whole frame
text = self.adj.adjoin(1, *strcols)
elif not isinstance(self.max_cols, int) or self.max_cols > 0: # need to wrap around
text = self._join_multiline(*strcols)
else: # max_cols == 0. Try to fit frame to terminal
text = self.adj.adjoin(1, *strcols).split('\n')
row_lens = [len(it) for it in text]
max_len_col_ix = row_lens.index(max(row_lens))
max_len = row_lens[max_len_col_ix]
headers = [ele[0] for ele in strcols]
# Size of last col determines dot col size. See `self._to_str_columns
size_tr_col = len(headers[self.tr_size_col])
max_len += size_tr_col # Need to make space for largest row plus truncate dot col
dif = max_len - self.w
adj_dif = dif
col_lens = [max(len(it) for it in ele) for ele in strcols]
n_cols = len(col_lens)
counter = 0
while adj_dif > 0 and n_cols > 1:
counter += 1
mid = int(round(n_cols / 2.))
# mid_ix = col_lens.index[mid]
col_len = col_lens[mid]
adj_dif -= (col_len + 1) # adjoin adds one
col_lens = col_lens[:mid] + col_len[mid+1: ]
n_cols = len(col_lens)
max_cols_adj = n_cols - self.index # subtract index column
self.max_cols_adj = max_cols_adj
# Call again _chk_truncate to cut frame appropriately
# and then generate string representation
self._chk_truncate()
strcols = self._to_str_columns()
text = self.adj.adjoin(1, *strcols)
self.buf.writelines(text)
if self.should_show_dimensions:
self.buf.write("\n\n[%d rows x %d columns]"
% (len(frame), len(frame.columns)))
def _join_multiline(self, *strcols):
lwidth = self.line_width
adjoin_width = 1
strcols = list(strcols)
if self.index:
idx = strcols.pop(0)
lwidth -= max(self.adj.len(x) for x in idx) + adjoin_width
col_widths = [max(self.adj.len(x) for x in col)
if len(col) > 0 else 0
for col in strcols]
col_bins = _binify(col_widths, lwidth)
nbins = len(col_bins)
if self.truncate_v:
nrows = self.max_rows_adj + 1
else:
nrows = len(self.frame)
str_lst = []
st = 0
for i, ed in enumerate(col_bins):
row = strcols[st:ed]
row.insert(0, idx)
if nbins > 1:
if ed <= len(strcols) and i < nbins - 1:
row.append([' \\'] + [' '] * (nrows - 1))
else:
row.append([' '] * nrows)
str_lst.append(self.adj.adjoin(adjoin_width, *row))
st = ed
return '\n\n'.join(str_lst)
def _format_col(self, i):
frame = self.tr_frame
formatter = self._get_formatter(i)
return format_array(
frame[:, i],
frame.dtypes[i],
formatter, float_format=self.float_format, na_rep=self.na_rep,
space=self.col_space
)
def to_html(self, classes=None, notebook=False):
"""
Render a DataFrame to a html table.
Parameters
----------
notebook : {True, False}, optional, default False
Whether the generated HTML is for IPython Notebook.
"""
html_renderer = HTMLFormatter(self, classes=classes,
max_rows=self.max_rows,
max_cols=self.max_cols,
notebook=notebook)
if hasattr(self.buf, 'write'):
html_renderer.write_result(self.buf)
elif isinstance(self.buf, six.string_types):
with open(self.buf, 'w') as f:
html_renderer.write_result(f)
else:
raise TypeError('buf is not a file name and it has no write '
' method')
def _get_formatted_column_labels(self, frame):
def is_numeric_dtype(dtype):
return is_number(dtype)
columns = frame.columns
fmt_columns = [col.name for col in columns]
dtypes = self.frame.dtypes
need_leadsp = dict(zip(fmt_columns, map(is_numeric_dtype, dtypes)))
str_columns = [[' ' + x
if not self._get_formatter(i) and need_leadsp[x]
else x]
for i, (col, x) in
enumerate(zip(columns, fmt_columns))]
if self.show_index_names and self.has_index_names:
for x in str_columns:
x.append('')
# self.str_columns = str_columns
return str_columns
@property
def has_index_names(self):
return _has_names(self.frame.index)
@property
def has_column_names(self):
return _has_names(self.frame.columns)
def _get_formatted_index(self, frame):
# Note: this is only used by to_string() and to_latex(), not by to_html().
index = frame.index
show_index_names = self.show_index_names and self.has_index_names
show_col_names = (self.show_index_names and self.has_column_names)
fmt = self._get_formatter('__index__')
fmt_index = [[str(i) for i in index]]
fmt_index = [tuple(_make_fixed_width(list(x), justify='left',
minimum=(self.col_space or 0),
adj=self.adj))
for x in fmt_index]
adjoined = self.adj.adjoin(1, *fmt_index).split('\n')
# empty space for columns
if show_col_names:
col_header = ['%s' % x for x in self._get_column_name_list()]
else:
col_header = ['']
if self.header:
return col_header + adjoined
else:
return adjoined
def _get_column_name_list(self):
names = []
columns = self.frame.columns
names.append('' if columns.name is None else columns.name)
return names
class HTMLFormatter(TableFormatter):
indent_delta = 2
def __init__(self, formatter, classes=None, max_rows=None, max_cols=None,
notebook=False):
self.fmt = formatter
self.classes = classes
self.frame = self.fmt.frame
self.columns = self.fmt.tr_frame.columns
self.elements = []
self.bold_rows = self.fmt.kwds.get('bold_rows', False)
self.escape = self.fmt.kwds.get('escape', True)
self.max_rows = max_rows or len(self.fmt.frame)
self.max_cols = max_cols or len(self.fmt.columns)
self.show_dimensions = self.fmt.show_dimensions
self.is_truncated = (self.max_rows < len(self.fmt.frame) or
self.max_cols < len(self.fmt.columns))
self.notebook = notebook
def write(self, s, indent=0):
rs = pprint_thing(s)
self.elements.append(' ' * indent + rs)
def write_th(self, s, indent=0, tags=None):
if (self.fmt.col_space is not None
and self.fmt.col_space > 0):
tags = (tags or "")
tags += 'style="min-width: %s;"' % self.fmt.col_space
return self._write_cell(s, kind='th', indent=indent, tags=tags)
def write_td(self, s, indent=0, tags=None):
return self._write_cell(s, kind='td', indent=indent, tags=tags)
def _write_cell(self, s, kind='td', indent=0, tags=None):
if tags is not None:
start_tag = '<%s %s>' % (kind, tags)
else:
start_tag = '<%s>' % kind
if self.escape:
# escape & first to prevent double escaping of &
esc = OrderedDict(
[('&', r'&'), ('<', r'<'), ('>', r'>')]
)
else:
esc = {}
rs = pprint_thing(s, escape_chars=esc).strip()
self.write(
'%s%s</%s>' % (start_tag, rs, kind), indent)
def write_tr(self, line, indent=0, indent_delta=4, header=False,
align=None, tags=None, nindex_levels=0):
if tags is None:
tags = {}
if align is None:
self.write('<tr>', indent)
else:
self.write('<tr style="text-align: %s;">' % align, indent)
indent += indent_delta
for i, s in enumerate(line):
val_tag = tags.get(i, None)
if header or (self.bold_rows and i < nindex_levels):
self.write_th(s, indent, tags=val_tag)
else:
self.write_td(s, indent, tags=val_tag)
indent -= indent_delta
self.write('</tr>', indent)
def write_result(self, buf):
indent = 0
frame = self.frame
_classes = ['dataframe'] # Default class.
if self.classes is not None:
if isinstance(self.classes, str):
self.classes = self.classes.split()
if not isinstance(self.classes, (list, tuple)):
raise AssertionError(('classes must be list or tuple, '
'not %s') % type(self.classes))
_classes.extend(self.classes)
if self.notebook:
div_style = ''
try:
import IPython
if compat.Version(IPython.__version__) < compat.Version('3.0.0'):
div_style = ' style="max-width:1500px;overflow:auto;"'
except ImportError:
pass
self.write('<div{0}>'.format(div_style))
try:
import pandas
if compat.Version(pandas.__version__) >= compat.Version('0.20'):
self._write_style()
except ImportError:
pass
self.write('<table border="1" class="%s">' % ' '.join(_classes),
indent)
indent += self.indent_delta
indent = self._write_header(indent)
indent = self._write_body(indent)
self.write('</table>', indent)
if self.should_show_dimensions:
by = chr(215) if six.PY3 else unichr(215) # ×
self.write(u('<p>%d rows %s %d columns</p>') %
(len(frame), by, len(frame.columns)))
if self.notebook:
self.write('</div>')
_put_lines(buf, self.elements)
def _write_header(self, indent):
truncate_h = self.fmt.truncate_h
row_levels = 1
if not self.fmt.header:
# write nothing
return indent
def _column_header():
row = []
if self.fmt.index:
row.append('')
row.extend([col.name for col in self.columns])
return row
self.write('<thead>', indent)
row = []
indent += self.indent_delta
col_row = _column_header()
align = self.fmt.justify
if truncate_h:
ins_col = row_levels + self.fmt.tr_col_num
col_row.insert(ins_col, '...')
self.write_tr(col_row, indent, self.indent_delta, header=True,
align=align)
if self.fmt.has_index_names and self.fmt.index:
row = [
x if x is not None else '' for x in self.frame.index.names
] + [''] * min(len(self.columns), self.max_cols)
if truncate_h:
ins_col = row_levels + self.fmt.tr_col_num
row.insert(ins_col, '')
self.write_tr(row, indent, self.indent_delta, header=True)
indent -= self.indent_delta
self.write('</thead>', indent)
return indent
def _write_style(self):
# We use the "scoped" attribute here so that the desired
# style properties for the data frame are not then applied
# throughout the entire notebook.
template_first = """\
<style scoped>"""
template_last = """\
</style>"""
template_select = """\
.dataframe %s {
%s: %s;
}"""
element_props = [
("tbody tr th:only-of-type", "vertical-align", "middle"),
("tbody tr th", "vertical-align", "top"),
]
element_props.append(("thead th", "text-align", "right"))
template_mid = "\n\n".join(map(lambda t: template_select % t, element_props))
template = dedent("\n".join((template_first, template_mid, template_last)))
self.write(template)
def _write_body(self, indent):
self.write('<tbody>', indent)
indent += self.indent_delta
fmt_values = {}
for i in range(min(len(self.columns), self.max_cols)):
fmt_values[i] = self.fmt._format_col(i)
# write values
if self.fmt.index:
self._write_regular_rows(fmt_values, indent)
else:
for i in range(len(self.frame)):
row = [fmt_values[j][i] for j in range(len(self.columns))]
self.write_tr(row, indent, self.indent_delta, tags=None)
indent -= self.indent_delta
self.write('</tbody>', indent)
indent -= self.indent_delta
return indent
def _write_regular_rows(self, fmt_values, indent):
truncate_h = self.fmt.truncate_h
truncate_v = self.fmt.truncate_v
ncols = len(self.fmt.tr_frame.columns)
nrows = len(self.fmt.tr_frame)
fmt = self.fmt._get_formatter('__index__')
if fmt is not None:
index_values = self.fmt.tr_frame.index.map(fmt)
else:
index_values = [str(i) for i in self.fmt.tr_frame.index]
row = []
for i in range(nrows):
if truncate_v and i == (self.fmt.tr_row_num):
str_sep_row = ['...' for ele in row]
self.write_tr(str_sep_row, indent, self.indent_delta, tags=None,
nindex_levels=1)
row = []
row.append(index_values[i])
row.extend(fmt_values[j][i] for j in range(ncols))
if truncate_h:
dot_col_ix = self.fmt.tr_col_num + 1
row.insert(dot_col_ix, '...')
self.write_tr(row, indent, self.indent_delta, tags=None,
nindex_levels=1)
def _write_hierarchical_rows(self, fmt_values, indent):
template = 'rowspan="%d" valign="top"'
truncate_h = self.fmt.truncate_h
truncate_v = self.fmt.truncate_v
frame = self.fmt.tr_frame
ncols = len(frame.columns)
nrows = len(frame)
row_levels = self.frame.index.nlevels
idx_values = frame.index.format(sparsify=False, adjoin=False, names=False)
idx_values = compat.lzip(*idx_values)
if self.fmt.sparsify:
# GH3547
sentinel = sentinel_factory()
levels = frame.index.format(sparsify=sentinel, adjoin=False, names=False)
level_lengths = _get_level_lengths(levels, sentinel)
inner_lvl = len(level_lengths) - 1
if truncate_v:
# Insert ... row and adjust idx_values and
# level_lengths to take this into account.
ins_row = self.fmt.tr_row_num
for lnum, records in enumerate(level_lengths):
rec_new = {}
for tag, span in list(records.items()):
if tag >= ins_row:
rec_new[tag + 1] = span
elif tag + span > ins_row:
rec_new[tag] = span + 1
dot_row = list(idx_values[ins_row - 1])
dot_row[-1] = u('...')
idx_values.insert(ins_row, tuple(dot_row))
else:
rec_new[tag] = span
# If ins_row lies between tags, all cols idx cols receive ...
if tag + span == ins_row:
rec_new[ins_row] = 1
if lnum == 0:
idx_values.insert(ins_row, tuple([u('...')]*len(level_lengths)))
level_lengths[lnum] = rec_new
level_lengths[inner_lvl][ins_row] = 1
for ix_col in range(len(fmt_values)):
fmt_values[ix_col].insert(ins_row, '...')
nrows += 1
for i in range(nrows):
row = []
tags = {}
sparse_offset = 0
j = 0
for records, v in zip(level_lengths, idx_values[i]):
if i in records:
if records[i] > 1:
tags[j] = template % records[i]
else:
sparse_offset += 1
continue
j += 1
row.append(v)
row.extend(fmt_values[j][i] for j in range(ncols))
if truncate_h:
row.insert(row_levels - sparse_offset + self.fmt.tr_col_num, '...')
self.write_tr(row, indent, self.indent_delta, tags=tags,
nindex_levels=len(levels) - sparse_offset)
else:
for i in range(len(frame)):
idx_values = list(zip(*frame.index.format(sparsify=False,
adjoin=False,
names=False)))
row = []
row.extend(idx_values[i])
row.extend(fmt_values[j][i] for j in range(ncols))
if truncate_h:
row.insert(row_levels + self.fmt.tr_col_num, '...')
self.write_tr(row, indent, self.indent_delta, tags=None,
nindex_levels=frame.index.nlevels)
# ----------------------------------------------------------------------
# Array formatters
def is_float_dtype(t):
return isinstance(t, Float)
def is_integer_dtype(t):
return isinstance(t, Integer)
def is_datetime_dtype(t):
return isinstance(t, Datetime)
def format_array(values, dtype, formatter, float_format=None, na_rep='NaN',
digits=None, space=None, justify='right'):
if is_float_dtype(dtype):
fmt_klass = FloatArrayFormatter
elif is_integer_dtype(dtype):
fmt_klass = IntArrayFormatter
elif is_datetime_dtype(dtype):
fmt_klass = Datetime64Formatter
else:
fmt_klass = GenericArrayFormatter
if space is None:
space = options.display.column_space
if float_format is None:
float_format = options.display.float_format
if digits is None:
digits = options.display.precision
fmt_obj = fmt_klass(values, digits=digits, na_rep=na_rep,
float_format=float_format,
formatter=formatter, space=space,
justify=justify)
return fmt_obj.get_result()
class GenericArrayFormatter(object):
def __init__(self, values, digits=7, formatter=None, na_rep='NaN',
space=12, float_format=None, justify='right'):
self.values = values
self.digits = digits
self.na_rep = na_rep
self.space = space
self.formatter = formatter
self.float_format = float_format
self.justify = justify
def get_result(self):
fmt_values = [v if v is not None else self.na_rep for v in self._format_strings()]
return _make_fixed_width(fmt_values, self.justify)
def _format_strings(self):
if self.float_format is None:
float_format = options.display.float_format
if float_format is None:
fmt_str = '%% .%dg' % options.display.precision
float_format = lambda x: fmt_str % x
else:
float_format = self.float_format
formatter = self.formatter if self.formatter is not None else \
(lambda x: pprint_thing(x, escape_chars=('\t', '\r', '\n')))
def _format(x):
if self.na_rep is not None and x is None:
if x is None:
return 'None'
return self.na_rep
else:
# object dtype
return '%s' % formatter(x)
vals = self.values
fmt_values = []
for i, v in enumerate(vals):
fmt_values.append(' %s' % _format(v))
return fmt_values
class FloatArrayFormatter(GenericArrayFormatter):
"""
"""
def __init__(self, *args, **kwargs):
GenericArrayFormatter.__init__(self, *args, **kwargs)
if self.float_format is not None and self.formatter is None:
self.formatter = self.float_format
def _format_with(self, fmt_str):
def _val(x, threshold):
if x is not None:
if (threshold is None or
abs(x) > options.display.chop_threshold):
return fmt_str % x
else:
if fmt_str.endswith("e"): # engineering format
return "0"
else:
return fmt_str % 0
else:
return self.na_rep
threshold = options.display.chop_threshold
fmt_values = [_val(x, threshold) for x in self.values]
return _trim_zeros(fmt_values, self.na_rep)
def _format_strings(self):
if self.formatter is not None:
fmt_values = [self.formatter(x) for x in self.values]
else:
fmt_str = '%% .%df' % self.digits
fmt_values = self._format_with(fmt_str)
if len(fmt_values) > 0:
maxlen = max(len(x) for x in fmt_values)
else:
maxlen = 0
too_long = maxlen > self.digits + 6
abs_vals = [abs(val) if val is not None else float('nan') for val in self.values]
# this is pretty arbitrary for now
has_large_values = any(abs_val > 1e8 for abs_val in abs_vals)
has_small_values = any((abs_val < 10 ** (-self.digits)) &
(abs_val > 0) for abs_val in abs_vals)
if too_long and has_large_values:
fmt_str = '%% .%de' % self.digits
fmt_values = self._format_with(fmt_str)
elif has_small_values:
fmt_str = '%% .%de' % self.digits
fmt_values = self._format_with(fmt_str)
return fmt_values
class IntArrayFormatter(GenericArrayFormatter):
def _format_strings(self):
formatter = self.formatter or (lambda x: '% d' % x if x is not None else self.na_rep)
fmt_values = [formatter(x) for x in self.values]
return fmt_values
class Datetime64Formatter(GenericArrayFormatter):
def __init__(self, values, nat_rep='NaT', date_format=None, **kwargs):
super(Datetime64Formatter, self).__init__(values, **kwargs)
self.nat_rep = nat_rep
self.date_format = date_format
def _format_strings(self):
""" we by definition have DO NOT have a TZ """
values = self.values
return [val.strftime('%Y-%m-%d %H:%M:%S') if val is not None else self.nat_rep for val in values]
def _has_names(index):
return hasattr(index, 'name') and index.name is not None
def _put_lines(buf, lines):
if any(isinstance(x, six.text_type) for x in lines):
lines = [six.text_type(x) for x in lines]
buf.write('\n'.join(lines))
def sentinel_factory():
class Sentinel(object):
pass
return Sentinel()
def _get_level_lengths(levels, sentinel=''):
from itertools import groupby
def _make_grouper():
record = {'count': 0}
def grouper(x):
if x != sentinel:
record['count'] += 1
return record['count']
return grouper
result = []
for lev in levels:
i = 0
f = _make_grouper()
recs = {}
for key, gpr in groupby(lev, f):
values = list(gpr)
recs[i] = len(values)
i += len(values)
result.append(recs)
return result
def _trim_zeros(str_floats, na_rep='NaN'):
"""
Trims zeros and decimal points.
"""
trimmed = str_floats
def _cond(values):
non_na = [x for x in values if x != na_rep]
return (len(non_na) > 0 and all([x.endswith('0') for x in non_na]) and
not(any([('e' in x) or ('E' in x) for x in non_na])))
while _cond(trimmed):
trimmed = [x[:-1] if x != na_rep else x for x in trimmed]
# trim decimal points
return [x[:-1] if x.endswith('.') and x != na_rep else x for x in trimmed]
class ExprExecutionGraphFormatter(object):
def __init__(self, dag):
self._dag = dag
@require_package('graphviz')
def _repr_svg_(self):
from graphviz import Source
return Source(self._to_dot())._repr_svg_()
def _format_expr(self, expr):
if is_source_collection(expr):
if isinstance(expr._source_data, Table):
return 'Collection: %s' % expr._source_data.name
else:
return 'Collection: pandas.DataFrame'
elif isinstance(expr, Scalar) and expr._value is not None:
return 'Scalar: %r' % expr._value
else:
node_name = getattr(expr, 'node_name', expr.__class__.__name__)
if isinstance(expr, CollectionExpr):
return '%s[Collection]' % node_name
else:
t = 'Scalar' if isinstance(expr, Scalar) else 'Sequence'
return '{%s[%s]|name: %s|type: %s}' % (
node_name.capitalize(), t, expr.name, expr.dtype)
def _to_str(self):
buffer = six.StringIO()
nodes = self._dag.topological_sort()
for i, node in enumerate(nodes):
sid = i + 1
buffer.write('Stage {0}: \n\n'.format(sid))
buffer.write(repr(node))
if i < len(nodes) - 1:
buffer.write('\n\n')
return to_str(buffer.getvalue())
def _to_html(self):
buffer = six.StringIO()
for i, node in enumerate(self._dag.topological_sort()):
sid = i + 1
buffer.write('<h3>Stage {0}</h3>'.format(sid))
buffer.write(node._repr_html_())
return to_str(buffer.getvalue())
def _to_dot(self):
buffer = six.StringIO()
write = lambda x: buffer.write(to_text(x))
write_newline = lambda x: write(x if x.endswith('\n') else x + '\n')
write_indent_newline = lambda x, ind=1: write_newline(indent(x, 2 * ind))
nid = itertools.count(1)
write_newline('digraph DataFrameDAG {')
write_indent_newline('START [shape=ellipse, label="start", style=filled, fillcolor=Pink];')
nodes = self._dag.topological_sort()
traversed = dict()
for sid, node in izip(itertools.count(1), nodes):
expr_node = node.expr
traversed[id(node)] = sid
pres = self._dag.predecessors(node)
write_indent_newline('subgraph clusterSTAGE{0} {{'.format(sid))
write_indent_newline('label = "Stage {0}"'.format(sid), ind=2)
compiled = str(node._sql()) if hasattr(node, '_sql') else None
for expr in traverse_until_source(expr_node, unique=True):
if id(expr) not in traversed:
eid = next(nid)
traversed[id(expr)] = eid
else:
eid = traversed[id(expr)]
name_args = list(expr.iter_args())
labels = [self._format_expr(expr), ]
for i, name_arg in enumerate(name_args):
if name_arg[1] is None:
continue
labels.append('<f{0}>{1}'.format(i, name_arg[0].strip('_')))
attr = ', style=filled, fillcolor=LightGrey' if isinstance(expr, CollectionExpr) else ''
write_indent_newline(
'EXPR{0} [shape=record, label="{1}"{2}];'.format(eid, '|'.join(labels), attr), ind=2)
no_child = True
for i, name_arg in enumerate(name_args):
name, args = name_arg
if args is None:
continue
def get_arg(arg):
if id(arg) not in traversed:
arg_id = next(nid)
traversed[id(arg)] = arg_id
return 'EXPR{0} -> EXPR{1}:f{2};'.format(traversed[id(arg)], eid, i)
if isinstance(args, Iterable):
for arg in args:
write_indent_newline(get_arg(arg), ind=2)
else:
write_indent_newline(get_arg(args), ind=2)
no_child = False
if no_child:
if len(pres) == 0:
if isinstance(expr, CollectionExpr):
write_indent_newline('START -> EXPR{0};'.format(eid), ind=2)
else:
for pre in pres:
pre_expr = pre.expr
pid = traversed[id(pre_expr)]
if (isinstance(pre_expr, Scalar) and isinstance(expr, Scalar)) or \
(isinstance(pre_expr, CollectionExpr) and isinstance(expr, CollectionExpr)):
write_indent_newline('EXPR{0} -> EXPR{1};'.format(pid, eid), ind=2)
if compiled:
eid = traversed[id(expr_node)]
compiled = '<TABLE ALIGN="LEFT" BORDER="0">%s</TABLE>' % ''.join(
'<TR><TD ALIGN="LEFT">%s</TD></TR>' % cgi.escape(l) for l in compiled.split('\n'))
write_indent_newline(
'COMPILED{0} [shape=record, style="filled", fillcolor="SkyBlue", label=<\n'
.format(eid), ind=2)
write_indent_newline(compiled, ind=3)
write_indent_newline('>];', ind=2)
write_indent_newline(
'EXPR{0} -> COMPILED{0} [arrowhead = none, style = dashed];'.format(eid), ind=2)
write_indent_newline('}')
write('}')
return buffer.getvalue()