#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements.  See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License.  You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
from pyspark.sql.connect.utils import check_dependencies

check_dependencies(__name__)

import decimal
import inspect
import warnings
import functools
from typing import (
    Any,
    Mapping,
    TYPE_CHECKING,
    Union,
    Sequence,
    List,
    overload,
    Optional,
    Tuple,
    Type,
    Callable,
    ValuesView,
    cast,
)
import random as py_random
import sys

import numpy as np

from pyspark.errors import PySparkTypeError, PySparkValueError
from pyspark.errors.utils import _with_origin
from pyspark.sql import Column
from pyspark.sql.connect.expressions import (
    CaseWhen,
    SortOrder,
    Expression,
    LiteralExpression,
    ColumnReference,
    UnresolvedFunction,
    UnresolvedStar,
    SQLExpression,
    LambdaFunction,
    UnresolvedNamedLambdaVariable,
    CallFunction,
)
from pyspark.sql.connect.udf import _create_py_udf
from pyspark.sql.connect.udtf import AnalyzeArgument, AnalyzeResult  # noqa: F401
from pyspark.sql.connect.udtf import _create_py_udtf
from pyspark.sql import functions as pysparkfuncs
from pyspark.sql.types import (
    _from_numpy_type,
    DataType,
    StructType,
    ArrayType,
    StringType,
)
from pyspark.sql.utils import enum_to_value as _enum_to_value

# The implementation of pandas_udf is embedded in pyspark.sql.functions.pandas_udf
# for code reuse.
from pyspark.sql.functions import pandas_udf  # noqa: F401


if TYPE_CHECKING:
    from pyspark.sql.connect._typing import (
        ColumnOrName,
        DataTypeOrString,
        UserDefinedFunctionLike,
    )
    from pyspark.sql.dataframe import DataFrame
    from pyspark.sql.connect.udtf import UserDefinedTableFunction


def _to_col(col: "ColumnOrName") -> Column:
    assert isinstance(col, (Column, str))
    return col if isinstance(col, Column) else column(col)


def _sort_col(col: "ColumnOrName") -> Column:
    assert isinstance(col, (Column, str))
    if isinstance(col, Column):
        if isinstance(col._expr, SortOrder):
            return col
        else:
            return col.asc()
    else:
        return column(col).asc()
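
# Note (illustrative): _sort_col leaves an explicit SortOrder untouched (e.g. the
# result of col("x").desc()) and defaults everything else to ascending order.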


def _invoke_function(name: str, *args: Union[Column, Expression]) -> Column:
    """
    Simple wrapper function that converts the arguments into the appropriate types.
    Parameters
    ----------
    name Name of the function to be called.
        args The list of arguments.

    Returns
    -------
    :class:`Column`
    """
    from pyspark.sql.connect.column import Column as ConnectColumn

    expressions: List[Expression] = []
    for arg in args:
        assert isinstance(arg, (Column, Expression))
        if isinstance(arg, Column):
            expressions.append(arg._expr)  # type: ignore[arg-type]
        else:
            expressions.append(arg)
    return ConnectColumn(UnresolvedFunction(name, expressions))
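
# Illustrative sketch (comment only): no resolution happens on the client side.
# For example, _invoke_function("abs", _to_col("x")) simply returns a Connect
# Column wrapping UnresolvedFunction("abs", [ColumnReference("x")]); the server
# resolves the function name during analysis.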


def _invoke_function_over_columns(name: str, *cols: "ColumnOrName") -> Column:
    """
    Invokes n-ary function identified by name
    and wraps the result with :class:`~pyspark.sql.Column`.
    """
    _cols = [_to_col(c) for c in cols]
    return _invoke_function(name, *_cols)


def _invoke_binary_math_function(name: str, col1: Any, col2: Any) -> Column:
    """
    Invokes binary math function identified by name
    and wraps the result with :class:`~pyspark.sql.Column`.
    """

    # For legacy reasons, the arguments here can be implicitly converted into Columns
    _cols = [_to_col(c) if isinstance(c, (str, Column)) else lit(c) for c in (col1, col2)]
    return _invoke_function(name, *_cols)


def _get_lambda_parameters(f: Callable) -> ValuesView[inspect.Parameter]:
    signature = inspect.signature(f)
    parameters = signature.parameters.values()

    # We should exclude functions that use variable args (*args) or keyword
    # args (**kwargs), as well as keyword-only args.
    supported_parameter_types = {
        inspect.Parameter.POSITIONAL_OR_KEYWORD,
        inspect.Parameter.POSITIONAL_ONLY,
    }

    # Validate that the function arity is between 1 and 3.
    if not (1 <= len(parameters) <= 3):
        raise PySparkValueError(
            errorClass="WRONG_NUM_ARGS_FOR_HIGHER_ORDER_FUNCTION",
            messageParameters={"func_name": f.__name__, "num_args": str(len(parameters))},
        )

    # Verify that all arguments can be used as positional arguments.
    if not all(p.kind in supported_parameter_types for p in parameters):
        raise PySparkValueError(
            errorClass="UNSUPPORTED_PARAM_TYPE_FOR_HIGHER_ORDER_FUNCTION",
            messageParameters={"func_name": f.__name__},
        )

    return parameters


def _create_lambda(f: Callable) -> LambdaFunction:
    """
    Create `o.a.s.sql.expressions.LambdaFunction` corresponding
    to transformation described by f

    :param f: A Python of one of the following forms:
            - (Column) -> Column: ...
            - (Column, Column) -> Column: ...
            - (Column, Column, Column) -> Column: ...
    """
    from pyspark.sql.connect.column import Column as ConnectColumn

    parameters = _get_lambda_parameters(f)

    arg_names = ["x", "y", "z"][: len(parameters)]
    arg_exprs = [
        UnresolvedNamedLambdaVariable([UnresolvedNamedLambdaVariable.fresh_var_name(arg_name)])
        for arg_name in arg_names
    ]
    arg_cols = [ConnectColumn(arg_expr) for arg_expr in arg_exprs]

    result = f(*arg_cols)

    if not isinstance(result, Column):
        raise PySparkValueError(
            errorClass="HIGHER_ORDER_FUNCTION_SHOULD_RETURN_COLUMN",
            messageParameters={"func_name": f.__name__, "return_type": type(result).__name__},
        )

    return LambdaFunction(result._expr, arg_exprs)  # type: ignore[arg-type]
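
# Illustrative sketch (comment only): _create_lambda(lambda x: x + 1) inspects the
# callable's signature, allocates a fresh lambda variable named after "x", invokes
# the callable with a Connect Column wrapping that variable, and packages the
# resulting expression as LambdaFunction(<x + 1 expr>, [<variable>]).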


def _invoke_higher_order_function(
    name: str,
    cols: Sequence["ColumnOrName"],
    funs: Sequence[Callable],
) -> Column:
    """
    Invokes the expression identified by name
    (relative to ``org.apache.spark.sql.catalyst.expressions``)
    and wraps the result with :class:`Column`.

    :param name: Name of the expression
    :param cols: a list of columns
    :param funs: a list of (*Column) -> Column functions.

    :return: a Column
    """
    _cols = [_to_col(c) for c in cols]
    _funs = [_create_lambda(f) for f in funs]

    return _invoke_function(name, *_cols, *_funs)
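
# Illustrative sketch (comment only): _invoke_higher_order_function("exists", [col], [f])
# lowers the column via _to_col and the Python callable via _create_lambda, then emits
# a single UnresolvedFunction("exists", [<col expr>, <lambda expr>]).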


def _options_to_col(options: Mapping[str, Any]) -> Column:
    _options: List[Column] = []
    for k, v in options.items():
        _options.append(lit(str(k)))
        _options.append(lit(str(v)))
    return create_map(*_options)
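
# Illustrative sketch (comment only): _options_to_col({"mode": "PERMISSIVE"}) stringifies
# each key and value and returns create_map(lit("mode"), lit("PERMISSIVE")), i.e. a
# MAP<STRING, STRING> column in the shape expected by from_csv/from_json/from_xml below.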


# Normal Functions


@_with_origin
def col(col: str) -> Column:
    from pyspark.sql.connect.column import Column as ConnectColumn

    if col == "*":
        return ConnectColumn(UnresolvedStar(unparsed_target=None))
    elif col.endswith(".*"):
        return ConnectColumn(UnresolvedStar(unparsed_target=col))
    else:
        return ConnectColumn(ColumnReference(unparsed_identifier=col))


col.__doc__ = pysparkfuncs.col.__doc__
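
# Note (illustrative): col("*") becomes an UnresolvedStar with no target, col("s.*")
# becomes an UnresolvedStar targeting "s.*", and any other name becomes a plain
# ColumnReference.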


column = col


def lit(col: Any) -> Column:
    from pyspark.sql.connect.column import Column as ConnectColumn

    if isinstance(col, Column):
        return col
    elif isinstance(col, list):
        if any(isinstance(c, Column) for c in col):
            raise PySparkValueError(
                errorClass="COLUMN_IN_LIST", messageParameters={"func_name": "lit"}
            )
        return array(*[lit(c) for c in col])
    elif isinstance(col, np.ndarray) and col.ndim == 1:
        dt = _from_numpy_type(col.dtype)
        if dt is None:
            raise PySparkTypeError(
                errorClass="UNSUPPORTED_NUMPY_ARRAY_SCALAR",
                messageParameters={"dtype": col.dtype.name},
            )
        return array(*[lit(c) for c in col]).cast(ArrayType(dt))
    return ConnectColumn(LiteralExpression._from_value(col))


lit.__doc__ = pysparkfuncs.lit.__doc__
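
# Illustrative sketch (comment only): lit([1, 2, 3]) expands element-wise to
# array(lit(1), lit(2), lit(3)), and a one-dimensional numpy array is additionally
# cast to the ArrayType derived from its dtype via _from_numpy_type; plain scalars
# fall through to LiteralExpression._from_value.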


def bitwiseNOT(col: "ColumnOrName") -> Column:
    warnings.warn("Deprecated in 3.4, use bitwise_not instead.", FutureWarning)
    return bitwise_not(col)


bitwiseNOT.__doc__ = pysparkfuncs.bitwiseNOT.__doc__


def bitwise_not(col: "ColumnOrName") -> Column:
    return _invoke_function_over_columns("~", col)


bitwise_not.__doc__ = pysparkfuncs.bitwise_not.__doc__


def bit_count(col: "ColumnOrName") -> Column:
    return _invoke_function_over_columns("bit_count", col)


bit_count.__doc__ = pysparkfuncs.bit_count.__doc__


def bit_get(col: "ColumnOrName", pos: "ColumnOrName") -> Column:
    return _invoke_function_over_columns("bit_get", col, pos)


bit_get.__doc__ = pysparkfuncs.bit_get.__doc__


def getbit(col: "ColumnOrName", pos: "ColumnOrName") -> Column:
    return _invoke_function_over_columns("getbit", col, pos)


getbit.__doc__ = pysparkfuncs.getbit.__doc__


def broadcast(df: "DataFrame") -> "DataFrame":
    from pyspark.sql.connect.dataframe import DataFrame

    if not isinstance(df, DataFrame):
        raise PySparkTypeError(
            errorClass="NOT_DATAFRAME",
            messageParameters={"arg_name": "df", "arg_type": type(df).__name__},
        )
    return df.hint("broadcast")


broadcast.__doc__ = pysparkfuncs.broadcast.__doc__


def coalesce(*cols: "ColumnOrName") -> Column:
    return _invoke_function_over_columns("coalesce", *cols)


coalesce.__doc__ = pysparkfuncs.coalesce.__doc__


def expr(str: str) -> Column:
    from pyspark.sql.connect.column import Column as ConnectColumn

    return ConnectColumn(SQLExpression(str))


expr.__doc__ = pysparkfuncs.expr.__doc__


def greatest(*cols: "ColumnOrName") -> Column:
    if len(cols) < 2:
        raise PySparkValueError(
            errorClass="WRONG_NUM_COLUMNS",
            messageParameters={"func_name": "greatest", "num_cols": "2"},
        )
    return _invoke_function_over_columns("greatest", *cols)


greatest.__doc__ = pysparkfuncs.greatest.__doc__


def input_file_name() -> Column:
    return _invoke_function("input_file_name")


input_file_name.__doc__ = pysparkfuncs.input_file_name.__doc__


def least(*cols: "ColumnOrName") -> Column:
    if len(cols) < 2:
        raise PySparkValueError(
            errorClass="WRONG_NUM_COLUMNS",
            messageParameters={"func_name": "least", "num_cols": "2"},
        )
    return _invoke_function_over_columns("least", *cols)


least.__doc__ = pysparkfuncs.least.__doc__


def isnan(col: "ColumnOrName") -> Column:
    return _invoke_function_over_columns("isnan", col)


isnan.__doc__ = pysparkfuncs.isnan.__doc__


def isnull(col: "ColumnOrName") -> Column:
    return _invoke_function_over_columns("isnull", col)


isnull.__doc__ = pysparkfuncs.isnull.__doc__


def monotonically_increasing_id() -> Column:
    return _invoke_function("monotonically_increasing_id")


monotonically_increasing_id.__doc__ = pysparkfuncs.monotonically_increasing_id.__doc__


def nanvl(col1: "ColumnOrName", col2: "ColumnOrName") -> Column:
    return _invoke_function_over_columns("nanvl", col1, col2)


nanvl.__doc__ = pysparkfuncs.nanvl.__doc__


def rand(seed: Optional[int] = None) -> Column:
    if seed is not None:
        return _invoke_function("rand", lit(seed))
    else:
        return _invoke_function("rand", lit(py_random.randint(0, sys.maxsize)))


rand.__doc__ = pysparkfuncs.rand.__doc__
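
# Note (Connect-specific detail of this module): when no seed is given, rand() is
# equivalent to rand(py_random.randint(0, sys.maxsize)) -- the seed is drawn on the
# client and shipped as a literal, so the generated plan always carries an explicit
# seed. randn() and uniform() below behave the same way.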


random = rand


def randn(seed: Optional[int] = None) -> Column:
    if seed is not None:
        return _invoke_function("randn", lit(seed))
    else:
        return _invoke_function("randn", lit(py_random.randint(0, sys.maxsize)))


randn.__doc__ = pysparkfuncs.randn.__doc__


def spark_partition_id() -> Column:
    return _invoke_function("spark_partition_id")


spark_partition_id.__doc__ = pysparkfuncs.spark_partition_id.__doc__


def when(condition: Column, value: Any) -> Column:
    from pyspark.sql.connect.column import Column as ConnectColumn

    # Explicitly not using the ColumnOrName type here, to make reading the condition
    # less opaque
    if not isinstance(condition, Column):
        raise PySparkTypeError(
            errorClass="NOT_COLUMN",
            messageParameters={"arg_name": "condition", "arg_type": type(condition).__name__},
        )

    value = _enum_to_value(value)
    value_col = value if isinstance(value, Column) else lit(value)

    return ConnectColumn(
        CaseWhen(
            branches=[(condition._expr, value_col._expr)],  # type: ignore[list-item]
            else_value=None,
        )
    )


when.__doc__ = pysparkfuncs.when.__doc__
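
# Illustrative sketch (comment only): when(col("age") > 3, 1) builds
# CaseWhen(branches=[(<age > 3 expr>, <lit(1) expr>)], else_value=None); the else
# branch is supplied later if .otherwise(...) is chained on the resulting Column.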


# Sort Functions


def asc(col: "ColumnOrName") -> Column:
    return _to_col(col).asc()


asc.__doc__ = pysparkfuncs.asc.__doc__


def asc_nulls_first(col: "ColumnOrName") -> Column:
    return _to_col(col).asc_nulls_first()


asc_nulls_first.__doc__ = pysparkfuncs.asc_nulls_first.__doc__


def asc_nulls_last(col: "ColumnOrName") -> Column:
    return _to_col(col).asc_nulls_last()


asc_nulls_last.__doc__ = pysparkfuncs.asc_nulls_last.__doc__


def desc(col: "ColumnOrName") -> Column:
    return _to_col(col).desc()


desc.__doc__ = pysparkfuncs.desc.__doc__


def desc_nulls_first(col: "ColumnOrName") -> Column:
    return _to_col(col).desc_nulls_first()


desc_nulls_first.__doc__ = pysparkfuncs.desc_nulls_first.__doc__


def desc_nulls_last(col: "ColumnOrName") -> Column:
    return _to_col(col).desc_nulls_last()


desc_nulls_last.__doc__ = pysparkfuncs.desc_nulls_last.__doc__


# Math Functions


def abs(col: "ColumnOrName") -> Column:
    return _invoke_function_over_columns("abs", col)


abs.__doc__ = pysparkfuncs.abs.__doc__


def acos(col: "ColumnOrName") -> Column:
    return _invoke_function_over_columns("acos", col)


acos.__doc__ = pysparkfuncs.acos.__doc__


def acosh(col: "ColumnOrName") -> Column:
    return _invoke_function_over_columns("acosh", col)


acosh.__doc__ = pysparkfuncs.acosh.__doc__


def asin(col: "ColumnOrName") -> Column:
    return _invoke_function_over_columns("asin", col)


asin.__doc__ = pysparkfuncs.asin.__doc__


def asinh(col: "ColumnOrName") -> Column:
    return _invoke_function_over_columns("asinh", col)


asinh.__doc__ = pysparkfuncs.asinh.__doc__


def atan(col: "ColumnOrName") -> Column:
    return _invoke_function_over_columns("atan", col)


atan.__doc__ = pysparkfuncs.atan.__doc__


def atan2(col1: Union["ColumnOrName", float], col2: Union["ColumnOrName", float]) -> Column:
    return _invoke_binary_math_function("atan2", col1, col2)


atan2.__doc__ = pysparkfuncs.atan2.__doc__


def atanh(col: "ColumnOrName") -> Column:
    return _invoke_function_over_columns("atanh", col)


atanh.__doc__ = pysparkfuncs.atanh.__doc__


def bin(col: "ColumnOrName") -> Column:
    return _invoke_function_over_columns("bin", col)


bin.__doc__ = pysparkfuncs.bin.__doc__


def bround(col: "ColumnOrName", scale: Optional[Union[Column, int]] = None) -> Column:
    if scale is None:
        return _invoke_function_over_columns("bround", col)
    else:
        scale = _enum_to_value(scale)
        scale = lit(scale) if isinstance(scale, int) else scale
        return _invoke_function_over_columns("bround", col, scale)  # type: ignore[arg-type]


bround.__doc__ = pysparkfuncs.bround.__doc__


def cbrt(col: "ColumnOrName") -> Column:
    return _invoke_function_over_columns("cbrt", col)


cbrt.__doc__ = pysparkfuncs.cbrt.__doc__


def ceil(col: "ColumnOrName", scale: Optional[Union[Column, int]] = None) -> Column:
    if scale is None:
        return _invoke_function_over_columns("ceil", col)
    else:
        scale = _enum_to_value(scale)
        scale = lit(scale) if isinstance(scale, int) else scale
        return _invoke_function_over_columns("ceil", col, scale)  # type: ignore[arg-type]


ceil.__doc__ = pysparkfuncs.ceil.__doc__


def ceiling(col: "ColumnOrName", scale: Optional[Union[Column, int]] = None) -> Column:
    if scale is None:
        return _invoke_function_over_columns("ceiling", col)
    else:
        scale = _enum_to_value(scale)
        scale = lit(scale) if isinstance(scale, int) else scale
        return _invoke_function_over_columns("ceiling", col, scale)  # type: ignore[arg-type]


ceiling.__doc__ = pysparkfuncs.ceiling.__doc__


def conv(col: "ColumnOrName", fromBase: int, toBase: int) -> Column:
    return _invoke_function("conv", _to_col(col), lit(fromBase), lit(toBase))


conv.__doc__ = pysparkfuncs.conv.__doc__


def cos(col: "ColumnOrName") -> Column:
    return _invoke_function_over_columns("cos", col)


cos.__doc__ = pysparkfuncs.cos.__doc__


def cosh(col: "ColumnOrName") -> Column:
    return _invoke_function_over_columns("cosh", col)


cosh.__doc__ = pysparkfuncs.cosh.__doc__


def cot(col: "ColumnOrName") -> Column:
    return _invoke_function_over_columns("cot", col)


cot.__doc__ = pysparkfuncs.cot.__doc__


def csc(col: "ColumnOrName") -> Column:
    return _invoke_function_over_columns("csc", col)


csc.__doc__ = pysparkfuncs.csc.__doc__


def degrees(col: "ColumnOrName") -> Column:
    return _invoke_function_over_columns("degrees", col)


degrees.__doc__ = pysparkfuncs.degrees.__doc__


def e() -> Column:
    return _invoke_function("e")


e.__doc__ = pysparkfuncs.e.__doc__


def exp(col: "ColumnOrName") -> Column:
    return _invoke_function_over_columns("exp", col)


exp.__doc__ = pysparkfuncs.exp.__doc__


def expm1(col: "ColumnOrName") -> Column:
    return _invoke_function_over_columns("expm1", col)


expm1.__doc__ = pysparkfuncs.expm1.__doc__


def factorial(col: "ColumnOrName") -> Column:
    return _invoke_function_over_columns("factorial", col)


factorial.__doc__ = pysparkfuncs.factorial.__doc__


def floor(col: "ColumnOrName", scale: Optional[Union[Column, int]] = None) -> Column:
    if scale is None:
        return _invoke_function_over_columns("floor", col)
    else:
        scale = _enum_to_value(scale)
        scale = lit(scale) if isinstance(scale, int) else scale
        return _invoke_function_over_columns("floor", col, scale)  # type: ignore[arg-type]


floor.__doc__ = pysparkfuncs.floor.__doc__


def hex(col: "ColumnOrName") -> Column:
    return _invoke_function_over_columns("hex", col)


hex.__doc__ = pysparkfuncs.hex.__doc__


def hypot(col1: Union["ColumnOrName", float], col2: Union["ColumnOrName", float]) -> Column:
    return _invoke_binary_math_function("hypot", col1, col2)


hypot.__doc__ = pysparkfuncs.hypot.__doc__


def log(arg1: Union["ColumnOrName", float], arg2: Optional["ColumnOrName"] = None) -> Column:
    if arg2 is None:
        # in this case, arg1 should be "ColumnOrName"
        return _invoke_function("ln", _to_col(cast("ColumnOrName", arg1)))
    else:
        # in this case, arg1 should be a float
        return _invoke_function("log", lit(cast(float, arg1)), _to_col(arg2))


log.__doc__ = pysparkfuncs.log.__doc__
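
# Note (illustrative): log(df.value) maps to the one-argument natural logarithm ("ln"),
# while log(2.0, df.value) maps to the two-argument "log" with the base passed first,
# mirroring the overloaded signature of pyspark.sql.functions.log.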


def log10(col: "ColumnOrName") -> Column:
    return _invoke_function_over_columns("log10", col)


log10.__doc__ = pysparkfuncs.log10.__doc__


def log1p(col: "ColumnOrName") -> Column:
    return _invoke_function_over_columns("log1p", col)


log1p.__doc__ = pysparkfuncs.log1p.__doc__


def ln(col: "ColumnOrName") -> Column:
    return _invoke_function_over_columns("ln", col)


ln.__doc__ = pysparkfuncs.ln.__doc__


def log2(col: "ColumnOrName") -> Column:
    return _invoke_function_over_columns("log2", col)


log2.__doc__ = pysparkfuncs.log2.__doc__


def negative(col: "ColumnOrName") -> Column:
    return _invoke_function_over_columns("negative", col)


negative.__doc__ = pysparkfuncs.negative.__doc__


negate = negative


def pi() -> Column:
    return _invoke_function("pi")


pi.__doc__ = pysparkfuncs.pi.__doc__


def positive(col: "ColumnOrName") -> Column:
    return _invoke_function_over_columns("positive", col)


positive.__doc__ = pysparkfuncs.positive.__doc__


def pmod(dividend: Union["ColumnOrName", float], divisor: Union["ColumnOrName", float]) -> Column:
    return _invoke_binary_math_function("pmod", dividend, divisor)


pmod.__doc__ = pysparkfuncs.pmod.__doc__


def width_bucket(
    v: "ColumnOrName",
    min: "ColumnOrName",
    max: "ColumnOrName",
    numBucket: Union["ColumnOrName", int],
) -> Column:
    numBucket = _enum_to_value(numBucket)
    numBucket = lit(numBucket) if isinstance(numBucket, int) else numBucket
    return _invoke_function_over_columns("width_bucket", v, min, max, numBucket)


width_bucket.__doc__ = pysparkfuncs.width_bucket.__doc__


def pow(col1: Union["ColumnOrName", float], col2: Union["ColumnOrName", float]) -> Column:
    return _invoke_binary_math_function("power", col1, col2)


pow.__doc__ = pysparkfuncs.pow.__doc__


power = pow


def radians(col: "ColumnOrName") -> Column:
    return _invoke_function_over_columns("radians", col)


radians.__doc__ = pysparkfuncs.radians.__doc__


def rint(col: "ColumnOrName") -> Column:
    return _invoke_function_over_columns("rint", col)


rint.__doc__ = pysparkfuncs.rint.__doc__


def round(col: "ColumnOrName", scale: Optional[Union[Column, int]] = None) -> Column:
    if scale is None:
        return _invoke_function_over_columns("round", col)
    else:
        scale = _enum_to_value(scale)
        scale = lit(scale) if isinstance(scale, int) else scale
        return _invoke_function_over_columns("round", col, scale)  # type: ignore[arg-type]


round.__doc__ = pysparkfuncs.round.__doc__


def sec(col: "ColumnOrName") -> Column:
    return _invoke_function_over_columns("sec", col)


sec.__doc__ = pysparkfuncs.sec.__doc__


def shiftLeft(col: "ColumnOrName", numBits: int) -> Column:
    warnings.warn("Deprecated in 3.4, use shiftleft instead.", FutureWarning)
    return shiftleft(col, numBits)


shiftLeft.__doc__ = pysparkfuncs.shiftLeft.__doc__


def shiftleft(col: "ColumnOrName", numBits: int) -> Column:
    return _invoke_function("shiftleft", _to_col(col), lit(numBits))


shiftleft.__doc__ = pysparkfuncs.shiftleft.__doc__


def shiftRight(col: "ColumnOrName", numBits: int) -> Column:
    warnings.warn("Deprecated in 3.4, use shiftright instead.", FutureWarning)
    return shiftright(col, numBits)


shiftRight.__doc__ = pysparkfuncs.shiftRight.__doc__


def shiftright(col: "ColumnOrName", numBits: int) -> Column:
    return _invoke_function("shiftright", _to_col(col), lit(numBits))


shiftright.__doc__ = pysparkfuncs.shiftright.__doc__


def shiftRightUnsigned(col: "ColumnOrName", numBits: int) -> Column:
    warnings.warn("Deprecated in 3.4, use shiftrightunsigned instead.", FutureWarning)
    return shiftrightunsigned(col, numBits)


shiftRightUnsigned.__doc__ = pysparkfuncs.shiftRightUnsigned.__doc__


def shiftrightunsigned(col: "ColumnOrName", numBits: int) -> Column:
    return _invoke_function("shiftrightunsigned", _to_col(col), lit(numBits))


shiftrightunsigned.__doc__ = pysparkfuncs.shiftrightunsigned.__doc__


def signum(col: "ColumnOrName") -> Column:
    return _invoke_function_over_columns("signum", col)


signum.__doc__ = pysparkfuncs.signum.__doc__


def sign(col: "ColumnOrName") -> Column:
    return _invoke_function_over_columns("sign", col)


sign.__doc__ = pysparkfuncs.sign.__doc__


def sin(col: "ColumnOrName") -> Column:
    return _invoke_function_over_columns("sin", col)


sin.__doc__ = pysparkfuncs.sin.__doc__


def sinh(col: "ColumnOrName") -> Column:
    return _invoke_function_over_columns("sinh", col)


sinh.__doc__ = pysparkfuncs.sinh.__doc__


def sqrt(col: "ColumnOrName") -> Column:
    return _invoke_function_over_columns("sqrt", col)


sqrt.__doc__ = pysparkfuncs.sqrt.__doc__


def try_add(left: "ColumnOrName", right: "ColumnOrName") -> Column:
    return _invoke_function_over_columns("try_add", left, right)


try_add.__doc__ = pysparkfuncs.try_add.__doc__


def try_avg(col: "ColumnOrName") -> Column:
    return _invoke_function_over_columns("try_avg", col)


try_avg.__doc__ = pysparkfuncs.try_avg.__doc__


def try_divide(left: "ColumnOrName", right: "ColumnOrName") -> Column:
    return _invoke_function_over_columns("try_divide", left, right)


try_divide.__doc__ = pysparkfuncs.try_divide.__doc__


def try_mod(left: "ColumnOrName", right: "ColumnOrName") -> Column:
    return _invoke_function_over_columns("try_mod", left, right)


try_mod.__doc__ = pysparkfuncs.try_mod.__doc__


def try_multiply(left: "ColumnOrName", right: "ColumnOrName") -> Column:
    return _invoke_function_over_columns("try_multiply", left, right)


try_multiply.__doc__ = pysparkfuncs.try_multiply.__doc__


def try_subtract(left: "ColumnOrName", right: "ColumnOrName") -> Column:
    return _invoke_function_over_columns("try_subtract", left, right)


try_subtract.__doc__ = pysparkfuncs.try_subtract.__doc__


def try_sum(col: "ColumnOrName") -> Column:
    return _invoke_function_over_columns("try_sum", col)


try_sum.__doc__ = pysparkfuncs.try_sum.__doc__


def tan(col: "ColumnOrName") -> Column:
    return _invoke_function_over_columns("tan", col)


tan.__doc__ = pysparkfuncs.tan.__doc__


def tanh(col: "ColumnOrName") -> Column:
    return _invoke_function_over_columns("tanh", col)


tanh.__doc__ = pysparkfuncs.tanh.__doc__


def toDegrees(col: "ColumnOrName") -> Column:
    warnings.warn("Deprecated in 3.4, use degrees instead.", FutureWarning)
    return degrees(col)


toDegrees.__doc__ = pysparkfuncs.toDegrees.__doc__


def toRadians(col: "ColumnOrName") -> Column:
    warnings.warn("Deprecated in 3.4, use radians instead.", FutureWarning)
    return radians(col)


toRadians.__doc__ = pysparkfuncs.toRadians.__doc__


def unhex(col: "ColumnOrName") -> Column:
    return _invoke_function_over_columns("unhex", col)


unhex.__doc__ = pysparkfuncs.unhex.__doc__


def uniform(
    min: Union[Column, int, float],
    max: Union[Column, int, float],
    seed: Optional[Union[Column, int]] = None,
) -> Column:
    if seed is None:
        return _invoke_function_over_columns(
            "uniform", lit(min), lit(max), lit(py_random.randint(0, sys.maxsize))
        )
    else:
        return _invoke_function_over_columns("uniform", lit(min), lit(max), lit(seed))


uniform.__doc__ = pysparkfuncs.uniform.__doc__


def approxCountDistinct(col: "ColumnOrName", rsd: Optional[float] = None) -> Column:
    warnings.warn("Deprecated in 3.4, use approx_count_distinct instead.", FutureWarning)
    return approx_count_distinct(col, rsd)


approxCountDistinct.__doc__ = pysparkfuncs.approxCountDistinct.__doc__


def approx_count_distinct(col: "ColumnOrName", rsd: Optional[float] = None) -> Column:
    if rsd is None:
        return _invoke_function("approx_count_distinct", _to_col(col))
    else:
        return _invoke_function("approx_count_distinct", _to_col(col), lit(rsd))


approx_count_distinct.__doc__ = pysparkfuncs.approx_count_distinct.__doc__


def avg(col: "ColumnOrName") -> Column:
    return _invoke_function_over_columns("avg", col)


avg.__doc__ = pysparkfuncs.avg.__doc__


def collect_list(col: "ColumnOrName") -> Column:
    return _invoke_function_over_columns("collect_list", col)


collect_list.__doc__ = pysparkfuncs.collect_list.__doc__


def array_agg(col: "ColumnOrName") -> Column:
    return _invoke_function_over_columns("array_agg", col)


array_agg.__doc__ = pysparkfuncs.array_agg.__doc__


def collect_set(col: "ColumnOrName") -> Column:
    return _invoke_function_over_columns("collect_set", col)


collect_set.__doc__ = pysparkfuncs.collect_set.__doc__


def listagg(col: "ColumnOrName", delimiter: Optional[Union[Column, str, bytes]] = None) -> Column:
    if delimiter is None:
        return _invoke_function_over_columns("listagg", col)
    else:
        return _invoke_function_over_columns("listagg", col, lit(delimiter))


listagg.__doc__ = pysparkfuncs.listagg.__doc__


def listagg_distinct(
    col: "ColumnOrName", delimiter: Optional[Union[Column, str, bytes]] = None
) -> Column:
    from pyspark.sql.connect.column import Column as ConnectColumn

    args = [col]
    if delimiter is not None:
        args += [lit(delimiter)]

    _exprs = [_to_col(c)._expr for c in args]
    return ConnectColumn(
        UnresolvedFunction("listagg", _exprs, is_distinct=True)  # type: ignore[arg-type]
    )


listagg_distinct.__doc__ = pysparkfuncs.listagg_distinct.__doc__


def string_agg(
    col: "ColumnOrName", delimiter: Optional[Union[Column, str, bytes]] = None
) -> Column:
    if delimiter is None:
        return _invoke_function_over_columns("string_agg", col)
    else:
        return _invoke_function_over_columns("string_agg", col, lit(delimiter))


string_agg.__doc__ = pysparkfuncs.string_agg.__doc__


def string_agg_distinct(
    col: "ColumnOrName", delimiter: Optional[Union[Column, str, bytes]] = None
) -> Column:
    from pyspark.sql.connect.column import Column as ConnectColumn

    args = [col]
    if delimiter is not None:
        args += [lit(delimiter)]

    _exprs = [_to_col(c)._expr for c in args]
    return ConnectColumn(
        UnresolvedFunction("string_agg", _exprs, is_distinct=True)  # type: ignore[arg-type]
    )


string_agg_distinct.__doc__ = pysparkfuncs.string_agg_distinct.__doc__


def corr(col1: "ColumnOrName", col2: "ColumnOrName") -> Column:
    return _invoke_function_over_columns("corr", col1, col2)


corr.__doc__ = pysparkfuncs.corr.__doc__


def count(col: "ColumnOrName") -> Column:
    if isinstance(col, Column) and isinstance(col._expr, UnresolvedStar):
        col = lit(1)
    return _invoke_function_over_columns("count", col)


count.__doc__ = pysparkfuncs.count.__doc__
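
# Note (illustrative): count(col("*")) is rewritten above to count(lit(1)) so that the
# unresolved star is not sent as a regular function argument; both forms count rows.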


def countDistinct(col: "ColumnOrName", *cols: "ColumnOrName") -> Column:
    return count_distinct(col, *cols)


countDistinct.__doc__ = pysparkfuncs.countDistinct.__doc__


def count_distinct(col: "ColumnOrName", *cols: "ColumnOrName") -> Column:
    from pyspark.sql.connect.column import Column as ConnectColumn

    _exprs = [_to_col(c)._expr for c in [col] + list(cols)]
    return ConnectColumn(
        UnresolvedFunction("count", _exprs, is_distinct=True)  # type: ignore[arg-type]
    )


count_distinct.__doc__ = pysparkfuncs.count_distinct.__doc__
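
# Illustrative sketch (comment only): count_distinct(df.a, df.b) has no dedicated
# function name; it builds UnresolvedFunction("count", [<a expr>, <b expr>],
# is_distinct=True), the same pattern used by listagg_distinct above and
# sum_distinct below.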


def covar_pop(col1: "ColumnOrName", col2: "ColumnOrName") -> Column:
    return _invoke_function_over_columns("covar_pop", col1, col2)


covar_pop.__doc__ = pysparkfuncs.covar_pop.__doc__


def covar_samp(col1: "ColumnOrName", col2: "ColumnOrName") -> Column:
    return _invoke_function_over_columns("covar_samp", col1, col2)


covar_samp.__doc__ = pysparkfuncs.covar_samp.__doc__


def first(col: "ColumnOrName", ignorenulls: bool = False) -> Column:
    return _invoke_function("first", _to_col(col), lit(ignorenulls))


first.__doc__ = pysparkfuncs.first.__doc__


def grouping(col: "ColumnOrName") -> Column:
    return _invoke_function_over_columns("grouping", col)


grouping.__doc__ = pysparkfuncs.grouping.__doc__


def grouping_id(*cols: "ColumnOrName") -> Column:
    return _invoke_function_over_columns("grouping_id", *cols)


grouping_id.__doc__ = pysparkfuncs.grouping_id.__doc__


def count_min_sketch(
    col: "ColumnOrName",
    eps: Union[Column, float],
    confidence: Union[Column, float],
    seed: Optional[Union[Column, int]] = None,
) -> Column:
    _seed = lit(py_random.randint(0, sys.maxsize)) if seed is None else lit(seed)
    return _invoke_function_over_columns("count_min_sketch", col, lit(eps), lit(confidence), _seed)


count_min_sketch.__doc__ = pysparkfuncs.count_min_sketch.__doc__


def kurtosis(col: "ColumnOrName") -> Column:
    return _invoke_function_over_columns("kurtosis", col)


kurtosis.__doc__ = pysparkfuncs.kurtosis.__doc__


def last(col: "ColumnOrName", ignorenulls: bool = False) -> Column:
    return _invoke_function("last", _to_col(col), lit(ignorenulls))


last.__doc__ = pysparkfuncs.last.__doc__


def max(col: "ColumnOrName") -> Column:
    return _invoke_function_over_columns("max", col)


max.__doc__ = pysparkfuncs.max.__doc__


def max_by(col: "ColumnOrName", ord: "ColumnOrName") -> Column:
    return _invoke_function_over_columns("max_by", col, ord)


max_by.__doc__ = pysparkfuncs.max_by.__doc__


def mean(col: "ColumnOrName") -> Column:
    return avg(col)


mean.__doc__ = pysparkfuncs.mean.__doc__


def median(col: "ColumnOrName") -> Column:
    return _invoke_function_over_columns("median", col)


median.__doc__ = pysparkfuncs.median.__doc__


def min(col: "ColumnOrName") -> Column:
    return _invoke_function_over_columns("min", col)


min.__doc__ = pysparkfuncs.min.__doc__


def min_by(col: "ColumnOrName", ord: "ColumnOrName") -> Column:
    return _invoke_function_over_columns("min_by", col, ord)


min_by.__doc__ = pysparkfuncs.min_by.__doc__


def mode(col: "ColumnOrName", deterministic: bool = False) -> Column:
    return _invoke_function("mode", _to_col(col), lit(deterministic))


mode.__doc__ = pysparkfuncs.mode.__doc__


def percentile(
    col: "ColumnOrName",
    percentage: Union[Column, float, Sequence[float], Tuple[float, ...]],
    frequency: Union[Column, int] = 1,
) -> Column:
    if not isinstance(frequency, (int, Column)):
        raise PySparkTypeError(
            errorClass="NOT_COLUMN_OR_INT",
            messageParameters={
                "arg_name": "frequency",
                "arg_type": type(frequency).__name__,
            },
        )

    percentage = lit(list(percentage)) if isinstance(percentage, (list, tuple)) else lit(percentage)
    return _invoke_function_over_columns("percentile", col, percentage, lit(frequency))


percentile.__doc__ = pysparkfuncs.percentile.__doc__


def percentile_approx(
    col: "ColumnOrName",
    percentage: Union[Column, float, Sequence[float], Tuple[float, ...]],
    accuracy: Union[Column, int] = 10000,
) -> Column:
    percentage = lit(list(percentage)) if isinstance(percentage, (list, tuple)) else lit(percentage)
    return _invoke_function_over_columns("percentile_approx", col, percentage, lit(accuracy))


percentile_approx.__doc__ = pysparkfuncs.percentile_approx.__doc__


def approx_percentile(
    col: "ColumnOrName",
    percentage: Union[Column, float, Sequence[float], Tuple[float, ...]],
    accuracy: Union[Column, int] = 10000,
) -> Column:
    percentage = lit(list(percentage)) if isinstance(percentage, (list, tuple)) else lit(percentage)
    return _invoke_function_over_columns("approx_percentile", col, percentage, lit(accuracy))


approx_percentile.__doc__ = pysparkfuncs.approx_percentile.__doc__


def product(col: "ColumnOrName") -> Column:
    return _invoke_function_over_columns("product", col)


product.__doc__ = pysparkfuncs.product.__doc__


def skewness(col: "ColumnOrName") -> Column:
    return _invoke_function_over_columns("skewness", col)


skewness.__doc__ = pysparkfuncs.skewness.__doc__


def stddev(col: "ColumnOrName") -> Column:
    return _invoke_function_over_columns("stddev", col)


stddev.__doc__ = pysparkfuncs.stddev.__doc__


def std(col: "ColumnOrName") -> Column:
    return _invoke_function_over_columns("std", col)


std.__doc__ = pysparkfuncs.std.__doc__


def stddev_samp(col: "ColumnOrName") -> Column:
    return _invoke_function_over_columns("stddev_samp", col)


stddev_samp.__doc__ = pysparkfuncs.stddev_samp.__doc__


def stddev_pop(col: "ColumnOrName") -> Column:
    return _invoke_function_over_columns("stddev_pop", col)


stddev_pop.__doc__ = pysparkfuncs.stddev_pop.__doc__


def sum(col: "ColumnOrName") -> Column:
    return _invoke_function_over_columns("sum", col)


sum.__doc__ = pysparkfuncs.sum.__doc__


def sumDistinct(col: "ColumnOrName") -> Column:
    warnings.warn("Deprecated in 3.4, use sum_distinct instead.", FutureWarning)
    return sum_distinct(col)


sumDistinct.__doc__ = pysparkfuncs.sumDistinct.__doc__


def sum_distinct(col: "ColumnOrName") -> Column:
    from pyspark.sql.connect.column import Column as ConnectColumn

    return ConnectColumn(
        UnresolvedFunction("sum", [_to_col(col)._expr], is_distinct=True)  # type: ignore[list-item]
    )


sum_distinct.__doc__ = pysparkfuncs.sum_distinct.__doc__


def var_pop(col: "ColumnOrName") -> Column:
    return _invoke_function_over_columns("var_pop", col)


var_pop.__doc__ = pysparkfuncs.var_pop.__doc__


def regr_avgx(y: "ColumnOrName", x: "ColumnOrName") -> Column:
    return _invoke_function_over_columns("regr_avgx", y, x)


regr_avgx.__doc__ = pysparkfuncs.regr_avgx.__doc__


def regr_avgy(y: "ColumnOrName", x: "ColumnOrName") -> Column:
    return _invoke_function_over_columns("regr_avgy", y, x)


regr_avgy.__doc__ = pysparkfuncs.regr_avgy.__doc__


def regr_count(y: "ColumnOrName", x: "ColumnOrName") -> Column:
    return _invoke_function_over_columns("regr_count", y, x)


regr_count.__doc__ = pysparkfuncs.regr_count.__doc__


def regr_intercept(y: "ColumnOrName", x: "ColumnOrName") -> Column:
    return _invoke_function_over_columns("regr_intercept", y, x)


regr_intercept.__doc__ = pysparkfuncs.regr_intercept.__doc__


def regr_r2(y: "ColumnOrName", x: "ColumnOrName") -> Column:
    return _invoke_function_over_columns("regr_r2", y, x)


regr_r2.__doc__ = pysparkfuncs.regr_r2.__doc__


def regr_slope(y: "ColumnOrName", x: "ColumnOrName") -> Column:
    return _invoke_function_over_columns("regr_slope", y, x)


regr_slope.__doc__ = pysparkfuncs.regr_slope.__doc__


def regr_sxx(y: "ColumnOrName", x: "ColumnOrName") -> Column:
    return _invoke_function_over_columns("regr_sxx", y, x)


regr_sxx.__doc__ = pysparkfuncs.regr_sxx.__doc__


def regr_sxy(y: "ColumnOrName", x: "ColumnOrName") -> Column:
    return _invoke_function_over_columns("regr_sxy", y, x)


regr_sxy.__doc__ = pysparkfuncs.regr_sxy.__doc__


def regr_syy(y: "ColumnOrName", x: "ColumnOrName") -> Column:
    return _invoke_function_over_columns("regr_syy", y, x)


regr_syy.__doc__ = pysparkfuncs.regr_syy.__doc__


def var_samp(col: "ColumnOrName") -> Column:
    return _invoke_function_over_columns("var_samp", col)


var_samp.__doc__ = pysparkfuncs.var_samp.__doc__


def variance(col: "ColumnOrName") -> Column:
    return _invoke_function_over_columns("variance", col)


variance.__doc__ = pysparkfuncs.variance.__doc__


def every(col: "ColumnOrName") -> Column:
    return _invoke_function_over_columns("every", col)


every.__doc__ = pysparkfuncs.every.__doc__


def bool_and(col: "ColumnOrName") -> Column:
    return _invoke_function_over_columns("bool_and", col)


bool_and.__doc__ = pysparkfuncs.bool_and.__doc__


def some(col: "ColumnOrName") -> Column:
    return _invoke_function_over_columns("some", col)


some.__doc__ = pysparkfuncs.some.__doc__


def bool_or(col: "ColumnOrName") -> Column:
    return _invoke_function_over_columns("bool_or", col)


bool_or.__doc__ = pysparkfuncs.bool_or.__doc__


def bit_and(col: "ColumnOrName") -> Column:
    return _invoke_function_over_columns("bit_and", col)


bit_and.__doc__ = pysparkfuncs.bit_and.__doc__


def bit_or(col: "ColumnOrName") -> Column:
    return _invoke_function_over_columns("bit_or", col)


bit_or.__doc__ = pysparkfuncs.bit_or.__doc__


def bit_xor(col: "ColumnOrName") -> Column:
    return _invoke_function_over_columns("bit_xor", col)


bit_xor.__doc__ = pysparkfuncs.bit_xor.__doc__


# Window Functions


def cume_dist() -> Column:
    return _invoke_function("cume_dist")


cume_dist.__doc__ = pysparkfuncs.cume_dist.__doc__


def dense_rank() -> Column:
    return _invoke_function("dense_rank")


dense_rank.__doc__ = pysparkfuncs.dense_rank.__doc__


def lag(col: "ColumnOrName", offset: int = 1, default: Optional[Any] = None) -> Column:
    if default is None:
        return _invoke_function("lag", _to_col(col), lit(offset))
    else:
        return _invoke_function("lag", _to_col(col), lit(offset), lit(default))


lag.__doc__ = pysparkfuncs.lag.__doc__


def lead(col: "ColumnOrName", offset: int = 1, default: Optional[Any] = None) -> Column:
    if default is None:
        return _invoke_function("lead", _to_col(col), lit(offset))
    else:
        return _invoke_function("lead", _to_col(col), lit(offset), lit(default))


lead.__doc__ = pysparkfuncs.lead.__doc__


def nth_value(col: "ColumnOrName", offset: int, ignoreNulls: Optional[bool] = False) -> Column:
    if ignoreNulls is None:
        return _invoke_function("nth_value", _to_col(col), lit(offset))
    else:
        return _invoke_function("nth_value", _to_col(col), lit(offset), lit(ignoreNulls))


nth_value.__doc__ = pysparkfuncs.nth_value.__doc__


def any_value(col: "ColumnOrName", ignoreNulls: Optional[Union[bool, Column]] = None) -> Column:
    if ignoreNulls is None:
        return _invoke_function_over_columns("any_value", col)
    else:
        ignoreNulls = _enum_to_value(ignoreNulls)
        ignoreNulls = lit(ignoreNulls) if isinstance(ignoreNulls, bool) else ignoreNulls
        return _invoke_function_over_columns(
            "any_value", col, ignoreNulls  # type: ignore[arg-type]
        )


any_value.__doc__ = pysparkfuncs.any_value.__doc__


def first_value(col: "ColumnOrName", ignoreNulls: Optional[Union[bool, Column]] = None) -> Column:
    if ignoreNulls is None:
        return _invoke_function_over_columns("first_value", col)
    else:
        ignoreNulls = _enum_to_value(ignoreNulls)
        ignoreNulls = lit(ignoreNulls) if isinstance(ignoreNulls, bool) else ignoreNulls
        return _invoke_function_over_columns(
            "first_value", col, ignoreNulls  # type: ignore[arg-type]
        )


first_value.__doc__ = pysparkfuncs.first_value.__doc__


def last_value(col: "ColumnOrName", ignoreNulls: Optional[Union[bool, Column]] = None) -> Column:
    if ignoreNulls is None:
        return _invoke_function_over_columns("last_value", col)
    else:
        ignoreNulls = _enum_to_value(ignoreNulls)
        ignoreNulls = lit(ignoreNulls) if isinstance(ignoreNulls, bool) else ignoreNulls
        return _invoke_function_over_columns(
            "last_value", col, ignoreNulls  # type: ignore[arg-type]
        )


last_value.__doc__ = pysparkfuncs.last_value.__doc__


def count_if(col: "ColumnOrName") -> Column:
    return _invoke_function_over_columns("count_if", col)


count_if.__doc__ = pysparkfuncs.count_if.__doc__


def histogram_numeric(col: "ColumnOrName", nBins: Column) -> Column:
    return _invoke_function_over_columns("histogram_numeric", col, nBins)


histogram_numeric.__doc__ = pysparkfuncs.histogram_numeric.__doc__


def ntile(n: int) -> Column:
    return _invoke_function("ntile", lit(n))


ntile.__doc__ = pysparkfuncs.ntile.__doc__


def percent_rank() -> Column:
    return _invoke_function("percent_rank")


percent_rank.__doc__ = pysparkfuncs.percent_rank.__doc__


def rank() -> Column:
    return _invoke_function("rank")


rank.__doc__ = pysparkfuncs.rank.__doc__


def row_number() -> Column:
    return _invoke_function("row_number")


row_number.__doc__ = pysparkfuncs.row_number.__doc__


def aggregate(
    col: "ColumnOrName",
    initialValue: "ColumnOrName",
    merge: Callable[[Column, Column], Column],
    finish: Optional[Callable[[Column], Column]] = None,
) -> Column:
    if finish is not None:
        return _invoke_higher_order_function("aggregate", [col, initialValue], [merge, finish])

    else:
        return _invoke_higher_order_function("aggregate", [col, initialValue], [merge])


aggregate.__doc__ = pysparkfuncs.aggregate.__doc__


def reduce(
    col: "ColumnOrName",
    initialValue: "ColumnOrName",
    merge: Callable[[Column, Column], Column],
    finish: Optional[Callable[[Column], Column]] = None,
) -> Column:
    if finish is not None:
        return _invoke_higher_order_function("reduce", [col, initialValue], [merge, finish])

    else:
        return _invoke_higher_order_function("reduce", [col, initialValue], [merge])


reduce.__doc__ = pysparkfuncs.reduce.__doc__


def array(
    *cols: Union["ColumnOrName", Sequence["ColumnOrName"], Tuple["ColumnOrName", ...]]
) -> Column:
    if len(cols) == 1 and isinstance(cols[0], (list, set, tuple)):
        cols = cols[0]  # type: ignore[assignment]
    return _invoke_function_over_columns("array", *cols)  # type: ignore[arg-type]


array.__doc__ = pysparkfuncs.array.__doc__
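
# Note (illustrative): array accepts either varargs or a single sequence, so
# array("a", "b") and array(["a", "b"]) build the same UnresolvedFunction("array", ...);
# create_map and map_concat below unpack a single sequence argument the same way.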


def array_append(col: "ColumnOrName", value: Any) -> Column:
    return _invoke_function("array_append", _to_col(col), lit(value))


array_append.__doc__ = pysparkfuncs.array_append.__doc__


def array_contains(col: "ColumnOrName", value: Any) -> Column:
    return _invoke_function("array_contains", _to_col(col), lit(value))


array_contains.__doc__ = pysparkfuncs.array_contains.__doc__


def array_distinct(col: "ColumnOrName") -> Column:
    return _invoke_function_over_columns("array_distinct", col)


array_distinct.__doc__ = pysparkfuncs.array_distinct.__doc__


def array_except(col1: "ColumnOrName", col2: "ColumnOrName") -> Column:
    return _invoke_function_over_columns("array_except", col1, col2)


array_except.__doc__ = pysparkfuncs.array_except.__doc__


def array_insert(arr: "ColumnOrName", pos: Union["ColumnOrName", int], value: Any) -> Column:
    pos = _enum_to_value(pos)
    _pos = lit(pos) if isinstance(pos, int) else _to_col(pos)
    return _invoke_function("array_insert", _to_col(arr), _pos, lit(value))


array_insert.__doc__ = pysparkfuncs.array_insert.__doc__


def array_intersect(col1: "ColumnOrName", col2: "ColumnOrName") -> Column:
    return _invoke_function_over_columns("array_intersect", col1, col2)


array_intersect.__doc__ = pysparkfuncs.array_intersect.__doc__


def array_compact(col: "ColumnOrName") -> Column:
    return _invoke_function_over_columns("array_compact", col)


array_compact.__doc__ = pysparkfuncs.array_compact.__doc__


def array_join(
    col: "ColumnOrName", delimiter: str, null_replacement: Optional[str] = None
) -> Column:
    if null_replacement is None:
        return _invoke_function("array_join", _to_col(col), lit(delimiter))
    else:
        return _invoke_function("array_join", _to_col(col), lit(delimiter), lit(null_replacement))


array_join.__doc__ = pysparkfuncs.array_join.__doc__


def array_max(col: "ColumnOrName") -> Column:
    return _invoke_function_over_columns("array_max", col)


array_max.__doc__ = pysparkfuncs.array_max.__doc__


def array_min(col: "ColumnOrName") -> Column:
    return _invoke_function_over_columns("array_min", col)


array_min.__doc__ = pysparkfuncs.array_min.__doc__


def array_size(col: "ColumnOrName") -> Column:
    return _invoke_function_over_columns("array_size", col)


array_size.__doc__ = pysparkfuncs.array_size.__doc__


def cardinality(col: "ColumnOrName") -> Column:
    return _invoke_function_over_columns("cardinality", col)


cardinality.__doc__ = pysparkfuncs.cardinality.__doc__


def array_position(col: "ColumnOrName", value: Any) -> Column:
    return _invoke_function("array_position", _to_col(col), lit(value))


array_position.__doc__ = pysparkfuncs.array_position.__doc__


def array_prepend(col: "ColumnOrName", value: Any) -> Column:
    return _invoke_function("array_prepend", _to_col(col), lit(value))


array_prepend.__doc__ = pysparkfuncs.array_prepend.__doc__


def array_remove(col: "ColumnOrName", element: Any) -> Column:
    return _invoke_function("array_remove", _to_col(col), lit(element))


array_remove.__doc__ = pysparkfuncs.array_remove.__doc__


def array_repeat(col: "ColumnOrName", count: Union["ColumnOrName", int]) -> Column:
    count = _enum_to_value(count)
    _count = lit(count) if isinstance(count, int) else _to_col(count)
    return _invoke_function("array_repeat", _to_col(col), _count)


array_repeat.__doc__ = pysparkfuncs.array_repeat.__doc__


def array_sort(
    col: "ColumnOrName", comparator: Optional[Callable[[Column, Column], Column]] = None
) -> Column:
    if comparator is None:
        return _invoke_function_over_columns("array_sort", col)
    else:
        return _invoke_higher_order_function("array_sort", [col], [comparator])


array_sort.__doc__ = pysparkfuncs.array_sort.__doc__


def array_union(col1: "ColumnOrName", col2: "ColumnOrName") -> Column:
    return _invoke_function_over_columns("array_union", col1, col2)


array_union.__doc__ = pysparkfuncs.array_union.__doc__


def arrays_overlap(a1: "ColumnOrName", a2: "ColumnOrName") -> Column:
    return _invoke_function_over_columns("arrays_overlap", a1, a2)


arrays_overlap.__doc__ = pysparkfuncs.arrays_overlap.__doc__


def arrays_zip(*cols: "ColumnOrName") -> Column:
    return _invoke_function_over_columns("arrays_zip", *cols)


arrays_zip.__doc__ = pysparkfuncs.arrays_zip.__doc__


def concat(*cols: "ColumnOrName") -> Column:
    return _invoke_function_over_columns("concat", *cols)


concat.__doc__ = pysparkfuncs.concat.__doc__


def create_map(
    *cols: Union["ColumnOrName", Sequence["ColumnOrName"], Tuple["ColumnOrName", ...]]
) -> Column:
    if len(cols) == 1 and isinstance(cols[0], (list, set, tuple)):
        cols = cols[0]  # type: ignore[assignment]
    return _invoke_function_over_columns("map", *cols)  # type: ignore[arg-type]


create_map.__doc__ = pysparkfuncs.create_map.__doc__


def element_at(col: "ColumnOrName", extraction: Any) -> Column:
    return _invoke_function("element_at", _to_col(col), lit(extraction))


element_at.__doc__ = pysparkfuncs.element_at.__doc__


def try_element_at(col: "ColumnOrName", extraction: "ColumnOrName") -> Column:
    return _invoke_function_over_columns("try_element_at", col, extraction)


try_element_at.__doc__ = pysparkfuncs.try_element_at.__doc__


def exists(col: "ColumnOrName", f: Callable[[Column], Column]) -> Column:
    return _invoke_higher_order_function("exists", [col], [f])


exists.__doc__ = pysparkfuncs.exists.__doc__


def explode(col: "ColumnOrName") -> Column:
    return _invoke_function_over_columns("explode", col)


explode.__doc__ = pysparkfuncs.explode.__doc__


def explode_outer(col: "ColumnOrName") -> Column:
    return _invoke_function_over_columns("explode_outer", col)


explode_outer.__doc__ = pysparkfuncs.explode_outer.__doc__


def filter(
    col: "ColumnOrName",
    f: Union[Callable[[Column], Column], Callable[[Column, Column], Column]],
) -> Column:
    return _invoke_higher_order_function("filter", [col], [f])


filter.__doc__ = pysparkfuncs.filter.__doc__


def flatten(col: "ColumnOrName") -> Column:
    return _invoke_function_over_columns("flatten", col)


flatten.__doc__ = pysparkfuncs.flatten.__doc__


def forall(col: "ColumnOrName", f: Callable[[Column], Column]) -> Column:
    return _invoke_higher_order_function("forall", [col], [f])


forall.__doc__ = pysparkfuncs.forall.__doc__


def from_csv(
    col: "ColumnOrName",
    schema: Union[Column, str],
    options: Optional[Mapping[str, str]] = None,
) -> Column:
    if isinstance(schema, Column):
        _schema = schema
    elif isinstance(schema, str):
        _schema = lit(schema)
    else:
        raise PySparkTypeError(
            errorClass="NOT_COLUMN_OR_STR",
            messageParameters={"arg_name": "schema", "arg_type": type(schema).__name__},
        )

    if options is None:
        return _invoke_function("from_csv", _to_col(col), _schema)
    else:
        return _invoke_function("from_csv", _to_col(col), _schema, _options_to_col(options))


from_csv.__doc__ = pysparkfuncs.from_csv.__doc__


def from_json(
    col: "ColumnOrName",
    schema: Union[ArrayType, StructType, Column, str],
    options: Optional[Mapping[str, str]] = None,
) -> Column:
    if isinstance(schema, (str, Column)):
        _schema = lit(schema)
    elif isinstance(schema, DataType):
        _schema = lit(schema.json())
    else:
        raise PySparkTypeError(
            errorClass="NOT_COLUMN_OR_DATATYPE_OR_STR",
            messageParameters={"arg_name": "schema", "arg_type": type(schema).__name__},
        )

    if options is None:
        return _invoke_function("from_json", _to_col(col), _schema)
    else:
        return _invoke_function("from_json", _to_col(col), _schema, _options_to_col(options))


from_json.__doc__ = pysparkfuncs.from_json.__doc__


def from_xml(
    col: "ColumnOrName",
    schema: Union[StructType, Column, str],
    options: Optional[Mapping[str, str]] = None,
) -> Column:
    if isinstance(schema, (str, Column)):
        _schema = lit(schema)
    elif isinstance(schema, StructType):
        _schema = lit(schema.json())
    else:
        raise PySparkTypeError(
            errorClass="NOT_COLUMN_OR_STR_OR_STRUCT",
            messageParameters={"arg_name": "schema", "arg_type": type(schema).__name__},
        )

    if options is None:
        return _invoke_function("from_xml", _to_col(col), _schema)
    else:
        return _invoke_function("from_xml", _to_col(col), _schema, _options_to_col(options))


from_xml.__doc__ = pysparkfuncs.from_xml.__doc__


def get(col: "ColumnOrName", index: Union["ColumnOrName", int]) -> Column:
    index = _enum_to_value(index)
    index = lit(index) if isinstance(index, int) else index

    return _invoke_function_over_columns("get", col, index)


get.__doc__ = pysparkfuncs.get.__doc__


def get_json_object(col: "ColumnOrName", path: str) -> Column:
    return _invoke_function("get_json_object", _to_col(col), lit(path))


get_json_object.__doc__ = pysparkfuncs.get_json_object.__doc__


def json_array_length(col: "ColumnOrName") -> Column:
    return _invoke_function_over_columns("json_array_length", col)


json_array_length.__doc__ = pysparkfuncs.json_array_length.__doc__


def json_object_keys(col: "ColumnOrName") -> Column:
    return _invoke_function_over_columns("json_object_keys", col)


json_object_keys.__doc__ = pysparkfuncs.json_object_keys.__doc__


def inline(col: "ColumnOrName") -> Column:
    return _invoke_function_over_columns("inline", col)


inline.__doc__ = pysparkfuncs.inline.__doc__


def inline_outer(col: "ColumnOrName") -> Column:
    return _invoke_function_over_columns("inline_outer", col)


inline_outer.__doc__ = pysparkfuncs.inline_outer.__doc__


def json_tuple(col: "ColumnOrName", *fields: str) -> Column:
    if len(fields) == 0:
        raise PySparkValueError(
            errorClass="CANNOT_BE_EMPTY",
            messageParameters={"item": "field"},
        )
    return _invoke_function("json_tuple", _to_col(col), *[lit(field) for field in fields])


json_tuple.__doc__ = pysparkfuncs.json_tuple.__doc__


def map_concat(
    *cols: Union["ColumnOrName", Sequence["ColumnOrName"], Tuple["ColumnOrName", ...]]
) -> Column:
    if len(cols) == 1 and isinstance(cols[0], (list, set, tuple)):
        cols = cols[0]  # type: ignore[assignment]
    return _invoke_function_over_columns("map_concat", *cols)  # type: ignore[arg-type]


map_concat.__doc__ = pysparkfuncs.map_concat.__doc__


def map_contains_key(col: "ColumnOrName", value: Any) -> Column:
    return _invoke_function("map_contains_key", _to_col(col), lit(value))


map_contains_key.__doc__ = pysparkfuncs.map_contains_key.__doc__


def map_entries(col: "ColumnOrName") -> Column:
    return _invoke_function_over_columns("map_entries", col)


map_entries.__doc__ = pysparkfuncs.map_entries.__doc__


def map_filter(col: "ColumnOrName", f: Callable[[Column, Column], Column]) -> Column:
    return _invoke_higher_order_function("map_filter", [col], [f])


map_filter.__doc__ = pysparkfuncs.map_filter.__doc__


def map_from_arrays(col1: "ColumnOrName", col2: "ColumnOrName") -> Column:
    return _invoke_function_over_columns("map_from_arrays", col1, col2)


map_from_arrays.__doc__ = pysparkfuncs.map_from_arrays.__doc__


def map_from_entries(col: "ColumnOrName") -> Column:
    return _invoke_function_over_columns("map_from_entries", col)


map_from_entries.__doc__ = pysparkfuncs.map_from_entries.__doc__


def map_keys(col: "ColumnOrName") -> Column:
    return _invoke_function_over_columns("map_keys", col)


map_keys.__doc__ = pysparkfuncs.map_keys.__doc__


def map_values(col: "ColumnOrName") -> Column:
    return _invoke_function_over_columns("map_values", col)


map_values.__doc__ = pysparkfuncs.map_values.__doc__


def map_zip_with(
    col1: "ColumnOrName",
    col2: "ColumnOrName",
    f: Callable[[Column, Column, Column], Column],
) -> Column:
    return _invoke_higher_order_function("map_zip_with", [col1, col2], [f])


map_zip_with.__doc__ = pysparkfuncs.map_zip_with.__doc__


def str_to_map(
    text: "ColumnOrName",
    pairDelim: Optional["ColumnOrName"] = None,
    keyValueDelim: Optional["ColumnOrName"] = None,
) -> Column:
    _pairDelim = lit(",") if pairDelim is None else _to_col(pairDelim)
    _keyValueDelim = lit(":") if keyValueDelim is None else _to_col(keyValueDelim)

    return _invoke_function("str_to_map", _to_col(text), _pairDelim, _keyValueDelim)


str_to_map.__doc__ = pysparkfuncs.str_to_map.__doc__
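# Illustrative sketch of the defaults above: str_to_map(lit("a:1,b:2")) splits pairs on
# "," and keys/values on ":", producing the map {'a' -> '1', 'b' -> '2'}.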


def try_parse_json(col: "ColumnOrName") -> Column:
    return _invoke_function("try_parse_json", _to_col(col))


try_parse_json.__doc__ = pysparkfuncs.try_parse_json.__doc__


def to_variant_object(col: "ColumnOrName") -> Column:
    return _invoke_function("to_variant_object", _to_col(col))


to_variant_object.__doc__ = pysparkfuncs.to_variant_object.__doc__


def parse_json(col: "ColumnOrName") -> Column:
    return _invoke_function("parse_json", _to_col(col))


parse_json.__doc__ = pysparkfuncs.parse_json.__doc__


def is_variant_null(v: "ColumnOrName") -> Column:
    return _invoke_function("is_variant_null", _to_col(v))


is_variant_null.__doc__ = pysparkfuncs.is_variant_null.__doc__


def variant_get(v: "ColumnOrName", path: Union[Column, str], targetType: str) -> Column:
    assert isinstance(path, (Column, str))
    if isinstance(path, str):
        return _invoke_function("variant_get", _to_col(v), lit(path), lit(targetType))
    else:
        return _invoke_function("variant_get", _to_col(v), path, lit(targetType))


variant_get.__doc__ = pysparkfuncs.variant_get.__doc__


def try_variant_get(v: "ColumnOrName", path: Union[Column, str], targetType: str) -> Column:
    assert isinstance(path, (Column, str))
    if isinstance(path, str):
        return _invoke_function("try_variant_get", _to_col(v), lit(path), lit(targetType))
    else:
        return _invoke_function("try_variant_get", _to_col(v), path, lit(targetType))


try_variant_get.__doc__ = pysparkfuncs.try_variant_get.__doc__


def schema_of_variant(v: "ColumnOrName") -> Column:
    return _invoke_function("schema_of_variant", _to_col(v))


schema_of_variant.__doc__ = pysparkfuncs.schema_of_variant.__doc__


def schema_of_variant_agg(v: "ColumnOrName") -> Column:
    return _invoke_function("schema_of_variant_agg", _to_col(v))


schema_of_variant_agg.__doc__ = pysparkfuncs.schema_of_variant_agg.__doc__


def posexplode(col: "ColumnOrName") -> Column:
    return _invoke_function_over_columns("posexplode", col)


posexplode.__doc__ = pysparkfuncs.posexplode.__doc__


def posexplode_outer(col: "ColumnOrName") -> Column:
    return _invoke_function_over_columns("posexplode_outer", col)


posexplode_outer.__doc__ = pysparkfuncs.posexplode_outer.__doc__


def reverse(col: "ColumnOrName") -> Column:
    return _invoke_function_over_columns("reverse", col)


reverse.__doc__ = pysparkfuncs.reverse.__doc__


def sequence(
    start: "ColumnOrName", stop: "ColumnOrName", step: Optional["ColumnOrName"] = None
) -> Column:
    if step is None:
        return _invoke_function_over_columns("sequence", start, stop)
    else:
        return _invoke_function_over_columns("sequence", start, stop, step)


sequence.__doc__ = pysparkfuncs.sequence.__doc__
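# Note: when `step` is omitted the argument is simply not sent; the server-side sequence
# function then defaults to a step of 1 (or -1 when start > stop), e.g.
# sequence(lit(1), lit(5)) -> [1, 2, 3, 4, 5].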


def schema_of_csv(csv: Union[str, Column], options: Optional[Mapping[str, str]] = None) -> Column:
    csv = _enum_to_value(csv)
    if not isinstance(csv, (str, Column)):
        raise PySparkTypeError(
            errorClass="NOT_COLUMN_OR_STR",
            messageParameters={"arg_name": "csv", "arg_type": type(csv).__name__},
        )

    if options is None:
        return _invoke_function("schema_of_csv", lit(csv))
    else:
        return _invoke_function("schema_of_csv", lit(csv), _options_to_col(options))


schema_of_csv.__doc__ = pysparkfuncs.schema_of_csv.__doc__


def schema_of_json(json: Union[str, Column], options: Optional[Mapping[str, str]] = None) -> Column:
    json = _enum_to_value(json)
    if not isinstance(json, (str, Column)):
        raise PySparkTypeError(
            errorClass="NOT_COLUMN_OR_STR",
            messageParameters={"arg_name": "json", "arg_type": type(json).__name__},
        )

    if options is None:
        return _invoke_function("schema_of_json", lit(json))
    else:
        return _invoke_function("schema_of_json", lit(json), _options_to_col(options))


schema_of_json.__doc__ = pysparkfuncs.schema_of_json.__doc__


def schema_of_xml(xml: Union[str, Column], options: Optional[Mapping[str, str]] = None) -> Column:
    xml = _enum_to_value(xml)
    if not isinstance(xml, (str, Column)):
        raise PySparkTypeError(
            errorClass="NOT_COLUMN_OR_STR",
            messageParameters={"arg_name": "xml", "arg_type": type(xml).__name__},
        )

    if options is None:
        return _invoke_function("schema_of_xml", lit(xml))
    else:
        return _invoke_function("schema_of_xml", lit(xml), _options_to_col(options))


schema_of_xml.__doc__ = pysparkfuncs.schema_of_xml.__doc__


def shuffle(col: "ColumnOrName", seed: Optional[Union[Column, int]] = None) -> Column:
    _seed = lit(py_random.randint(0, sys.maxsize)) if seed is None else lit(seed)
    return _invoke_function("shuffle", _to_col(col), _seed)


shuffle.__doc__ = pysparkfuncs.shuffle.__doc__
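# Note: when `seed` is None a random seed is drawn on the client (above) and embedded in
# the plan as a literal, so the same plan object always carries the same seed.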


def size(col: "ColumnOrName") -> Column:
    return _invoke_function_over_columns("size", col)


size.__doc__ = pysparkfuncs.size.__doc__


def slice(
    x: "ColumnOrName", start: Union["ColumnOrName", int], length: Union["ColumnOrName", int]
) -> Column:
    start = _enum_to_value(start)
    if isinstance(start, (Column, str)):
        _start = start
    elif isinstance(start, int):
        _start = lit(start)
    else:
        raise PySparkTypeError(
            errorClass="NOT_COLUMN_OR_INT_OR_STR",
            messageParameters={"arg_name": "start", "arg_type": type(start).__name__},
        )

    length = _enum_to_value(length)
    if isinstance(length, (Column, str)):
        _length = length
    elif isinstance(length, int):
        _length = lit(length)
    else:
        raise PySparkTypeError(
            errorClass="NOT_COLUMN_OR_INT_OR_STR",
            messageParameters={"arg_name": "length", "arg_type": type(length).__name__},
        )

    return _invoke_function_over_columns("slice", x, _start, _length)


slice.__doc__ = pysparkfuncs.slice.__doc__
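# Reminder (see the linked docstring): `start` is 1-based and may be negative to count
# from the end, e.g. slice(df.xs, 2, 2) keeps the second and third elements.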


def sort_array(col: "ColumnOrName", asc: bool = True) -> Column:
    return _invoke_function("sort_array", _to_col(col), lit(asc))


sort_array.__doc__ = pysparkfuncs.sort_array.__doc__


def struct(
    *cols: Union["ColumnOrName", Sequence["ColumnOrName"], Tuple["ColumnOrName", ...]]
) -> Column:
    if len(cols) == 1 and isinstance(cols[0], (list, set, tuple)):
        cols = cols[0]  # type: ignore[assignment]
    return _invoke_function_over_columns("struct", *cols)  # type: ignore[arg-type]


struct.__doc__ = pysparkfuncs.struct.__doc__


def named_struct(*cols: "ColumnOrName") -> Column:
    return _invoke_function_over_columns("named_struct", *cols)


named_struct.__doc__ = pysparkfuncs.named_struct.__doc__


def to_csv(col: "ColumnOrName", options: Optional[Mapping[str, str]] = None) -> Column:
    if options is None:
        return _invoke_function("to_csv", _to_col(col))
    else:
        return _invoke_function("to_csv", _to_col(col), _options_to_col(options))


to_csv.__doc__ = pysparkfuncs.to_csv.__doc__


def to_json(col: "ColumnOrName", options: Optional[Mapping[str, str]] = None) -> Column:
    if options is None:
        return _invoke_function("to_json", _to_col(col))
    else:
        return _invoke_function("to_json", _to_col(col), _options_to_col(options))


to_json.__doc__ = pysparkfuncs.to_json.__doc__


def to_xml(col: "ColumnOrName", options: Optional[Mapping[str, str]] = None) -> Column:
    if options is None:
        return _invoke_function("to_xml", _to_col(col))
    else:
        return _invoke_function("to_xml", _to_col(col), _options_to_col(options))


to_xml.__doc__ = pysparkfuncs.to_xml.__doc__


def transform(
    col: "ColumnOrName",
    f: Union[Callable[[Column], Column], Callable[[Column, Column], Column]],
) -> Column:
    return _invoke_higher_order_function("transform", [col], [f])


transform.__doc__ = pysparkfuncs.transform.__doc__


def transform_keys(col: "ColumnOrName", f: Callable[[Column, Column], Column]) -> Column:
    return _invoke_higher_order_function("transform_keys", [col], [f])


transform_keys.__doc__ = pysparkfuncs.transform_keys.__doc__


def transform_values(col: "ColumnOrName", f: Callable[[Column, Column], Column]) -> Column:
    return _invoke_higher_order_function("transform_values", [col], [f])


transform_values.__doc__ = pysparkfuncs.transform_values.__doc__


def zip_with(
    left: "ColumnOrName",
    right: "ColumnOrName",
    f: Callable[[Column, Column], Column],
) -> Column:
    return _invoke_higher_order_function("zip_with", [left, right], [f])


zip_with.__doc__ = pysparkfuncs.zip_with.__doc__


# String/Binary functions


def upper(col: "ColumnOrName") -> Column:
    return _invoke_function_over_columns("upper", col)


upper.__doc__ = pysparkfuncs.upper.__doc__


def lower(col: "ColumnOrName") -> Column:
    return _invoke_function_over_columns("lower", col)


lower.__doc__ = pysparkfuncs.lower.__doc__


def ascii(col: "ColumnOrName") -> Column:
    return _invoke_function_over_columns("ascii", col)


ascii.__doc__ = pysparkfuncs.ascii.__doc__


def base64(col: "ColumnOrName") -> Column:
    return _invoke_function_over_columns("base64", col)


base64.__doc__ = pysparkfuncs.base64.__doc__


def unbase64(col: "ColumnOrName") -> Column:
    return _invoke_function_over_columns("unbase64", col)


unbase64.__doc__ = pysparkfuncs.unbase64.__doc__


def ltrim(col: "ColumnOrName", trim: Optional["ColumnOrName"] = None) -> Column:
    if trim is not None:
        return _invoke_function_over_columns("ltrim", trim, col)
    else:
        return _invoke_function_over_columns("ltrim", col)


ltrim.__doc__ = pysparkfuncs.ltrim.__doc__


def rtrim(col: "ColumnOrName", trim: Optional["ColumnOrName"] = None) -> Column:
    if trim is not None:
        return _invoke_function_over_columns("rtrim", trim, col)
    else:
        return _invoke_function_over_columns("rtrim", col)


rtrim.__doc__ = pysparkfuncs.rtrim.__doc__


def trim(col: "ColumnOrName", trim: Optional["ColumnOrName"] = None) -> Column:
    if trim is not None:
        return _invoke_function_over_columns("trim", trim, col)
    else:
        return _invoke_function_over_columns("trim", col)


trim.__doc__ = pysparkfuncs.trim.__doc__


def concat_ws(sep: str, *cols: "ColumnOrName") -> Column:
    return _invoke_function("concat_ws", lit(sep), *[_to_col(c) for c in cols])


concat_ws.__doc__ = pysparkfuncs.concat_ws.__doc__


def decode(col: "ColumnOrName", charset: str) -> Column:
    return _invoke_function("decode", _to_col(col), lit(charset))


decode.__doc__ = pysparkfuncs.decode.__doc__


def encode(col: "ColumnOrName", charset: str) -> Column:
    return _invoke_function("encode", _to_col(col), lit(charset))


encode.__doc__ = pysparkfuncs.encode.__doc__


def is_valid_utf8(str: "ColumnOrName") -> Column:
    return _invoke_function_over_columns("is_valid_utf8", _to_col(str))


is_valid_utf8.__doc__ = pysparkfuncs.is_valid_utf8.__doc__


def make_valid_utf8(str: "ColumnOrName") -> Column:
    return _invoke_function_over_columns("make_valid_utf8", _to_col(str))


make_valid_utf8.__doc__ = pysparkfuncs.make_valid_utf8.__doc__


def validate_utf8(str: "ColumnOrName") -> Column:
    return _invoke_function_over_columns("validate_utf8", _to_col(str))


validate_utf8.__doc__ = pysparkfuncs.validate_utf8.__doc__


def try_validate_utf8(str: "ColumnOrName") -> Column:
    return _invoke_function_over_columns("try_validate_utf8", _to_col(str))


try_validate_utf8.__doc__ = pysparkfuncs.try_validate_utf8.__doc__


def format_number(col: "ColumnOrName", d: int) -> Column:
    return _invoke_function("format_number", _to_col(col), lit(d))


format_number.__doc__ = pysparkfuncs.format_number.__doc__


def format_string(format: str, *cols: "ColumnOrName") -> Column:
    return _invoke_function("format_string", lit(format), *[_to_col(c) for c in cols])


format_string.__doc__ = pysparkfuncs.format_string.__doc__


def instr(str: "ColumnOrName", substr: Union[Column, str]) -> Column:
    return _invoke_function("instr", _to_col(str), lit(substr))


instr.__doc__ = pysparkfuncs.instr.__doc__


def overlay(
    src: "ColumnOrName",
    replace: "ColumnOrName",
    pos: Union["ColumnOrName", int],
    len: Union["ColumnOrName", int] = -1,
) -> Column:
    pos = _enum_to_value(pos)
    if not isinstance(pos, (int, str, Column)):
        raise PySparkTypeError(
            errorClass="NOT_COLUMN_OR_INT_OR_STR",
            messageParameters={"arg_name": "pos", "arg_type": type(pos).__name__},
        )
    len = _enum_to_value(len)
    if len is not None and not isinstance(len, (int, str, Column)):
        raise PySparkTypeError(
            errorClass="NOT_COLUMN_OR_INT_OR_STR",
            messageParameters={"arg_name": "len", "arg_type": type(len).__name__},
        )

    if isinstance(pos, int):
        pos = lit(pos)
    if isinstance(len, int):
        len = lit(len)

    return _invoke_function_over_columns("overlay", src, replace, pos, len)


overlay.__doc__ = pysparkfuncs.overlay.__doc__
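# Illustrative example of the default len=-1, which replaces exactly len(replace)
# characters: overlay(lit("SPARK_SQL"), lit("CORE"), 7) evaluates to "SPARK_CORE".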


def sentences(
    string: "ColumnOrName",
    language: Optional["ColumnOrName"] = None,
    country: Optional["ColumnOrName"] = None,
) -> Column:
    _language = lit("") if language is None else _to_col(language)
    _country = lit("") if country is None else _to_col(country)

    return _invoke_function("sentences", _to_col(string), _language, _country)


sentences.__doc__ = pysparkfuncs.sentences.__doc__


def substring(
    str: "ColumnOrName",
    pos: Union["ColumnOrName", int],
    len: Union["ColumnOrName", int],
) -> Column:
    _pos = lit(pos) if isinstance(pos, int) else _to_col(pos)
    _len = lit(len) if isinstance(len, int) else _to_col(len)
    return _invoke_function("substring", _to_col(str), _pos, _len)


substring.__doc__ = pysparkfuncs.substring.__doc__


def substring_index(str: "ColumnOrName", delim: str, count: int) -> Column:
    return _invoke_function("substring_index", _to_col(str), lit(delim), lit(count))


substring_index.__doc__ = pysparkfuncs.substring_index.__doc__


def levenshtein(
    left: "ColumnOrName", right: "ColumnOrName", threshold: Optional[int] = None
) -> Column:
    if threshold is None:
        return _invoke_function_over_columns("levenshtein", left, right)
    else:
        return _invoke_function("levenshtein", _to_col(left), _to_col(right), lit(threshold))


levenshtein.__doc__ = pysparkfuncs.levenshtein.__doc__


def locate(substr: str, str: "ColumnOrName", pos: int = 1) -> Column:
    return _invoke_function("locate", lit(substr), _to_col(str), lit(pos))


locate.__doc__ = pysparkfuncs.locate.__doc__


def lpad(
    col: "ColumnOrName",
    len: Union[Column, int],
    pad: Union[Column, str],
) -> Column:
    return _invoke_function_over_columns("lpad", col, lit(len), lit(pad))


lpad.__doc__ = pysparkfuncs.lpad.__doc__


def rpad(
    col: "ColumnOrName",
    len: Union[Column, int],
    pad: Union[Column, str],
) -> Column:
    return _invoke_function_over_columns("rpad", col, lit(len), lit(pad))


rpad.__doc__ = pysparkfuncs.rpad.__doc__


def repeat(col: "ColumnOrName", n: Union["ColumnOrName", int]) -> Column:
    n = _enum_to_value(n)
    n = lit(n) if isinstance(n, int) else n
    return _invoke_function("repeat", _to_col(col), _to_col(n))


repeat.__doc__ = pysparkfuncs.repeat.__doc__


def split(
    str: "ColumnOrName",
    pattern: Union[Column, str],
    limit: Union["ColumnOrName", int] = -1,
) -> Column:
    limit = _enum_to_value(limit)
    limit = lit(limit) if isinstance(limit, int) else _to_col(limit)
    return _invoke_function("split", _to_col(str), lit(pattern), limit)


split.__doc__ = pysparkfuncs.split.__doc__
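# Note: the default limit=-1 is forwarded as a literal; per the linked docstring a
# non-positive limit applies the pattern as many times as possible, so the resulting
# array can be of any size.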


def rlike(str: "ColumnOrName", regexp: "ColumnOrName") -> Column:
    return _invoke_function_over_columns("rlike", str, regexp)


rlike.__doc__ = pysparkfuncs.rlike.__doc__


def regexp(str: "ColumnOrName", regexp: "ColumnOrName") -> Column:
    return _invoke_function_over_columns("regexp", str, regexp)


regexp.__doc__ = pysparkfuncs.regexp.__doc__


def regexp_like(str: "ColumnOrName", regexp: "ColumnOrName") -> Column:
    return _invoke_function_over_columns("regexp_like", str, regexp)


regexp_like.__doc__ = pysparkfuncs.regexp_like.__doc__


def randstr(length: Union[Column, int], seed: Optional[Union[Column, int]] = None) -> Column:
    if seed is None:
        return _invoke_function_over_columns(
            "randstr", lit(length), lit(py_random.randint(0, sys.maxsize))
        )
    else:
        return _invoke_function_over_columns("randstr", lit(length), lit(seed))


randstr.__doc__ = pysparkfuncs.randstr.__doc__


def regexp_count(str: "ColumnOrName", regexp: "ColumnOrName") -> Column:
    return _invoke_function_over_columns("regexp_count", str, regexp)


regexp_count.__doc__ = pysparkfuncs.regexp_count.__doc__


def regexp_extract(str: "ColumnOrName", pattern: str, idx: int) -> Column:
    return _invoke_function("regexp_extract", _to_col(str), lit(pattern), lit(idx))


regexp_extract.__doc__ = pysparkfuncs.regexp_extract.__doc__


def regexp_extract_all(
    str: "ColumnOrName", regexp: "ColumnOrName", idx: Optional[Union[int, Column]] = None
) -> Column:
    if idx is None:
        return _invoke_function_over_columns("regexp_extract_all", str, regexp)
    else:
        return _invoke_function_over_columns("regexp_extract_all", str, regexp, lit(idx))


regexp_extract_all.__doc__ = pysparkfuncs.regexp_extract_all.__doc__


def regexp_replace(
    string: "ColumnOrName", pattern: Union[str, Column], replacement: Union[str, Column]
) -> Column:
    return _invoke_function_over_columns("regexp_replace", string, lit(pattern), lit(replacement))


regexp_replace.__doc__ = pysparkfuncs.regexp_replace.__doc__


def regexp_substr(str: "ColumnOrName", regexp: "ColumnOrName") -> Column:
    return _invoke_function_over_columns("regexp_substr", str, regexp)


regexp_substr.__doc__ = pysparkfuncs.regexp_substr.__doc__


def regexp_instr(
    str: "ColumnOrName", regexp: "ColumnOrName", idx: Optional[Union[int, Column]] = None
) -> Column:
    if idx is None:
        return _invoke_function_over_columns("regexp_instr", str, regexp)
    else:
        return _invoke_function_over_columns("regexp_instr", str, regexp, lit(idx))


regexp_instr.__doc__ = pysparkfuncs.regexp_instr.__doc__


def initcap(col: "ColumnOrName") -> Column:
    return _invoke_function_over_columns("initcap", col)


initcap.__doc__ = pysparkfuncs.initcap.__doc__


def soundex(col: "ColumnOrName") -> Column:
    return _invoke_function_over_columns("soundex", col)


soundex.__doc__ = pysparkfuncs.soundex.__doc__


def length(col: "ColumnOrName") -> Column:
    return _invoke_function_over_columns("length", col)


length.__doc__ = pysparkfuncs.length.__doc__


def octet_length(col: "ColumnOrName") -> Column:
    return _invoke_function_over_columns("octet_length", col)


octet_length.__doc__ = pysparkfuncs.octet_length.__doc__


def bit_length(col: "ColumnOrName") -> Column:
    return _invoke_function_over_columns("bit_length", col)


bit_length.__doc__ = pysparkfuncs.bit_length.__doc__


def translate(srcCol: "ColumnOrName", matching: str, replace: str) -> Column:
    return _invoke_function("translate", _to_col(srcCol), lit(matching), lit(replace))


translate.__doc__ = pysparkfuncs.translate.__doc__


def to_binary(col: "ColumnOrName", format: Optional["ColumnOrName"] = None) -> Column:
    if format is not None:
        return _invoke_function_over_columns("to_binary", col, format)
    else:
        return _invoke_function_over_columns("to_binary", col)


to_binary.__doc__ = pysparkfuncs.to_binary.__doc__


def to_char(col: "ColumnOrName", format: "ColumnOrName") -> Column:
    return _invoke_function_over_columns("to_char", col, format)


to_char.__doc__ = pysparkfuncs.to_char.__doc__


def to_varchar(col: "ColumnOrName", format: "ColumnOrName") -> Column:
    return _invoke_function_over_columns("to_varchar", col, format)


to_varchar.__doc__ = pysparkfuncs.to_varchar.__doc__


def to_number(col: "ColumnOrName", format: "ColumnOrName") -> Column:
    return _invoke_function_over_columns("to_number", col, format)


to_number.__doc__ = pysparkfuncs.to_number.__doc__


def replace(
    src: "ColumnOrName", search: "ColumnOrName", replace: Optional["ColumnOrName"] = None
) -> Column:
    if replace is not None:
        return _invoke_function_over_columns("replace", src, search, replace)
    else:
        return _invoke_function_over_columns("replace", src, search)


replace.__doc__ = pysparkfuncs.replace.__doc__


def split_part(src: "ColumnOrName", delimiter: "ColumnOrName", partNum: "ColumnOrName") -> Column:
    return _invoke_function_over_columns("split_part", src, delimiter, partNum)


split_part.__doc__ = pysparkfuncs.split_part.__doc__


def substr(
    str: "ColumnOrName", pos: "ColumnOrName", len: Optional["ColumnOrName"] = None
) -> Column:
    if len is not None:
        return _invoke_function_over_columns("substr", str, pos, len)
    else:
        return _invoke_function_over_columns("substr", str, pos)


substr.__doc__ = pysparkfuncs.substr.__doc__


def parse_url(
    url: "ColumnOrName", partToExtract: "ColumnOrName", key: Optional["ColumnOrName"] = None
) -> Column:
    if key is not None:
        return _invoke_function_over_columns("parse_url", url, partToExtract, key)
    else:
        return _invoke_function_over_columns("parse_url", url, partToExtract)


parse_url.__doc__ = pysparkfuncs.parse_url.__doc__


def try_parse_url(
    url: "ColumnOrName", partToExtract: "ColumnOrName", key: Optional["ColumnOrName"] = None
) -> Column:
    if key is not None:
        return _invoke_function_over_columns("try_parse_url", url, partToExtract, key)
    else:
        return _invoke_function_over_columns("try_parse_url", url, partToExtract)


try_parse_url.__doc__ = pysparkfuncs.try_parse_url.__doc__


def printf(format: "ColumnOrName", *cols: "ColumnOrName") -> Column:
    return _invoke_function("printf", _to_col(format), *[_to_col(c) for c in cols])


printf.__doc__ = pysparkfuncs.printf.__doc__


def url_decode(str: "ColumnOrName") -> Column:
    return _invoke_function_over_columns("url_decode", str)


url_decode.__doc__ = pysparkfuncs.url_decode.__doc__


def try_url_decode(str: "ColumnOrName") -> Column:
    return _invoke_function_over_columns("try_url_decode", str)


try_url_decode.__doc__ = pysparkfuncs.try_url_decode.__doc__


def url_encode(str: "ColumnOrName") -> Column:
    return _invoke_function_over_columns("url_encode", str)


url_encode.__doc__ = pysparkfuncs.url_encode.__doc__


def position(
    substr: "ColumnOrName", str: "ColumnOrName", start: Optional["ColumnOrName"] = None
) -> Column:
    if start is not None:
        return _invoke_function_over_columns("position", substr, str, start)
    else:
        return _invoke_function_over_columns("position", substr, str)


position.__doc__ = pysparkfuncs.position.__doc__


def endswith(str: "ColumnOrName", suffix: "ColumnOrName") -> Column:
    return _invoke_function_over_columns("endswith", str, suffix)


endswith.__doc__ = pysparkfuncs.endswith.__doc__


def startswith(str: "ColumnOrName", prefix: "ColumnOrName") -> Column:
    return _invoke_function_over_columns("startswith", str, prefix)


startswith.__doc__ = pysparkfuncs.startswith.__doc__


def char(col: "ColumnOrName") -> Column:
    return _invoke_function_over_columns("char", col)


char.__doc__ = pysparkfuncs.char.__doc__


def try_to_binary(col: "ColumnOrName", format: Optional["ColumnOrName"] = None) -> Column:
    if format is not None:
        return _invoke_function_over_columns("try_to_binary", col, format)
    else:
        return _invoke_function_over_columns("try_to_binary", col)


try_to_binary.__doc__ = pysparkfuncs.try_to_binary.__doc__


def try_to_number(col: "ColumnOrName", format: "ColumnOrName") -> Column:
    return _invoke_function_over_columns("try_to_number", col, format)


try_to_number.__doc__ = pysparkfuncs.try_to_number.__doc__


def btrim(str: "ColumnOrName", trim: Optional["ColumnOrName"] = None) -> Column:
    if trim is not None:
        return _invoke_function_over_columns("btrim", str, trim)
    else:
        return _invoke_function_over_columns("btrim", str)


btrim.__doc__ = pysparkfuncs.btrim.__doc__


def char_length(str: "ColumnOrName") -> Column:
    return _invoke_function_over_columns("char_length", str)


char_length.__doc__ = pysparkfuncs.char_length.__doc__


def character_length(str: "ColumnOrName") -> Column:
    return _invoke_function_over_columns("character_length", str)


character_length.__doc__ = pysparkfuncs.character_length.__doc__


def chr(n: "ColumnOrName") -> Column:
    return _invoke_function_over_columns("chr", n)


chr.__doc__ = pysparkfuncs.chr.__doc__


def contains(left: "ColumnOrName", right: "ColumnOrName") -> Column:
    return _invoke_function_over_columns("contains", left, right)


contains.__doc__ = pysparkfuncs.contains.__doc__


def elt(*inputs: "ColumnOrName") -> Column:
    return _invoke_function("elt", *[_to_col(input) for input in inputs])


elt.__doc__ = pysparkfuncs.elt.__doc__


def find_in_set(str: "ColumnOrName", str_array: "ColumnOrName") -> Column:
    return _invoke_function_over_columns("find_in_set", str, str_array)


find_in_set.__doc__ = pysparkfuncs.find_in_set.__doc__


def like(
    str: "ColumnOrName", pattern: "ColumnOrName", escapeChar: Optional["Column"] = None
) -> Column:
    if escapeChar is not None:
        return _invoke_function_over_columns("like", str, pattern, escapeChar)
    else:
        return _invoke_function_over_columns("like", str, pattern)


like.__doc__ = pysparkfuncs.like.__doc__


def ilike(
    str: "ColumnOrName", pattern: "ColumnOrName", escapeChar: Optional["Column"] = None
) -> Column:
    if escapeChar is not None:
        return _invoke_function_over_columns("ilike", str, pattern, escapeChar)
    else:
        return _invoke_function_over_columns("ilike", str, pattern)


ilike.__doc__ = pysparkfuncs.ilike.__doc__


def lcase(str: "ColumnOrName") -> Column:
    return _invoke_function_over_columns("lcase", str)


lcase.__doc__ = pysparkfuncs.lcase.__doc__


def ucase(str: "ColumnOrName") -> Column:
    return _invoke_function_over_columns("ucase", str)


ucase.__doc__ = pysparkfuncs.ucase.__doc__


def left(str: "ColumnOrName", len: "ColumnOrName") -> Column:
    return _invoke_function_over_columns("left", str, len)


left.__doc__ = pysparkfuncs.left.__doc__


def right(str: "ColumnOrName", len: "ColumnOrName") -> Column:
    return _invoke_function_over_columns("right", str, len)


right.__doc__ = pysparkfuncs.right.__doc__


def mask(
    col: "ColumnOrName",
    upperChar: Optional["ColumnOrName"] = None,
    lowerChar: Optional["ColumnOrName"] = None,
    digitChar: Optional["ColumnOrName"] = None,
    otherChar: Optional["ColumnOrName"] = None,
) -> Column:
    _upperChar = lit("X") if upperChar is None else upperChar
    _lowerChar = lit("x") if lowerChar is None else lowerChar
    _digitChar = lit("n") if digitChar is None else digitChar
    _otherChar = lit(None) if otherChar is None else otherChar

    return _invoke_function_over_columns(
        "mask", col, _upperChar, _lowerChar, _digitChar, _otherChar
    )


mask.__doc__ = pysparkfuncs.mask.__doc__
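# Illustrative example of the defaults above ('X', 'x', 'n', and None to leave other
# characters unchanged): mask(lit("AbCD123-@$#")) evaluates to "XxXXnnn-@$#".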


def collate(col: "ColumnOrName", collation: str) -> Column:
    return _invoke_function("collate", _to_col(col), lit(collation))


collate.__doc__ = pysparkfuncs.collate.__doc__


def collation(col: "ColumnOrName") -> Column:
    return _invoke_function_over_columns("collation", col)


collation.__doc__ = pysparkfuncs.collation.__doc__


def quote(col: "ColumnOrName") -> Column:
    return _invoke_function_over_columns("quote", col)


quote.__doc__ = pysparkfuncs.quote.__doc__


# Date/Timestamp functions


def curdate() -> Column:
    return _invoke_function("curdate")


curdate.__doc__ = pysparkfuncs.curdate.__doc__


def current_date() -> Column:
    return _invoke_function("current_date")


current_date.__doc__ = pysparkfuncs.current_date.__doc__


def current_timestamp() -> Column:
    return _invoke_function("current_timestamp")


current_timestamp.__doc__ = pysparkfuncs.current_timestamp.__doc__


def now() -> Column:
    return _invoke_function("now")


now.__doc__ = pysparkfuncs.now.__doc__


def current_timezone() -> Column:
    return _invoke_function("current_timezone")


current_timezone.__doc__ = pysparkfuncs.current_timezone.__doc__


def localtimestamp() -> Column:
    return _invoke_function("localtimestamp")


localtimestamp.__doc__ = pysparkfuncs.localtimestamp.__doc__


def date_format(date: "ColumnOrName", format: str) -> Column:
    return _invoke_function("date_format", _to_col(date), lit(format))


date_format.__doc__ = pysparkfuncs.date_format.__doc__


def year(col: "ColumnOrName") -> Column:
    return _invoke_function_over_columns("year", col)


year.__doc__ = pysparkfuncs.year.__doc__


def quarter(col: "ColumnOrName") -> Column:
    return _invoke_function_over_columns("quarter", col)


quarter.__doc__ = pysparkfuncs.quarter.__doc__


def month(col: "ColumnOrName") -> Column:
    return _invoke_function_over_columns("month", col)


month.__doc__ = pysparkfuncs.month.__doc__


def dayofweek(col: "ColumnOrName") -> Column:
    return _invoke_function_over_columns("dayofweek", col)


dayofweek.__doc__ = pysparkfuncs.dayofweek.__doc__


def dayofmonth(col: "ColumnOrName") -> Column:
    return _invoke_function_over_columns("dayofmonth", col)


dayofmonth.__doc__ = pysparkfuncs.dayofmonth.__doc__


def day(col: "ColumnOrName") -> Column:
    return _invoke_function_over_columns("day", col)


day.__doc__ = pysparkfuncs.day.__doc__


def dayofyear(col: "ColumnOrName") -> Column:
    return _invoke_function_over_columns("dayofyear", col)


dayofyear.__doc__ = pysparkfuncs.dayofyear.__doc__


def hour(col: "ColumnOrName") -> Column:
    return _invoke_function_over_columns("hour", col)


hour.__doc__ = pysparkfuncs.hour.__doc__


def minute(col: "ColumnOrName") -> Column:
    return _invoke_function_over_columns("minute", col)


minute.__doc__ = pysparkfuncs.minute.__doc__


def second(col: "ColumnOrName") -> Column:
    return _invoke_function_over_columns("second", col)


second.__doc__ = pysparkfuncs.second.__doc__


def weekofyear(col: "ColumnOrName") -> Column:
    return _invoke_function_over_columns("weekofyear", col)


weekofyear.__doc__ = pysparkfuncs.weekofyear.__doc__


def weekday(col: "ColumnOrName") -> Column:
    return _invoke_function_over_columns("weekday", col)


weekday.__doc__ = pysparkfuncs.weekday.__doc__


def monthname(col: "ColumnOrName") -> Column:
    return _invoke_function_over_columns("monthname", col)


monthname.__doc__ = pysparkfuncs.monthname.__doc__


def dayname(col: "ColumnOrName") -> Column:
    return _invoke_function_over_columns("dayname", col)


dayname.__doc__ = pysparkfuncs.dayname.__doc__


def extract(field: Column, source: "ColumnOrName") -> Column:
    return _invoke_function_over_columns("extract", field, source)


extract.__doc__ = pysparkfuncs.extract.__doc__


def date_part(field: Column, source: "ColumnOrName") -> Column:
    return _invoke_function_over_columns("date_part", field, source)


date_part.__doc__ = pysparkfuncs.date_part.__doc__


def datepart(field: Column, source: "ColumnOrName") -> Column:
    return _invoke_function_over_columns("datepart", field, source)


datepart.__doc__ = pysparkfuncs.datepart.__doc__


def make_date(year: "ColumnOrName", month: "ColumnOrName", day: "ColumnOrName") -> Column:
    return _invoke_function_over_columns("make_date", year, month, day)


make_date.__doc__ = pysparkfuncs.make_date.__doc__


def date_add(start: "ColumnOrName", days: Union["ColumnOrName", int]) -> Column:
    days = _enum_to_value(days)
    days = lit(days) if isinstance(days, int) else days
    return _invoke_function_over_columns("date_add", start, days)


date_add.__doc__ = pysparkfuncs.date_add.__doc__


def dateadd(start: "ColumnOrName", days: Union["ColumnOrName", int]) -> Column:
    days = _enum_to_value(days)
    days = lit(days) if isinstance(days, int) else days
    return _invoke_function_over_columns("dateadd", start, days)


dateadd.__doc__ = pysparkfuncs.dateadd.__doc__


def date_sub(start: "ColumnOrName", days: Union["ColumnOrName", int]) -> Column:
    days = _enum_to_value(days)
    days = lit(days) if isinstance(days, int) else days
    return _invoke_function_over_columns("date_sub", start, days)


date_sub.__doc__ = pysparkfuncs.date_sub.__doc__


def datediff(end: "ColumnOrName", start: "ColumnOrName") -> Column:
    return _invoke_function_over_columns("datediff", end, start)


datediff.__doc__ = pysparkfuncs.datediff.__doc__


def date_diff(end: "ColumnOrName", start: "ColumnOrName") -> Column:
    return _invoke_function_over_columns("date_diff", end, start)


date_diff.__doc__ = pysparkfuncs.date_diff.__doc__


def date_from_unix_date(days: "ColumnOrName") -> Column:
    return _invoke_function_over_columns("date_from_unix_date", days)


date_from_unix_date.__doc__ = pysparkfuncs.date_from_unix_date.__doc__


def add_months(start: "ColumnOrName", months: Union["ColumnOrName", int]) -> Column:
    months = _enum_to_value(months)
    months = lit(months) if isinstance(months, int) else months
    return _invoke_function_over_columns("add_months", start, months)


add_months.__doc__ = pysparkfuncs.add_months.__doc__


def months_between(date1: "ColumnOrName", date2: "ColumnOrName", roundOff: bool = True) -> Column:
    return _invoke_function("months_between", _to_col(date1), _to_col(date2), lit(roundOff))


months_between.__doc__ = pysparkfuncs.months_between.__doc__


def to_date(col: "ColumnOrName", format: Optional[str] = None) -> Column:
    if format is None:
        return _invoke_function_over_columns("to_date", col)
    else:
        return _invoke_function("to_date", _to_col(col), lit(format))


to_date.__doc__ = pysparkfuncs.to_date.__doc__


def unix_date(col: "ColumnOrName") -> Column:
    return _invoke_function_over_columns("unix_date", col)


unix_date.__doc__ = pysparkfuncs.unix_date.__doc__


def unix_micros(col: "ColumnOrName") -> Column:
    return _invoke_function_over_columns("unix_micros", col)


unix_micros.__doc__ = pysparkfuncs.unix_micros.__doc__


def unix_millis(col: "ColumnOrName") -> Column:
    return _invoke_function_over_columns("unix_millis", col)


unix_millis.__doc__ = pysparkfuncs.unix_millis.__doc__


def unix_seconds(col: "ColumnOrName") -> Column:
    return _invoke_function_over_columns("unix_seconds", col)


unix_seconds.__doc__ = pysparkfuncs.unix_seconds.__doc__


@overload
def to_timestamp(col: "ColumnOrName") -> Column:
    ...


@overload
def to_timestamp(col: "ColumnOrName", format: str) -> Column:
    ...


def to_timestamp(col: "ColumnOrName", format: Optional[str] = None) -> Column:
    if format is None:
        return _invoke_function_over_columns("to_timestamp", col)
    else:
        return _invoke_function("to_timestamp", _to_col(col), lit(format))


to_timestamp.__doc__ = pysparkfuncs.to_timestamp.__doc__


def try_to_timestamp(col: "ColumnOrName", format: Optional["ColumnOrName"] = None) -> Column:
    if format is not None:
        return _invoke_function_over_columns("try_to_timestamp", col, format)
    else:
        return _invoke_function_over_columns("try_to_timestamp", col)


try_to_timestamp.__doc__ = pysparkfuncs.try_to_timestamp.__doc__


def xpath(xml: "ColumnOrName", path: "ColumnOrName") -> Column:
    return _invoke_function_over_columns("xpath", xml, path)


xpath.__doc__ = pysparkfuncs.xpath.__doc__


def xpath_boolean(xml: "ColumnOrName", path: "ColumnOrName") -> Column:
    return _invoke_function_over_columns("xpath_boolean", xml, path)


xpath_boolean.__doc__ = pysparkfuncs.xpath_boolean.__doc__


def xpath_double(xml: "ColumnOrName", path: "ColumnOrName") -> Column:
    return _invoke_function_over_columns("xpath_double", xml, path)


xpath_double.__doc__ = pysparkfuncs.xpath_double.__doc__


def xpath_number(xml: "ColumnOrName", path: "ColumnOrName") -> Column:
    return _invoke_function_over_columns("xpath_number", xml, path)


xpath_number.__doc__ = pysparkfuncs.xpath_number.__doc__


def xpath_float(xml: "ColumnOrName", path: "ColumnOrName") -> Column:
    return _invoke_function_over_columns("xpath_float", xml, path)


xpath_float.__doc__ = pysparkfuncs.xpath_float.__doc__


def xpath_int(xml: "ColumnOrName", path: "ColumnOrName") -> Column:
    return _invoke_function_over_columns("xpath_int", xml, path)


xpath_int.__doc__ = pysparkfuncs.xpath_int.__doc__


def xpath_long(xml: "ColumnOrName", path: "ColumnOrName") -> Column:
    return _invoke_function_over_columns("xpath_long", xml, path)


xpath_long.__doc__ = pysparkfuncs.xpath_long.__doc__


def xpath_short(xml: "ColumnOrName", path: "ColumnOrName") -> Column:
    return _invoke_function_over_columns("xpath_short", xml, path)


xpath_short.__doc__ = pysparkfuncs.xpath_short.__doc__


def xpath_string(xml: "ColumnOrName", path: "ColumnOrName") -> Column:
    return _invoke_function_over_columns("xpath_string", xml, path)


xpath_string.__doc__ = pysparkfuncs.xpath_string.__doc__


def trunc(date: "ColumnOrName", format: str) -> Column:
    return _invoke_function("trunc", _to_col(date), lit(format))


trunc.__doc__ = pysparkfuncs.trunc.__doc__


def date_trunc(format: str, timestamp: "ColumnOrName") -> Column:
    return _invoke_function("date_trunc", lit(format), _to_col(timestamp))


date_trunc.__doc__ = pysparkfuncs.date_trunc.__doc__


def next_day(date: "ColumnOrName", dayOfWeek: str) -> Column:
    return _invoke_function("next_day", _to_col(date), lit(dayOfWeek))


next_day.__doc__ = pysparkfuncs.next_day.__doc__


def last_day(date: "ColumnOrName") -> Column:
    return _invoke_function_over_columns("last_day", date)


last_day.__doc__ = pysparkfuncs.last_day.__doc__


def from_unixtime(timestamp: "ColumnOrName", format: str = "yyyy-MM-dd HH:mm:ss") -> Column:
    return _invoke_function("from_unixtime", _to_col(timestamp), lit(format))


from_unixtime.__doc__ = pysparkfuncs.from_unixtime.__doc__


@overload
def unix_timestamp(timestamp: "ColumnOrName", format: str = ...) -> Column:
    ...


@overload
def unix_timestamp() -> Column:
    ...


def unix_timestamp(
    timestamp: Optional["ColumnOrName"] = None, format: str = "yyyy-MM-dd HH:mm:ss"
) -> Column:
    if timestamp is None:
        return _invoke_function("unix_timestamp")
    return _invoke_function("unix_timestamp", _to_col(timestamp), lit(format))


unix_timestamp.__doc__ = pysparkfuncs.unix_timestamp.__doc__


def from_utc_timestamp(timestamp: "ColumnOrName", tz: Union[Column, str]) -> Column:
    return _invoke_function_over_columns("from_utc_timestamp", timestamp, lit(tz))


from_utc_timestamp.__doc__ = pysparkfuncs.from_utc_timestamp.__doc__


def to_utc_timestamp(timestamp: "ColumnOrName", tz: Union[Column, str]) -> Column:
    return _invoke_function_over_columns("to_utc_timestamp", timestamp, lit(tz))


to_utc_timestamp.__doc__ = pysparkfuncs.to_utc_timestamp.__doc__


def timestamp_seconds(col: "ColumnOrName") -> Column:
    return _invoke_function_over_columns("timestamp_seconds", col)


timestamp_seconds.__doc__ = pysparkfuncs.timestamp_seconds.__doc__


def timestamp_millis(col: "ColumnOrName") -> Column:
    return _invoke_function_over_columns("timestamp_millis", col)


timestamp_millis.__doc__ = pysparkfuncs.timestamp_millis.__doc__


def timestamp_micros(col: "ColumnOrName") -> Column:
    return _invoke_function_over_columns("timestamp_micros", col)


timestamp_micros.__doc__ = pysparkfuncs.timestamp_micros.__doc__


def timestamp_diff(unit: str, start: "ColumnOrName", end: "ColumnOrName") -> Column:
    return _invoke_function_over_columns("timestampdiff", lit(unit), start, end)


timestamp_diff.__doc__ = pysparkfuncs.timestamp_diff.__doc__


def timestamp_add(unit: str, quantity: "ColumnOrName", ts: "ColumnOrName") -> Column:
    return _invoke_function_over_columns("timestampadd", lit(unit), quantity, ts)


timestamp_add.__doc__ = pysparkfuncs.timestamp_add.__doc__


def window(
    timeColumn: "ColumnOrName",
    windowDuration: str,
    slideDuration: Optional[str] = None,
    startTime: Optional[str] = None,
) -> Column:
    if windowDuration is None or not isinstance(windowDuration, str):
        raise PySparkTypeError(
            errorClass="NOT_STR",
            messageParameters={
                "arg_name": "windowDuration",
                "arg_type": type(windowDuration).__name__,
            },
        )
    if slideDuration is not None and not isinstance(slideDuration, str):
        raise PySparkTypeError(
            errorClass="NOT_STR",
            messageParameters={
                "arg_name": "slideDuration",
                "arg_type": type(slideDuration).__name__,
            },
        )
    if startTime is not None and not isinstance(startTime, str):
        raise PySparkTypeError(
            errorClass="NOT_STR",
            messageParameters={"arg_name": "startTime", "arg_type": type(startTime).__name__},
        )

    time_col = _to_col(timeColumn)

    if slideDuration is not None and startTime is not None:
        return _invoke_function(
            "window", time_col, lit(windowDuration), lit(slideDuration), lit(startTime)
        )
    elif slideDuration is not None:
        return _invoke_function("window", time_col, lit(windowDuration), lit(slideDuration))
    elif startTime is not None:
        return _invoke_function(
            "window", time_col, lit(windowDuration), lit(windowDuration), lit(startTime)
        )
    else:
        return _invoke_function("window", time_col, lit(windowDuration))


window.__doc__ = pysparkfuncs.window.__doc__
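# Note on the dispatch above: when only `startTime` is supplied, `windowDuration` is also
# passed as the slide duration, i.e. tumbling windows whose boundaries are shifted by
# `startTime`, e.g. window(df.ts, "1 hour", startTime="15 minutes").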


def window_time(
    windowColumn: "ColumnOrName",
) -> Column:
    return _invoke_function("window_time", _to_col(windowColumn))


window_time.__doc__ = pysparkfuncs.window_time.__doc__


def session_window(timeColumn: "ColumnOrName", gapDuration: Union[Column, str]) -> Column:
    gapDuration = _enum_to_value(gapDuration)
    if gapDuration is None or not isinstance(gapDuration, (Column, str)):
        raise PySparkTypeError(
            errorClass="NOT_COLUMN_OR_STR",
            messageParameters={"arg_name": "gapDuration", "arg_type": type(gapDuration).__name__},
        )

    time_col = _to_col(timeColumn)

    if isinstance(gapDuration, Column):
        return _invoke_function("session_window", time_col, gapDuration)
    else:
        return _invoke_function("session_window", time_col, lit(gapDuration))


session_window.__doc__ = pysparkfuncs.session_window.__doc__


def to_unix_timestamp(
    timestamp: "ColumnOrName",
    format: Optional["ColumnOrName"] = None,
) -> Column:
    if format is not None:
        return _invoke_function_over_columns("to_unix_timestamp", timestamp, format)
    else:
        return _invoke_function_over_columns("to_unix_timestamp", timestamp)


to_unix_timestamp.__doc__ = pysparkfuncs.to_unix_timestamp.__doc__


def to_timestamp_ltz(
    timestamp: "ColumnOrName",
    format: Optional["ColumnOrName"] = None,
) -> Column:
    if format is not None:
        return _invoke_function_over_columns("to_timestamp_ltz", timestamp, format)
    else:
        return _invoke_function_over_columns("to_timestamp_ltz", timestamp)


to_timestamp_ltz.__doc__ = pysparkfuncs.to_timestamp_ltz.__doc__


def to_timestamp_ntz(
    timestamp: "ColumnOrName",
    format: Optional["ColumnOrName"] = None,
) -> Column:
    if format is not None:
        return _invoke_function_over_columns("to_timestamp_ntz", timestamp, format)
    else:
        return _invoke_function_over_columns("to_timestamp_ntz", timestamp)


to_timestamp_ntz.__doc__ = pysparkfuncs.to_timestamp_ntz.__doc__


# Partition Transformation Functions


def bucket(numBuckets: Union[Column, int], col: "ColumnOrName") -> Column:
    warnings.warn("Deprecated in 4.0.0, use partitioning.bucket instead.", FutureWarning)
    from pyspark.sql.connect.functions import partitioning

    return partitioning.bucket(numBuckets, col)


bucket.__doc__ = pysparkfuncs.bucket.__doc__


def years(col: "ColumnOrName") -> Column:
    warnings.warn("Deprecated in 4.0.0, use partitioning.years instead.", FutureWarning)
    from pyspark.sql.connect.functions import partitioning

    return partitioning.years(col)


years.__doc__ = pysparkfuncs.years.__doc__


def months(col: "ColumnOrName") -> Column:
    warnings.warn("Deprecated in 4.0.0, use partitioning.months instead.", FutureWarning)
    from pyspark.sql.connect.functions import partitioning

    return partitioning.months(col)


months.__doc__ = pysparkfuncs.months.__doc__


def days(col: "ColumnOrName") -> Column:
    warnings.warn("Deprecated in 4.0.0, use partitioning.days instead.", FutureWarning)
    from pyspark.sql.connect.functions import partitioning

    return partitioning.days(col)


days.__doc__ = pysparkfuncs.days.__doc__


def hours(col: "ColumnOrName") -> Column:
    warnings.warn("Deprecated in 4.0.0, use partitioning.hours instead.", FutureWarning)
    from pyspark.sql.connect.functions import partitioning

    return partitioning.hours(col)


hours.__doc__ = pysparkfuncs.hours.__doc__


def convert_timezone(
    sourceTz: Optional[Column], targetTz: Column, sourceTs: "ColumnOrName"
) -> Column:
    if sourceTz is None:
        return _invoke_function_over_columns("convert_timezone", targetTz, sourceTs)
    else:
        return _invoke_function_over_columns("convert_timezone", sourceTz, targetTz, sourceTs)


convert_timezone.__doc__ = pysparkfuncs.convert_timezone.__doc__
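# Note: when `sourceTz` is None the argument is dropped from the call; the server then
# interprets `sourceTs` using the current session time zone (see the linked docstring).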


def make_dt_interval(
    days: Optional["ColumnOrName"] = None,
    hours: Optional["ColumnOrName"] = None,
    mins: Optional["ColumnOrName"] = None,
    secs: Optional["ColumnOrName"] = None,
) -> Column:
    _days = lit(0) if days is None else _to_col(days)
    _hours = lit(0) if hours is None else _to_col(hours)
    _mins = lit(0) if mins is None else _to_col(mins)
    _secs = lit(decimal.Decimal(0)) if secs is None else _to_col(secs)

    return _invoke_function_over_columns("make_dt_interval", _days, _hours, _mins, _secs)


make_dt_interval.__doc__ = pysparkfuncs.make_dt_interval.__doc__
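# Note: omitted parts default to literal 0, and `secs` defaults to Decimal(0), presumably
# so the literal matches the decimal-typed seconds argument of the underlying SQL function.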


def try_make_interval(
    years: Optional["ColumnOrName"] = None,
    months: Optional["ColumnOrName"] = None,
    weeks: Optional["ColumnOrName"] = None,
    days: Optional["ColumnOrName"] = None,
    hours: Optional["ColumnOrName"] = None,
    mins: Optional["ColumnOrName"] = None,
    secs: Optional["ColumnOrName"] = None,
) -> Column:
    _years = lit(0) if years is None else _to_col(years)
    _months = lit(0) if months is None else _to_col(months)
    _weeks = lit(0) if weeks is None else _to_col(weeks)
    _days = lit(0) if days is None else _to_col(days)
    _hours = lit(0) if hours is None else _to_col(hours)
    _mins = lit(0) if mins is None else _to_col(mins)
    _secs = lit(decimal.Decimal(0)) if secs is None else _to_col(secs)

    return _invoke_function_over_columns(
        "try_make_interval", _years, _months, _weeks, _days, _hours, _mins, _secs
    )


try_make_interval.__doc__ = pysparkfuncs.try_make_interval.__doc__


def make_interval(
    years: Optional["ColumnOrName"] = None,
    months: Optional["ColumnOrName"] = None,
    weeks: Optional["ColumnOrName"] = None,
    days: Optional["ColumnOrName"] = None,
    hours: Optional["ColumnOrName"] = None,
    mins: Optional["ColumnOrName"] = None,
    secs: Optional["ColumnOrName"] = None,
) -> Column:
    _years = lit(0) if years is None else _to_col(years)
    _months = lit(0) if months is None else _to_col(months)
    _weeks = lit(0) if weeks is None else _to_col(weeks)
    _days = lit(0) if days is None else _to_col(days)
    _hours = lit(0) if hours is None else _to_col(hours)
    _mins = lit(0) if mins is None else _to_col(mins)
    _secs = lit(decimal.Decimal(0)) if secs is None else _to_col(secs)

    return _invoke_function_over_columns(
        "make_interval", _years, _months, _weeks, _days, _hours, _mins, _secs
    )


make_interval.__doc__ = pysparkfuncs.make_interval.__doc__


def make_timestamp(
    years: "ColumnOrName",
    months: "ColumnOrName",
    days: "ColumnOrName",
    hours: "ColumnOrName",
    mins: "ColumnOrName",
    secs: "ColumnOrName",
    timezone: Optional["ColumnOrName"] = None,
) -> Column:
    if timezone is not None:
        return _invoke_function_over_columns(
            "make_timestamp", years, months, days, hours, mins, secs, timezone
        )
    else:
        return _invoke_function_over_columns(
            "make_timestamp", years, months, days, hours, mins, secs
        )


make_timestamp.__doc__ = pysparkfuncs.make_timestamp.__doc__


def try_make_timestamp(
    years: "ColumnOrName",
    months: "ColumnOrName",
    days: "ColumnOrName",
    hours: "ColumnOrName",
    mins: "ColumnOrName",
    secs: "ColumnOrName",
    timezone: Optional["ColumnOrName"] = None,
) -> Column:
    if timezone is not None:
        return _invoke_function_over_columns(
            "try_make_timestamp", years, months, days, hours, mins, secs, timezone
        )
    else:
        return _invoke_function_over_columns(
            "try_make_timestamp", years, months, days, hours, mins, secs
        )


try_make_timestamp.__doc__ = pysparkfuncs.try_make_timestamp.__doc__


def make_timestamp_ltz(
    years: "ColumnOrName",
    months: "ColumnOrName",
    days: "ColumnOrName",
    hours: "ColumnOrName",
    mins: "ColumnOrName",
    secs: "ColumnOrName",
    timezone: Optional["ColumnOrName"] = None,
) -> Column:
    if timezone is not None:
        return _invoke_function_over_columns(
            "make_timestamp_ltz", years, months, days, hours, mins, secs, timezone
        )
    else:
        return _invoke_function_over_columns(
            "make_timestamp_ltz", years, months, days, hours, mins, secs
        )


make_timestamp_ltz.__doc__ = pysparkfuncs.make_timestamp_ltz.__doc__


def try_make_timestamp_ltz(
    years: "ColumnOrName",
    months: "ColumnOrName",
    days: "ColumnOrName",
    hours: "ColumnOrName",
    mins: "ColumnOrName",
    secs: "ColumnOrName",
    timezone: Optional["ColumnOrName"] = None,
) -> Column:
    if timezone is not None:
        return _invoke_function_over_columns(
            "try_make_timestamp_ltz", years, months, days, hours, mins, secs, timezone
        )
    else:
        return _invoke_function_over_columns(
            "try_make_timestamp_ltz", years, months, days, hours, mins, secs
        )


try_make_timestamp_ltz.__doc__ = pysparkfuncs.try_make_timestamp_ltz.__doc__


def make_timestamp_ntz(
    years: "ColumnOrName",
    months: "ColumnOrName",
    days: "ColumnOrName",
    hours: "ColumnOrName",
    mins: "ColumnOrName",
    secs: "ColumnOrName",
) -> Column:
    return _invoke_function_over_columns(
        "make_timestamp_ntz", years, months, days, hours, mins, secs
    )


make_timestamp_ntz.__doc__ = pysparkfuncs.make_timestamp_ntz.__doc__


def try_make_timestamp_ntz(
    years: "ColumnOrName",
    months: "ColumnOrName",
    days: "ColumnOrName",
    hours: "ColumnOrName",
    mins: "ColumnOrName",
    secs: "ColumnOrName",
) -> Column:
    return _invoke_function_over_columns(
        "try_make_timestamp_ntz", years, months, days, hours, mins, secs
    )


try_make_timestamp_ntz.__doc__ = pysparkfuncs.try_make_timestamp_ntz.__doc__


def make_ym_interval(
    years: Optional["ColumnOrName"] = None,
    months: Optional["ColumnOrName"] = None,
) -> Column:
    _years = lit(0) if years is None else _to_col(years)
    _months = lit(0) if months is None else _to_col(months)
    return _invoke_function_over_columns("make_ym_interval", _years, _months)


make_ym_interval.__doc__ = pysparkfuncs.make_ym_interval.__doc__


# Misc Functions


def current_catalog() -> Column:
    return _invoke_function("current_catalog")


current_catalog.__doc__ = pysparkfuncs.current_catalog.__doc__


def current_database() -> Column:
    return _invoke_function("current_database")


current_database.__doc__ = pysparkfuncs.current_database.__doc__


def current_schema() -> Column:
    return _invoke_function("current_schema")


current_schema.__doc__ = pysparkfuncs.current_schema.__doc__


def current_user() -> Column:
    return _invoke_function("current_user")


current_user.__doc__ = pysparkfuncs.current_user.__doc__


def user() -> Column:
    return _invoke_function("user")


user.__doc__ = pysparkfuncs.user.__doc__


def session_user() -> Column:
    return _invoke_function("session_user")


session_user.__doc__ = pysparkfuncs.session_user.__doc__


def uuid() -> Column:
    return _invoke_function("uuid", lit(py_random.randint(0, sys.maxsize)))


uuid.__doc__ = pysparkfuncs.uuid.__doc__


def assert_true(col: "ColumnOrName", errMsg: Optional[Union[Column, str]] = None) -> Column:
    errMsg = _enum_to_value(errMsg)
    if errMsg is None:
        return _invoke_function_over_columns("assert_true", col)
    if not isinstance(errMsg, (str, Column)):
        raise PySparkTypeError(
            errorClass="NOT_COLUMN_OR_STR",
            messageParameters={"arg_name": "errMsg", "arg_type": type(errMsg).__name__},
        )
    return _invoke_function_over_columns("assert_true", col, lit(errMsg))


assert_true.__doc__ = pysparkfuncs.assert_true.__doc__


def raise_error(errMsg: Union[Column, str]) -> Column:
    errMsg = _enum_to_value(errMsg)
    if not isinstance(errMsg, (str, Column)):
        raise PySparkTypeError(
            errorClass="NOT_COLUMN_OR_STR",
            messageParameters={"arg_name": "errMsg", "arg_type": type(errMsg).__name__},
        )
    return _invoke_function_over_columns("raise_error", lit(errMsg))


raise_error.__doc__ = pysparkfuncs.raise_error.__doc__


def crc32(col: "ColumnOrName") -> Column:
    return _invoke_function_over_columns("crc32", col)


crc32.__doc__ = pysparkfuncs.crc32.__doc__


def hash(*cols: "ColumnOrName") -> Column:
    return _invoke_function_over_columns("hash", *cols)


hash.__doc__ = pysparkfuncs.hash.__doc__


def xxhash64(*cols: "ColumnOrName") -> Column:
    return _invoke_function_over_columns("xxhash64", *cols)


xxhash64.__doc__ = pysparkfuncs.xxhash64.__doc__


def md5(col: "ColumnOrName") -> Column:
    return _invoke_function_over_columns("md5", col)


md5.__doc__ = pysparkfuncs.md5.__doc__


def sha1(col: "ColumnOrName") -> Column:
    return _invoke_function_over_columns("sha1", col)


sha1.__doc__ = pysparkfuncs.sha1.__doc__


def sha2(col: "ColumnOrName", numBits: int) -> Column:
    if numBits not in [0, 224, 256, 384, 512]:
        raise PySparkValueError(
            errorClass="VALUE_NOT_ALLOWED",
            messageParameters={
                "arg_name": "numBits",
                "allowed_values": "[0, 224, 256, 384, 512]",
            },
        )
    return _invoke_function("sha2", _to_col(col), lit(numBits))


sha2.__doc__ = pysparkfuncs.sha2.__doc__
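# Note: 0 is accepted for numBits above because the SQL sha2 function treats it as
# equivalent to 256, e.g. sha2(df.name, 0) and sha2(df.name, 256) produce the same digest.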


def hll_sketch_agg(
    col: "ColumnOrName",
    lgConfigK: Optional[Union[int, Column]] = None,
) -> Column:
    if lgConfigK is None:
        return _invoke_function_over_columns("hll_sketch_agg", col)
    else:
        return _invoke_function_over_columns("hll_sketch_agg", col, lit(lgConfigK))


hll_sketch_agg.__doc__ = pysparkfuncs.hll_sketch_agg.__doc__


def hll_union_agg(
    col: "ColumnOrName",
    allowDifferentLgConfigK: Optional[Union[bool, Column]] = None,
) -> Column:
    if allowDifferentLgConfigK is None:
        return _invoke_function_over_columns("hll_union_agg", col)
    else:
        return _invoke_function_over_columns("hll_union_agg", col, lit(allowDifferentLgConfigK))


hll_union_agg.__doc__ = pysparkfuncs.hll_union_agg.__doc__


def hll_sketch_estimate(col: "ColumnOrName") -> Column:
    return _invoke_function("hll_sketch_estimate", _to_col(col))


hll_sketch_estimate.__doc__ = pysparkfuncs.hll_sketch_estimate.__doc__


def hll_union(
    col1: "ColumnOrName", col2: "ColumnOrName", allowDifferentLgConfigK: Optional[bool] = None
) -> Column:
    if allowDifferentLgConfigK is not None:
        return _invoke_function(
            "hll_union", _to_col(col1), _to_col(col2), lit(allowDifferentLgConfigK)
        )
    else:
        return _invoke_function("hll_union", _to_col(col1), _to_col(col2))


hll_union.__doc__ = pysparkfuncs.hll_union.__doc__


# Predicate Functions


def ifnull(col1: "ColumnOrName", col2: "ColumnOrName") -> Column:
    return _invoke_function_over_columns("ifnull", col1, col2)


ifnull.__doc__ = pysparkfuncs.ifnull.__doc__


def isnotnull(col: "ColumnOrName") -> Column:
    return _invoke_function_over_columns("isnotnull", col)


isnotnull.__doc__ = pysparkfuncs.isnotnull.__doc__


def equal_null(col1: "ColumnOrName", col2: "ColumnOrName") -> Column:
    return _invoke_function_over_columns("equal_null", col1, col2)


equal_null.__doc__ = pysparkfuncs.equal_null.__doc__


def nullif(col1: "ColumnOrName", col2: "ColumnOrName") -> Column:
    return _invoke_function_over_columns("nullif", col1, col2)


nullif.__doc__ = pysparkfuncs.nullif.__doc__


def nullifzero(col: "ColumnOrName") -> Column:
    return _invoke_function_over_columns("nullifzero", col)


nullifzero.__doc__ = pysparkfuncs.nullifzero.__doc__


def nvl(col1: "ColumnOrName", col2: "ColumnOrName") -> Column:
    return _invoke_function_over_columns("nvl", col1, col2)


nvl.__doc__ = pysparkfuncs.nvl.__doc__


def nvl2(col1: "ColumnOrName", col2: "ColumnOrName", col3: "ColumnOrName") -> Column:
    return _invoke_function_over_columns("nvl2", col1, col2, col3)


nvl2.__doc__ = pysparkfuncs.nvl2.__doc__


def zeroifnull(col: "ColumnOrName") -> Column:
    return _invoke_function_over_columns("zeroifnull", col)


zeroifnull.__doc__ = pysparkfuncs.zeroifnull.__doc__
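
# Quick reference for the null-handling helpers above (illustrative, using a
# hypothetical column ``c`` and fallback values ``d``, ``x``, ``y``):
#
#   ifnull(c, d) / nvl(c, d)  -> d when c is NULL, otherwise c
#   nvl2(c, x, y)             -> x when c is not NULL, otherwise y
#   nullif(c, d)              -> NULL when c equals d, otherwise c
#   nullifzero(c)             -> NULL when c is 0, otherwise c
#   zeroifnull(c)             -> 0 when c is NULL, otherwise c
#   equal_null(c, d)          -> null-safe equality (TRUE when both are NULL)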


def aes_encrypt(
    input: "ColumnOrName",
    key: "ColumnOrName",
    mode: Optional["ColumnOrName"] = None,
    padding: Optional["ColumnOrName"] = None,
    iv: Optional["ColumnOrName"] = None,
    aad: Optional["ColumnOrName"] = None,
) -> Column:
    _mode = lit("GCM") if mode is None else _to_col(mode)
    _padding = lit("DEFAULT") if padding is None else _to_col(padding)
    _iv = lit("") if iv is None else _to_col(iv)
    _aad = lit("") if aad is None else _to_col(aad)

    return _invoke_function_over_columns("aes_encrypt", input, key, _mode, _padding, _iv, _aad)


aes_encrypt.__doc__ = pysparkfuncs.aes_encrypt.__doc__


def aes_decrypt(
    input: "ColumnOrName",
    key: "ColumnOrName",
    mode: Optional["ColumnOrName"] = None,
    padding: Optional["ColumnOrName"] = None,
    aad: Optional["ColumnOrName"] = None,
) -> Column:
    _mode = lit("GCM") if mode is None else _to_col(mode)
    _padding = lit("DEFAULT") if padding is None else _to_col(padding)
    _aad = lit("") if aad is None else _to_col(aad)

    return _invoke_function_over_columns("aes_decrypt", input, key, _mode, _padding, _aad)


aes_decrypt.__doc__ = pysparkfuncs.aes_decrypt.__doc__


def try_aes_decrypt(
    input: "ColumnOrName",
    key: "ColumnOrName",
    mode: Optional["ColumnOrName"] = None,
    padding: Optional["ColumnOrName"] = None,
    aad: Optional["ColumnOrName"] = None,
) -> Column:
    _mode = lit("GCM") if mode is None else _to_col(mode)
    _padding = lit("DEFAULT") if padding is None else _to_col(padding)
    _aad = lit("") if aad is None else _to_col(aad)

    return _invoke_function_over_columns("try_aes_decrypt", input, key, _mode, _padding, _aad)


try_aes_decrypt.__doc__ = pysparkfuncs.try_aes_decrypt.__doc__
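
# Illustrative round-trip sketch (comments only, not executed). When ``mode``,
# ``padding``, ``iv`` and ``aad`` are omitted, the functions above default to
# GCM with DEFAULT padding and empty iv/aad. The DataFrame ``df`` with a string
# column ``text`` and the 16-byte key below are hypothetical; AES keys must be
# 16, 24, or 32 bytes.
#
#   key = lit("abcdefghijklmnop")
#   enc = df.select(aes_encrypt(col("text"), key).alias("c"))
#   enc.select(aes_decrypt(col("c"), key).cast("string"))
#
# ``try_aes_decrypt`` behaves like ``aes_decrypt`` but returns NULL instead of
# failing when the ciphertext cannot be decrypted.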


def sha(col: "ColumnOrName") -> Column:
    return _invoke_function_over_columns("sha", col)


sha.__doc__ = pysparkfuncs.sha.__doc__


def input_file_block_length() -> Column:
    return _invoke_function_over_columns("input_file_block_length")


input_file_block_length.__doc__ = pysparkfuncs.input_file_block_length.__doc__


def input_file_block_start() -> Column:
    return _invoke_function_over_columns("input_file_block_start")


input_file_block_start.__doc__ = pysparkfuncs.input_file_block_start.__doc__


def reflect(*cols: "ColumnOrName") -> Column:
    return _invoke_function_over_columns("reflect", *cols)


reflect.__doc__ = pysparkfuncs.reflect.__doc__


def java_method(*cols: "ColumnOrName") -> Column:
    return _invoke_function_over_columns("java_method", *cols)


java_method.__doc__ = pysparkfuncs.java_method.__doc__


def try_reflect(*cols: "ColumnOrName") -> Column:
    return _invoke_function_over_columns("try_reflect", *cols)


try_reflect.__doc__ = pysparkfuncs.try_reflect.__doc__
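
# Illustrative sketch (comments only, not executed): ``reflect`` (alias
# ``java_method``) calls a static Java method by class and method name, while
# ``try_reflect`` returns NULL instead of failing when the call raises.
#
#   spark.range(1).select(reflect(lit("java.util.UUID"), lit("randomUUID")))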


def version() -> Column:
    return _invoke_function_over_columns("version")


version.__doc__ = pysparkfuncs.version.__doc__


def typeof(col: "ColumnOrName") -> Column:
    return _invoke_function_over_columns("typeof", col)


typeof.__doc__ = pysparkfuncs.typeof.__doc__


def stack(*cols: "ColumnOrName") -> Column:
    return _invoke_function_over_columns("stack", *cols)


stack.__doc__ = pysparkfuncs.stack.__doc__
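
# Illustrative sketch (comments only, not executed): ``stack(n, expr1, ...)``
# separates the expressions into ``n`` rows, which is handy for unpivoting.
# With the hypothetical columns below, this yields the rows (1, 2) and (3, NULL).
#
#   df = spark.createDataFrame([(1, 2, 3)], ["a", "b", "c"])
#   df.select(stack(lit(2), col("a"), col("b"), col("c")))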


def bitmap_bit_position(col: "ColumnOrName") -> Column:
    return _invoke_function_over_columns("bitmap_bit_position", col)


bitmap_bit_position.__doc__ = pysparkfuncs.bitmap_bit_position.__doc__


def bitmap_bucket_number(col: "ColumnOrName") -> Column:
    return _invoke_function_over_columns("bitmap_bucket_number", col)


bitmap_bucket_number.__doc__ = pysparkfuncs.bitmap_bucket_number.__doc__


def bitmap_construct_agg(col: "ColumnOrName") -> Column:
    return _invoke_function_over_columns("bitmap_construct_agg", col)


bitmap_construct_agg.__doc__ = pysparkfuncs.bitmap_construct_agg.__doc__


def bitmap_count(col: "ColumnOrName") -> Column:
    return _invoke_function_over_columns("bitmap_count", col)


bitmap_count.__doc__ = pysparkfuncs.bitmap_count.__doc__


def bitmap_or_agg(col: "ColumnOrName") -> Column:
    return _invoke_function_over_columns("bitmap_or_agg", col)


bitmap_or_agg.__doc__ = pysparkfuncs.bitmap_or_agg.__doc__
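
# Illustrative sketch (comments only, not executed) of one common bitmap
# pattern for exact distinct counts over a hypothetical integer column ``id``:
# bucket the values, build a bitmap of bit positions per bucket, then sum the
# set bits across buckets.
#
#   bitmaps = df.groupBy(bitmap_bucket_number(col("id")).alias("bucket")).agg(
#       bitmap_construct_agg(bitmap_bit_position(col("id"))).alias("bm")
#   )
#   bitmaps.select(sum(bitmap_count(col("bm"))).alias("distinct_ids"))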


# Call Functions


def call_udf(udfName: str, *cols: "ColumnOrName") -> Column:
    return _invoke_function(udfName, *[_to_col(c) for c in cols])


call_udf.__doc__ = pysparkfuncs.call_udf.__doc__


def unwrap_udt(col: "ColumnOrName") -> Column:
    return _invoke_function("unwrap_udt", _to_col(col))


unwrap_udt.__doc__ = pysparkfuncs.unwrap_udt.__doc__


def udf(
    f: Optional[Union[Callable[..., Any], "DataTypeOrString"]] = None,
    returnType: "DataTypeOrString" = StringType(),
    *,
    useArrow: Optional[bool] = None,
) -> Union["UserDefinedFunctionLike", Callable[[Callable[..., Any]], "UserDefinedFunctionLike"]]:
    if f is None or isinstance(f, (str, DataType)):
        # If a return type (DataType or DDL string) was passed as the positional
        # argument, ``udf`` is being used as a decorator; use it as the returnType.
        return_type = f or returnType
        return functools.partial(
            _create_py_udf,
            returnType=return_type,
            useArrow=useArrow,
        )
    else:
        return _create_py_udf(f=f, returnType=returnType, useArrow=useArrow)


udf.__doc__ = pysparkfuncs.udf.__doc__
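
# Illustrative sketch (comments only, not executed) of the two code paths above:
# when ``udf`` receives a return type (or nothing) it acts as a decorator
# factory, otherwise it wraps the given callable directly. Names are hypothetical.
#
#   @udf("int")
#   def add_one(x):
#       return x + 1
#
#   slen = udf(lambda s: len(s), "int")
#   df.select(add_one(col("id")), slen(col("name")))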


def udtf(
    cls: Optional[Type] = None,
    *,
    returnType: Optional[Union[StructType, str]] = None,
    useArrow: Optional[bool] = None,
) -> Union["UserDefinedTableFunction", Callable[[Type], "UserDefinedTableFunction"]]:
    if cls is None:
        return functools.partial(_create_py_udtf, returnType=returnType, useArrow=useArrow)
    else:
        return _create_py_udtf(cls=cls, returnType=returnType, useArrow=useArrow)


udtf.__doc__ = pysparkfuncs.udtf.__doc__
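
# Illustrative sketch (comments only, not executed): a Python UDTF is a class
# whose ``eval`` yields output rows matching ``returnType``. The class and
# column names below are hypothetical.
#
#   @udtf(returnType="num: int, squared: int")
#   class SquareNumbers:
#       def eval(self, start: int, end: int):
#           for n in range(start, end + 1):
#               yield (n, n * n)
#
#   SquareNumbers(lit(1), lit(3)).show()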


def call_function(funcName: str, *cols: "ColumnOrName") -> Column:
    from pyspark.sql.connect.column import Column as ConnectColumn

    expressions = [_to_col(c)._expr for c in cols]
    return ConnectColumn(CallFunction(funcName, expressions))  # type: ignore[arg-type]


call_function.__doc__ = pysparkfuncs.call_function.__doc__
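
# Illustrative sketch (comments only, not executed): ``call_udf`` invokes a UDF
# that was registered by name, while ``call_function`` can also resolve builtin
# or SQL-registered functions. The registration below is a hypothetical example.
#
#   spark.udf.register("int_x2", lambda i: i * 2, "int")
#   df.select(call_udf("int_x2", col("id")), call_function("lower", col("name")))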


def _test() -> None:
    import sys
    import os
    import doctest
    from pyspark.sql import SparkSession as PySparkSession
    import pyspark.sql.connect.functions.builtin

    globs = pyspark.sql.connect.functions.builtin.__dict__.copy()

    globs["spark"] = (
        PySparkSession.builder.appName("sql.connect.functions tests")
        .remote(os.environ.get("SPARK_CONNECT_TESTING_REMOTE", "local[4]"))
        .getOrCreate()
    )

    (failure_count, test_count) = doctest.testmod(
        pyspark.sql.connect.functions.builtin,
        globs=globs,
        optionflags=doctest.ELLIPSIS
        | doctest.NORMALIZE_WHITESPACE
        | doctest.IGNORE_EXCEPTION_DETAIL,
    )

    globs["spark"].stop()

    if failure_count:
        sys.exit(-1)


if __name__ == "__main__":
    _test()
