core/maxframe/dataframe/extensions/apply_chunk.py [271:472]:
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
    dtypes=None,
    dtype=None,
    name=None,
    output_type=None,
    index=None,
    skip_infer=False,
    args=(),
    **kwargs,
):
    """
    Apply a function that takes a pandas DataFrame and outputs a pandas DataFrame or Series.
    The pandas DataFrame given to the function is a chunk of the input dataframe, which can be
    considered as a batch of rows.

    The objects passed into this function are slices of the original DataFrame, containing at most
    ``batch_rows`` rows and all columns. It is equivalent to merging the per-row inputs of multiple
    ``df.apply`` calls with ``axis=1`` and passing them to the function in one batch, which improves
    performance in certain scenarios. The function output can be either a DataFrame or a Series;
    ``apply_chunk`` will ultimately merge the results into a new DataFrame or Series.

    Don't expect the function to receive all rows of the DataFrame at once; how rows are batched
    depends on the implementation of MaxFrame and the internal running state of MaxCompute.

    Parameters
    ----------
    func : str or Callable
        Function to apply to the dataframe chunk.

    batch_rows : int
        Specify the expected number of rows in a batch, which is also the length of the dataframe
        passed to the function. When the remaining data is insufficient, a batch may contain fewer rows.

    output_type : {'dataframe', 'series'}, default None
        Specify type of returned object. See `Notes` for more details.

    dtypes : Series, default None
        Specify dtypes of returned DataFrames. See `Notes` for more details.

    dtype : numpy.dtype, default None
        Specify dtype of returned Series. See `Notes` for more details.

    name : str, default None
        Specify name of returned Series. See `Notes` for more details.

    index : Index, default None
        Specify index of returned object. See `Notes` for more details.

    skip_infer : bool, default False
        Whether to skip inferring dtypes when ``dtypes`` or ``output_type`` is not specified.

    args : tuple
        Positional arguments to pass to ``func`` in addition to the
        dataframe chunk.

    **kwargs
        Additional keyword arguments to pass to ``func``.

    Returns
    -------
    Series or DataFrame
        Result of applying ``func`` along the given chunk of the
        DataFrame.

    See Also
    --------
    DataFrame.apply: For non-batching operations.
    Series.mf.apply_chunk: Apply function to Series chunk.

    Notes
    -----
    When deciding output dtypes and shape of the return value, MaxFrame will
    try applying ``func`` onto a mock DataFrame, and the apply call may
    fail. When this happens, you need to specify the type of apply call
    (DataFrame or Series) in ``output_type``.

    * For DataFrame output, you need to specify a list or a pandas Series
      as ``dtypes`` of output DataFrame. ``index`` of output can also be
      specified.
    * For Series output, you need to specify ``dtype`` and ``name`` of
      output Series.
    * For any input with data type ``pandas.ArrowDtype(pyarrow.MapType)``, it will always
      be converted to a Python dict. And for any output with this data type, it must be
      returned as a Python dict as well.

    Examples
    --------
    >>> import numpy as np
    >>> import pandas as pd
    >>> import maxframe.tensor as mt
    >>> import maxframe.dataframe as md
    >>> df = md.DataFrame([[4, 9]] * 3, columns=['A', 'B'])
    >>> df.execute()
       A  B
    0  4  9
    1  4  9
    2  4  9

    Using different ``batch_rows`` values collects different dataframe chunks into the function.

    For example, with ``batch_rows=3``, the function will wait until 3 rows are collected.

    >>> df.mf.apply_chunk(np.sum, batch_rows=3).execute()
    A    12
    B    27
    dtype: int64

    Whereas with ``batch_rows=2``, the data will be divided into at least two segments. Additionally, if your
    function alters the shape of the dataframe, it may produce a different output.

    >>> df.mf.apply_chunk(np.sum, batch_rows=2).execute()
    A     8
    B    18
    A     4
    B     9
    dtype: int64

    If the function requires additional parameters, you can pass them through ``args`` or ``kwargs``.

    >>> def calc(df, x, y):
    ...    return df * x + y
    >>> df.mf.apply_chunk(calc, args=(10,), y=20).execute()
        A    B
    0  60  110
    1  60  110
    2  60  110

    Batching rows benefits operations that consume a whole dataframe, such as sklearn's ``predict``.
    You can easily use sklearn in MaxFrame to perform offline inference, and ``apply_chunk`` makes this
    process more efficient. The ``@with_python_requirements`` decorator provides the capability to
    automatically package and load dependencies.

    When you rely on third-party dependencies, MaxFrame may not be able to correctly infer the return type.
    Therefore, specifying ``output_type`` together with ``dtype`` or ``dtypes`` is necessary.

    >>> from maxframe.udf import with_python_requirements
    >>> data = {
    ...     'A': np.random.rand(10),
    ...     'B': np.random.rand(10)
    ... }
    >>> pd_df = pd.DataFrame(data)
    >>> X = pd_df[['A']]
    >>> y = pd_df['B']

    >>> from sklearn.model_selection import train_test_split
    >>> from sklearn.linear_model import LinearRegression
    >>> model = LinearRegression()
    >>> model.fit(X, y)

    >>> @with_python_requirements("scikit-learn")
    ... def predict(df):
    ...     predict_B = model.predict(df[["A"]])
    ...     return pd.Series(predict_B, index=df.A.index)

    >>> df.mf.apply_chunk(predict, batch_rows=3, output_type="series", dtype="float", name="predict_B").execute()
    0   -0.765025
    1   -0.765025
    2   -0.765025
    Name: predict_B, dtype: float64
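
    Likewise, when the output is a DataFrame, you can skip inference and declare the schema yourself by
    passing a pandas Series as ``dtypes``. Below is a minimal sketch reusing the ``df`` defined above;
    the ``double`` function is purely illustrative.

    >>> def double(chunk):
    ...     return chunk * 2

    >>> df.mf.apply_chunk(
    ...     double,
    ...     batch_rows=3,
    ...     output_type="dataframe",
    ...     dtypes=pd.Series({"A": np.dtype("int64"), "B": np.dtype("int64")}),
    ...     skip_infer=True,
    ... ).execute()
       A   B
    0  8  18
    1  8  18
    2  8  18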

    Create a dataframe with a dict type.

    >>> import pyarrow as pa
    >>> import pandas as pd
    >>> from maxframe.lib.dtypes_extension import dict_
    >>> col_a = pd.Series(
    ...     data=[[("k1", 1), ("k2", 2)], [("k1", 3)], None],
    ...     index=[1, 2, 3],
    ...     dtype=dict_(pa.string(), pa.int64()),
    ... )
    >>> col_b = pd.Series(
    ...     data=["A", "B", "C"],
    ...     index=[1, 2, 3],
    ... )
    >>> df = md.DataFrame({"A": col_a, "B": col_b})
    >>> df.execute()
                            A  B
    1  [('k1', 1), ('k2', 2)]  A
    2             [('k1', 3)]  B
    3                    <NA>  C

    Define a function that updates the map type with a new key-value pair in a batch.

    >>> def custom_set_item(df):
    ...     for name, value in df["A"].items():
    ...         if value is not None:
    ...             df["A"][name]["x"] = 100
    ...     return df

    >>> df.mf.apply_chunk(
    ...     custom_set_item,
    ...     output_type="dataframe",
    ...     dtypes=df.dtypes.copy(),
    ...     batch_rows=2,
    ...     skip_infer=True,
    ...     index=df.index,
    ... ).execute()
                                        A  B
    1  [('k1', 1), ('k2', 2), ('x', 100)]  A
    2              [('k1', 3), ('x', 100)]  B
    3                                 <NA>  C
    """
    if not isinstance(func, Callable):
        raise TypeError("function must be a callable object")
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -



core/maxframe/dataframe/extensions/apply_chunk.py [511:715]:
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
    dtypes=None,
    dtype=None,
    name=None,
    output_type=None,
    index=None,
    skip_infer=False,
    args=(),
    **kwargs,
):
    """
    Apply a function that takes a pandas Series and outputs a pandas DataFrame or Series.
    The pandas Series given to the function is a chunk of the input series.

    The objects passed into this function are slices of the original series, containing at most
    ``batch_rows`` elements. The function output can be either a DataFrame or a Series;
    ``apply_chunk`` will ultimately merge the results into a new DataFrame or Series.

    Don't expect the function to receive all elements of the series at once; how elements are batched
    depends on the implementation of MaxFrame and the internal running state of MaxCompute.

    ``func`` can be a ufunc (a NumPy function that applies to the entire Series)
    or a Python function that works on a series chunk.

    Parameters
    ----------
    func : function
        Python function or NumPy ufunc to apply.

    batch_rows : int
        Specify the expected number of elements in a batch, which is also the length of the series
        passed to the function. When the remaining data is insufficient, a batch may contain fewer elements.

    output_type : {'dataframe', 'series'}, default None
        Specify type of returned object. See `Notes` for more details.

    dtypes : Series, default None
        Specify dtypes of returned DataFrames. See `Notes` for more details.

    dtype : numpy.dtype, default None
        Specify dtype of returned Series. See `Notes` for more details.

    name : str, default None
        Specify name of returned Series. See `Notes` for more details.

    index : Index, default None
        Specify index of returned object. See `Notes` for more details.

    args : tuple
        Positional arguments passed to ``func`` after the series chunk.

    skip_infer : bool, default False
        Whether to skip inferring dtypes when ``dtypes`` or ``output_type`` is not specified.

    **kwargs
        Additional keyword arguments passed to ``func``.

    Returns
    -------
    Series or DataFrame
        If ``func`` returns a Series object, the result will be a Series; otherwise, the result will be a DataFrame.

    See Also
    --------
    DataFrame.mf.apply_chunk: Apply function to DataFrame chunk.
    Series.apply: For non-batching operations.

    Notes
    -----
    When deciding output dtypes and shape of the return value, MaxFrame will
    try applying ``func`` onto a mock Series, and the apply call may fail.
    When this happens, you need to specify the type of apply call
    (DataFrame or Series) in ``output_type``.

    * For DataFrame output, you need to specify a list or a pandas Series
      as ``dtypes`` of output DataFrame. ``index`` of output can also be
      specified.
    * For Series output, you need to specify ``dtype`` and ``name`` of
      output Series.
    * For any input with data type ``pandas.ArrowDtype(pyarrow.MapType)``, it will always
      be converted to a Python dict. And for any output with this data type, it must be
      returned as a Python dict as well.

    Examples
    --------
    Create a series with typical summer temperatures for each city.

    >>> import numpy as np
    >>> import pandas as pd
    >>> import maxframe.tensor as mt
    >>> import maxframe.dataframe as md
    >>> s = md.Series([20, 21, 12],
    ...               index=['London', 'New York', 'Helsinki'])
    >>> s.execute()
    London      20
    New York    21
    Helsinki    12
    dtype: int64

    Square the values by defining a function and passing it as an
    argument to ``apply_chunk()``.

    >>> def square(x):
    ...     return x ** 2
    >>> s.mf.apply_chunk(square, batch_rows=2).execute()
    London      400
    New York    441
    Helsinki    144
    dtype: int64

    Square the values by passing an anonymous function as an
    argument to ``apply_chunk()``.

    >>> s.mf.apply_chunk(lambda x: x**2, batch_rows=2).execute()
    London      400
    New York    441
    Helsinki    144
    dtype: int64

    Define a custom function that needs additional positional
    arguments and pass these additional arguments using the
    ``args`` keyword.

    >>> def subtract_custom_value(x, custom_value):
    ...     return x - custom_value

    >>> s.mf.apply_chunk(subtract_custom_value, args=(5,), batch_rows=3).execute()
    London      15
    New York    16
    Helsinki     7
    dtype: int64

    Define a custom function that takes keyword arguments
    and pass these arguments to ``apply_chunk``.

    >>> def add_custom_values(x, **kwargs):
    ...     for month in kwargs:
    ...         x += kwargs[month]
    ...     return x

    >>> s.mf.apply_chunk(add_custom_values, batch_rows=2, june=30, july=20, august=25).execute()
    London      95
    New York    96
    Helsinki    87
    dtype: int64

    If ``func`` returns a dataframe, ``apply_chunk`` will return a dataframe as well.

    >>> def get_dataframe(x):
    ...     return pd.concat([x, x], axis=1)

    >>> s.mf.apply_chunk(get_dataframe, batch_rows=2).execute()
               0   1
    London    20  20
    New York  21  21
    Helsinki  12  12

    Provide ``dtypes``, or ``dtype`` together with ``name``, to name the output schema.

    >>> s.mf.apply_chunk(
    ...    get_dataframe,
    ...    batch_rows=2,
    ...    dtypes={"A": np.int_, "B": np.int_},
    ...    output_type="dataframe"
    ... ).execute()
               A   B
    London    20  20
    New York  21  21
    Helsinki  12  12
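
    If dtype inference may fail (for instance, when ``func`` relies on third-party packages), you can
    declare the output explicitly. Below is a minimal sketch using the temperatures series above; the
    function and the ``plus_one`` name are purely illustrative.

    >>> s.mf.apply_chunk(
    ...     lambda x: x + 1,
    ...     batch_rows=2,
    ...     output_type="series",
    ...     dtype="int64",
    ...     name="plus_one",
    ...     skip_infer=True,
    ... ).execute()
    London      21
    New York    22
    Helsinki    13
    Name: plus_one, dtype: int64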

    Create a series with a dict type.

    >>> import pyarrow as pa
    >>> from maxframe.lib.dtypes_extension import dict_
    >>> s = md.Series(
    ...     data=[[("k1", 1), ("k2", 2)], [("k1", 3)], None],
    ...     index=[1, 2, 3],
    ...     dtype=dict_(pa.string(), pa.int64()),
    ... )
    >>> s.execute()
    1    [('k1', 1), ('k2', 2)]
    2               [('k1', 3)]
    3                      <NA>
    dtype: map<string, int64>[pyarrow]

    Define a function that updates the map type with a new key-value pair in a batch.

    >>> def custom_set_item(row):
    ...     for _, value in row.items():
    ...         if value is not None:
    ...             value["x"] = 100
    ...     return row

    >>> s.mf.apply_chunk(
    ...     custom_set_item,
    ...     output_type="series",
    ...     dtype=s.dtype,
    ...     batch_rows=2,
    ...     skip_infer=True,
    ...     index=s.index,
    ... ).execute()
    1    [('k1', 1), ('k2', 2), ('x', 100)]
    2               [('k1', 3), ('x', 100)]
    3                                  <NA>
    dtype: map<string, int64>[pyarrow]
    """
    if not isinstance(func, Callable):
        raise TypeError("function must be a callable object")
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -



