in sdks/python/apache_beam/dataframe/frames.py [0:0]
def aggregate(self, func, axis, *args, **kwargs):
  """Aggregate this deferred DataFrame along ``axis`` using ``func``.

  Mirrors ``pandas.DataFrame.aggregate``. Requests are dispatched as follows:

  * ``'quantile'`` (any axis) and ``'corr'``/``'cov'`` (axis 0) are delegated
    to the specialized methods of the same name on ``self``.
  * ``'size'`` on pandas < 1.3 returns ``self.size`` directly.
  * ``axis=None`` aggregates with ``axis=1`` first, then aggregates that
    result again with ``axis=0``.
  * ``axis=1``/``'columns'`` is computed with no partitioning requirement.
  * A frame with no columns is aggregated on a single partition.
  * Otherwise each output column is aggregated separately, and the results
    are recombined into a Series or DataFrame shaped like the pandas proxy.

  Args:
    func: The aggregation specification, e.g. a function name, a list of
      them, or a dict mapping column names to per-column specifications.
      Other forms are passed through to pandas as-is.
    axis: ``0``/``'index'``, ``1``/``'columns'``, or ``None``.
    *args: Extra positional arguments forwarded to the aggregation.
    **kwargs: Extra keyword arguments forwarded to the aggregation. In the
      column-wise (general) path, ``numeric_only``, ``bool_only``,
      ``include`` and ``exclude`` are only used to determine which columns
      appear in the output. They are not forwarded to the per-column
      aggregations.

  Returns:
    A deferred frame wrapping the aggregation. For the specialized cases, it
    is whatever the delegated method returns.

  Raises:
    frame_base.WontImplementError: If ``func`` is a numeric aggregation and
      the projected frame still has non-numeric columns, unless
      ``numeric_only=False`` was passed explicitly.
    AssertionError: If the pandas proxy result is neither a Series nor a
      DataFrame in the general path.
  """
  # We have specialized implementations for these.
  if func in ('quantile',):
    return getattr(self, func)(*args, axis=axis, **kwargs)
  # In pandas<1.3.0, maps to a property, args are ignored
  if func in ('size',) and PD_VERSION < (1, 3):
    return getattr(self, func)
  # We also have specialized distributed implementations for these. They only
  # support axis=0 (implicitly) though. axis=1 should fall through
  if func in ('corr', 'cov') and axis in (0, 'index'):
    return getattr(self, func)(*args, **kwargs)

  if axis is None:
    # Aggregate across all elements by first aggregating across columns,
    # then across rows.
    return self.agg(func, *args, **dict(kwargs, axis=1)).agg(
        func, *args, **dict(kwargs, axis=0))
  elif axis in (1, 'columns'):
    # This is an easy elementwise aggregation. Pass axis positionally, so that
    # any extra positional args follow it, as in DataFrame.agg's signature
    # (func, axis, *args, **kwargs). Writing `axis=1, *args` would bind
    # args[0] to `axis` and raise "multiple values for argument 'axis'".
    return frame_base.DeferredFrame.wrap(
        expressions.ComputedExpression(
            'aggregate',
            lambda df: df.agg(func, 1, *args, **kwargs),
            [self._expr],
            requires_partition_by=partitionings.Arbitrary()))
  elif len(self._expr.proxy().columns) == 0:
    # For this corner case, just colocate everything.
    return frame_base.DeferredFrame.wrap(
        expressions.ComputedExpression(
            'aggregate',
            lambda df: df.agg(func, *args, **kwargs),
            [self._expr],
            requires_partition_by=partitionings.Singleton()))
  else:
    # In the general case, we will compute the aggregation of each column
    # separately, then recombine.
    # First, handle any kwargs that cause a projection, by eagerly generating
    # the proxy, and only including the columns that are in the output.
    PROJECT_KWARGS = ('numeric_only', 'bool_only', 'include', 'exclude')
    proxy = self._expr.proxy().agg(func, axis, *args, **kwargs)
    if isinstance(proxy, pd.DataFrame):
      projected = self[list(proxy.columns)]
    elif isinstance(proxy, pd.Series):
      projected = self[list(proxy.index)]
    else:
      projected = self

    nonnumeric_columns = [
        name for (name, dtype) in projected.dtypes.items()
        if not pd.api.types.is_numeric_dtype(dtype)
    ]
    # An explicit numeric_only=False means the user has opted in to execution
    # with non-numeric columns, and will accept runtime errors.
    if (_is_numeric(func) and nonnumeric_columns and
        kwargs.get('numeric_only') is not False):
      raise frame_base.WontImplementError(
          f"Numeric aggregation ({func!r}) on a DataFrame containing "
          f"non-numeric columns ({*nonnumeric_columns,!r}) is not "
          "supported, unless `numeric_only=` is specified.\n"
          "Use `numeric_only=True` to only aggregate over numeric "
          "columns.\nUse `numeric_only=False` to aggregate over all "
          "columns. Note this is not recommended, as it could result in "
          "execution time errors.")

    # The projection kwargs have already been applied via the proxy above, so
    # they are not forwarded to the per-column aggregations.
    for key in PROJECT_KWARGS:
      kwargs.pop(key, None)

    if isinstance(func, dict):
      func_by_col = func
      col_names = list(func.keys())
    else:
      col_names = list(projected._expr.proxy().columns)
      func_by_col = {col: func for col in col_names}

    # If any of the columns do multiple aggregations, they all must use
    # "list" style output.
    has_lists = any(isinstance(f, list) for f in func_by_col.values())
    aggregated_cols = []
    for col in col_names:
      funcs = func_by_col[col]
      if has_lists and not isinstance(funcs, list):
        funcs = [funcs]
      aggregated_cols.append(projected[col].agg(funcs, *args, **kwargs))

    # The final shape is different depending on whether any of the columns
    # were aggregated by a list of aggregators. This is mirrored by the type
    # of the proxy.
    with expressions.allow_non_parallel_operations():
      if isinstance(proxy, pd.Series):
        result_type = pd.Series
      elif isinstance(proxy, pd.DataFrame):
        result_type = pd.DataFrame
      else:
        raise AssertionError("Unexpected proxy type for "
                             f"DataFrame.aggregate!: proxy={proxy!r}, "
                             f"type(proxy)={type(proxy)!r}")
      return frame_base.DeferredFrame.wrap(
          expressions.ComputedExpression(
              'join_aggregate',
              lambda *cols: result_type(dict(zip(col_names, cols))),
              [col._expr for col in aggregated_cols],
              requires_partition_by=partitionings.Singleton()))