def aggregate()

in sdks/python/apache_beam/dataframe/frames.py [0:0]


  def aggregate(self, func, axis, *args, **kwargs):
    # Build the deferred expression that aggregates this frame with ``func``
    # along ``axis`` and return it wrapped as a deferred frame, or delegate to
    # a specialized method on ``self`` when one exists.
    #
    # func: the aggregation to apply. A dict maps column name to that column's
    #   aggregation(s); a list value requests several aggregations at once.
    # axis: 0/'index', 1/'columns', or None to reduce over every element.
    # *args, **kwargs: forwarded to the underlying aggregation. Projection
    #   kwargs (see PROJECT_KWARGS below) are consumed when computing the
    #   proxy and are not forwarded to the per-column aggregations.
    #
    # Raises frame_base.WontImplementError when a numeric aggregation would
    # touch non-numeric columns and ``numeric_only=False`` was not passed.
    # Raises AssertionError if the eagerly computed proxy is neither a
    # pd.Series nor a pd.DataFrame.

    # We have specialized implementations for these.
    if func in ('quantile',):
      return getattr(self, func)(*args, axis=axis, **kwargs)

    # In pandas<1.3.0, maps to a property, args are ignored
    if func in ('size',) and PD_VERSION < (1, 3):
      return getattr(self, func)

    # We also have specialized distributed implementations for these. They only
    # support axis=0 (implicitly) though. axis=1 should fall through
    if func in ('corr', 'cov') and axis in (0, 'index'):
      return getattr(self, func)(*args, **kwargs)

    if axis is None:
      # Aggregate across all elements by first aggregating across columns,
      # then across rows.
      return self.agg(func, *args, **dict(kwargs, axis=1)).agg(
          func, *args, **dict(kwargs, axis=0))

    if axis in (1, 'columns'):
      # This is an easy elementwise aggregation. The axis is passed
      # positionally: pandas' signature is agg(func, axis, *args, **kwargs),
      # so `axis=1` combined with non-empty *args would bind args[0] to `axis`
      # as well and fail with "multiple values for argument 'axis'".
      return frame_base.DeferredFrame.wrap(
          expressions.ComputedExpression(
              'aggregate',
              lambda df: df.agg(func, 1, *args, **kwargs),
              [self._expr],
              requires_partition_by=partitionings.Arbitrary()))

    if len(self._expr.proxy().columns) == 0:
      # For this corner case, just colocate everything. The axis is forwarded
      # explicitly so that extra positional args are not mistaken for it.
      return frame_base.DeferredFrame.wrap(
          expressions.ComputedExpression(
              'aggregate',
              lambda df: df.agg(func, axis, *args, **kwargs),
              [self._expr],
              requires_partition_by=partitionings.Singleton()))

    # In the general case, we will compute the aggregation of each column
    # separately, then recombine.

    # First, handle any kwargs that cause a projection, by eagerly generating
    # the proxy, and only including the columns that are in the output.
    PROJECT_KWARGS = ('numeric_only', 'bool_only', 'include', 'exclude')
    proxy = self._expr.proxy().agg(func, axis, *args, **kwargs)

    if isinstance(proxy, pd.DataFrame):
      projected = self[list(proxy.columns)]
    elif isinstance(proxy, pd.Series):
      projected = self[list(proxy.index)]
    else:
      projected = self

    nonnumeric_columns = [
        name for (name, dtype) in projected.dtypes.items()
        if not pd.api.types.is_numeric_dtype(dtype)
    ]

    # Only an explicit numeric_only=False opts in to running a numeric
    # aggregation over non-numeric columns (accepting possible runtime
    # errors); an absent or None value is rejected up front.
    if (_is_numeric(func) and nonnumeric_columns and
        kwargs.get('numeric_only') is not False):
      raise frame_base.WontImplementError(
          f"Numeric aggregation ({func!r}) on a DataFrame containing "
          f"non-numeric columns ({tuple(nonnumeric_columns)!r}) is not "
          "supported, unless `numeric_only=` is specified.\n"
          "Use `numeric_only=True` to only aggregate over numeric "
          "columns.\nUse `numeric_only=False` to aggregate over all "
          "columns. Note this is not recommended, as it could result in "
          "execution time errors.")

    # The projection has already been applied through `projected`; drop those
    # kwargs so they are not passed on to the per-column aggregations.
    for key in PROJECT_KWARGS:
      kwargs.pop(key, None)

    if isinstance(func, dict):
      func_by_col = func
      col_names = list(func.keys())
    else:
      col_names = list(projected._expr.proxy().columns)
      func_by_col = {col: func for col in col_names}

    has_lists = any(isinstance(f, list) for f in func_by_col.values())
    aggregated_cols = []
    for col in col_names:
      funcs = func_by_col[col]
      if has_lists and not isinstance(funcs, list):
        # If any of the columns do multiple aggregations, they all must use
        # "list" style output
        funcs = [funcs]
      aggregated_cols.append(projected[col].agg(funcs, *args, **kwargs))

    # The final shape is different depending on whether any of the columns
    # were aggregated by a list of aggregators; the proxy's type tells us
    # which pandas container to rebuild.
    with expressions.allow_non_parallel_operations():
      if isinstance(proxy, pd.Series):
        result_type = pd.Series
      elif isinstance(proxy, pd.DataFrame):
        result_type = pd.DataFrame
      else:
        raise AssertionError("Unexpected proxy type for "
                             f"DataFrame.aggregate!: proxy={proxy!r}, "
                             f"type(proxy)={type(proxy)!r}")

      return frame_base.DeferredFrame.wrap(
          expressions.ComputedExpression(
              'join_aggregate',
              lambda *cols: result_type(dict(zip(col_names, cols))),
              [col._expr for col in aggregated_cols],
              requires_partition_by=partitionings.Singleton()))