# python/pyspark/sql/connect/functions.py
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
from pyspark.sql.connect.utils import check_dependencies
check_dependencies(__name__)
import decimal
import inspect
import warnings
import functools
from typing import (
Any,
Dict,
TYPE_CHECKING,
Union,
List,
overload,
Optional,
Tuple,
Type,
Callable,
ValuesView,
cast,
)
import numpy as np
from pyspark.errors import PySparkTypeError, PySparkValueError
from pyspark.sql.connect.column import Column
from pyspark.sql.connect.expressions import (
CaseWhen,
Expression,
LiteralExpression,
ColumnReference,
UnresolvedFunction,
UnresolvedStar,
SQLExpression,
LambdaFunction,
UnresolvedNamedLambdaVariable,
CallFunction,
)
from pyspark.sql.connect.udf import _create_py_udf
from pyspark.sql.connect.udtf import AnalyzeArgument, AnalyzeResult # noqa: F401
from pyspark.sql.connect.udtf import _create_py_udtf
from pyspark.sql import functions as pysparkfuncs
from pyspark.sql.types import _from_numpy_type, DataType, StructType, ArrayType, StringType
# The implementation of pandas_udf is embedded in pyspark.sql.functions.pandas_udf
# for code reuse.
from pyspark.sql.functions import pandas_udf # noqa: F401
if TYPE_CHECKING:
from pyspark.sql.connect._typing import (
ColumnOrName,
DataTypeOrString,
UserDefinedFunctionLike,
)
from pyspark.sql.connect.dataframe import DataFrame
from pyspark.sql.connect.udtf import UserDefinedTableFunction
def _to_col_with_plan_id(col: str, plan_id: Optional[int]) -> Column:
if col == "*":
return Column(UnresolvedStar(unparsed_target=None))
elif col.endswith(".*"):
return Column(UnresolvedStar(unparsed_target=col))
else:
return Column(ColumnReference(unparsed_identifier=col, plan_id=plan_id))
def _to_col(col: "ColumnOrName") -> Column:
assert isinstance(col, (Column, str))
return col if isinstance(col, Column) else column(col)
def _invoke_function(name: str, *args: Union[Column, Expression]) -> Column:
"""
Simple wrapper function that converts the arguments into the appropriate types.
Parameters
----------
    name : str
        Name of the function to be called.
    args
        The list of arguments.
Returns
-------
:class:`Column`
"""
expressions: List[Expression] = []
for arg in args:
assert isinstance(arg, (Column, Expression))
if isinstance(arg, Column):
expressions.append(arg._expr)
else:
expressions.append(arg)
return Column(UnresolvedFunction(name, expressions))
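# A minimal usage sketch (illustrative, not executed here; assumes an active
# Spark Connect session bound to the name `spark`):
#
#   c = _invoke_function("abs", _to_col("id"))
#   spark.range(-3, 3).select(c).show()  # "abs" is resolved server-side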
def _invoke_function_over_columns(name: str, *cols: "ColumnOrName") -> Column:
"""
Invokes n-ary function identified by name
and wraps the result with :class:`~pyspark.sql.Column`.
"""
_cols = [_to_col(c) for c in cols]
return _invoke_function(name, *_cols)
def _invoke_binary_math_function(name: str, col1: Any, col2: Any) -> Column:
"""
Invokes binary math function identified by name
and wraps the result with :class:`~pyspark.sql.Column`.
"""
    # For legacy reasons, the arguments here can be implicitly converted into a Column
_cols = [_to_col(c) if isinstance(c, (str, Column)) else lit(c) for c in (col1, col2)]
return _invoke_function(name, *_cols)
def _get_lambda_parameters(f: Callable) -> ValuesView[inspect.Parameter]:
signature = inspect.signature(f)
parameters = signature.parameters.values()
    # We should exclude functions that use variable args or keyword argument
    # names, as well as keyword-only args.
supported_parameter_types = {
inspect.Parameter.POSITIONAL_OR_KEYWORD,
inspect.Parameter.POSITIONAL_ONLY,
}
# Validate that the function arity is between 1 and 3.
if not (1 <= len(parameters) <= 3):
raise PySparkValueError(
error_class="WRONG_NUM_ARGS_FOR_HIGHER_ORDER_FUNCTION",
message_parameters={"func_name": f.__name__, "num_args": str(len(parameters))},
)
# Verify that all arguments can be used as positional arguments.
if not all(p.kind in supported_parameter_types for p in parameters):
raise PySparkValueError(
error_class="UNSUPPORTED_PARAM_TYPE_FOR_HIGHER_ORDER_FUNCTION",
message_parameters={"func_name": f.__name__},
)
return parameters
def _create_lambda(f: Callable) -> LambdaFunction:
"""
    Create `o.a.s.sql.expressions.LambdaFunction` corresponding
    to the transformation described by f.
    :param f: A Python function of one of the following forms:
- (Column) -> Column: ...
- (Column, Column) -> Column: ...
- (Column, Column, Column) -> Column: ...
"""
parameters = _get_lambda_parameters(f)
arg_names = ["x", "y", "z"][: len(parameters)]
arg_exprs = [
UnresolvedNamedLambdaVariable([UnresolvedNamedLambdaVariable.fresh_var_name(arg_name)])
for arg_name in arg_names
]
arg_cols = [Column(arg_expr) for arg_expr in arg_exprs]
result = f(*arg_cols)
if not isinstance(result, Column):
raise PySparkValueError(
error_class="HIGHER_ORDER_FUNCTION_SHOULD_RETURN_COLUMN",
message_parameters={"func_name": f.__name__, "return_type": type(result).__name__},
)
return LambdaFunction(result._expr, arg_exprs)
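# Sketch of what `_create_lambda` produces (variable names are illustrative):
# a callable such as `lambda x: x + 1` is invoked once with fresh
# UnresolvedNamedLambdaVariable columns, and the resulting expression tree is
# captured, roughly:
#
#   fn = _create_lambda(lambda x: x + 1)
#   # fn ~ LambdaFunction(function=(x_0 + 1), arguments=[x_0])
#
# where `x_0` stands for the fresh name generated by
# UnresolvedNamedLambdaVariable.fresh_var_name("x").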
def _invoke_higher_order_function(
name: str,
cols: List["ColumnOrName"],
funs: List[Callable],
) -> Column:
"""
    Invokes the expression identified by name
    (relative to ``org.apache.spark.sql.catalyst.expressions``)
    and wraps the result with :class:`Column`.
:param name: Name of the expression
:param cols: a list of columns
:param funs: a list of (*Column) -> Column functions.
:return: a Column
"""
_cols = [_to_col(c) for c in cols]
_funs = [_create_lambda(f) for f in funs]
return _invoke_function(name, *_cols, *_funs)
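# For example (a sketch; assumes a session `spark`), `transform` below routes
# through this helper, so
#
#   df = spark.sql("SELECT array(1, 2, 3) AS xs")
#   df.select(_invoke_higher_order_function("transform", ["xs"], [lambda x: x * 2]))
#
# sends the equivalent of `transform(xs, x_0 -> x_0 * 2)` to the server.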
def _options_to_col(options: Dict[str, Any]) -> Column:
_options: List[Column] = []
for k, v in options.items():
_options.append(lit(str(k)))
_options.append(lit(str(v)))
return create_map(*_options)
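# For instance, `_options_to_col({"sep": ";", "header": "true"})` builds
# `map(lit("sep"), lit(";"), lit("header"), lit("true"))`; functions such as
# from_csv and to_json below pass their options as this string-to-string map.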
# Normal Functions
def col(col: str) -> Column:
return _to_col_with_plan_id(col=col, plan_id=None)
col.__doc__ = pysparkfuncs.col.__doc__
column = col
def lit(col: Any) -> Column:
if isinstance(col, Column):
return col
elif isinstance(col, list):
if any(isinstance(c, Column) for c in col):
raise PySparkValueError(
error_class="COLUMN_IN_LIST", message_parameters={"func_name": "lit"}
)
return array(*[lit(c) for c in col])
elif isinstance(col, np.ndarray) and col.ndim == 1:
if _from_numpy_type(col.dtype) is None:
raise PySparkTypeError(
error_class="UNSUPPORTED_NUMPY_ARRAY_SCALAR",
message_parameters={"dtype": col.dtype.name},
)
        # The Py4J NumpyArrayConverter cannot handle ndarrays with int8 values.
        # This is not a problem for Connect, but we still convert to int16 here
        # for compatibility.
if col.dtype == np.int8:
col = col.astype(np.int16)
return array(*[lit(c) for c in col])
else:
return Column(LiteralExpression._from_value(col))
lit.__doc__ = pysparkfuncs.lit.__doc__
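# A few illustrative cases (sketch only):
#
#   lit(1)                                 # scalar literal column
#   lit([1, 2, 3])                         # array(lit(1), lit(2), lit(3))
#   lit(np.array([1, 2], dtype=np.int8))   # int8 promoted to int16, then array
#   lit([lit(1)])                          # raises PySparkValueError (COLUMN_IN_LIST)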
def bitwiseNOT(col: "ColumnOrName") -> Column:
warnings.warn("Deprecated in 3.4, use bitwise_not instead.", FutureWarning)
return bitwise_not(col)
bitwiseNOT.__doc__ = pysparkfuncs.bitwiseNOT.__doc__
def bitwise_not(col: "ColumnOrName") -> Column:
return _invoke_function_over_columns("~", col)
bitwise_not.__doc__ = pysparkfuncs.bitwise_not.__doc__
def bit_count(col: "ColumnOrName") -> Column:
return _invoke_function_over_columns("bit_count", col)
bit_count.__doc__ = pysparkfuncs.bit_count.__doc__
def bit_get(col: "ColumnOrName", pos: "ColumnOrName") -> Column:
return _invoke_function_over_columns("bit_get", col, pos)
bit_get.__doc__ = pysparkfuncs.bit_get.__doc__
def getbit(col: "ColumnOrName", pos: "ColumnOrName") -> Column:
return _invoke_function_over_columns("getbit", col, pos)
getbit.__doc__ = pysparkfuncs.getbit.__doc__
def broadcast(df: "DataFrame") -> "DataFrame":
from pyspark.sql.connect.dataframe import DataFrame
if not isinstance(df, DataFrame):
raise PySparkTypeError(
error_class="NOT_DATAFRAME",
message_parameters={"arg_name": "df", "arg_type": type(df).__name__},
)
return df.hint("broadcast")
broadcast.__doc__ = pysparkfuncs.broadcast.__doc__
def coalesce(*cols: "ColumnOrName") -> Column:
return _invoke_function_over_columns("coalesce", *cols)
coalesce.__doc__ = pysparkfuncs.coalesce.__doc__
def expr(str: str) -> Column:
return Column(SQLExpression(str))
expr.__doc__ = pysparkfuncs.expr.__doc__
def greatest(*cols: "ColumnOrName") -> Column:
if len(cols) < 2:
raise PySparkValueError(
error_class="WRONG_NUM_COLUMNS",
message_parameters={"func_name": "greatest", "num_cols": "2"},
)
return _invoke_function_over_columns("greatest", *cols)
greatest.__doc__ = pysparkfuncs.greatest.__doc__
def input_file_name() -> Column:
return _invoke_function("input_file_name")
input_file_name.__doc__ = pysparkfuncs.input_file_name.__doc__
def least(*cols: "ColumnOrName") -> Column:
if len(cols) < 2:
raise PySparkValueError(
error_class="WRONG_NUM_COLUMNS",
message_parameters={"func_name": "least", "num_cols": "2"},
)
return _invoke_function_over_columns("least", *cols)
least.__doc__ = pysparkfuncs.least.__doc__
def isnan(col: "ColumnOrName") -> Column:
return _invoke_function_over_columns("isnan", col)
isnan.__doc__ = pysparkfuncs.isnan.__doc__
def isnull(col: "ColumnOrName") -> Column:
return _invoke_function_over_columns("isnull", col)
isnull.__doc__ = pysparkfuncs.isnull.__doc__
def monotonically_increasing_id() -> Column:
return _invoke_function("monotonically_increasing_id")
monotonically_increasing_id.__doc__ = pysparkfuncs.monotonically_increasing_id.__doc__
def nanvl(col1: "ColumnOrName", col2: "ColumnOrName") -> Column:
return _invoke_function_over_columns("nanvl", col1, col2)
nanvl.__doc__ = pysparkfuncs.nanvl.__doc__
def rand(seed: Optional[int] = None) -> Column:
if seed is not None:
return _invoke_function("rand", lit(seed))
else:
return _invoke_function("rand")
rand.__doc__ = pysparkfuncs.rand.__doc__
def randn(seed: Optional[int] = None) -> Column:
if seed is not None:
return _invoke_function("randn", lit(seed))
else:
return _invoke_function("randn")
randn.__doc__ = pysparkfuncs.randn.__doc__
def spark_partition_id() -> Column:
return _invoke_function("spark_partition_id")
spark_partition_id.__doc__ = pysparkfuncs.spark_partition_id.__doc__
def when(condition: Column, value: Any) -> Column:
# Explicitly not using ColumnOrName type here to make reading condition less opaque
if not isinstance(condition, Column):
raise PySparkTypeError(
error_class="NOT_COLUMN",
message_parameters={"arg_name": "condition", "arg_type": type(condition).__name__},
)
value_col = value if isinstance(value, Column) else lit(value)
return Column(CaseWhen(branches=[(condition._expr, value_col._expr)], else_value=None))
when.__doc__ = pysparkfuncs.when.__doc__
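# Usage sketch (assumes a session `spark`): chained conditions accumulate into
# a single CaseWhen expression via Column.when / Column.otherwise:
#
#   spark.range(5).select(
#       when(col("id") < 2, "low").when(col("id") < 4, "mid").otherwise("high")
#   )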
# Sort Functions
def asc(col: "ColumnOrName") -> Column:
return _to_col(col).asc()
asc.__doc__ = pysparkfuncs.asc.__doc__
def asc_nulls_first(col: "ColumnOrName") -> Column:
return _to_col(col).asc_nulls_first()
asc_nulls_first.__doc__ = pysparkfuncs.asc_nulls_first.__doc__
def asc_nulls_last(col: "ColumnOrName") -> Column:
return _to_col(col).asc_nulls_last()
asc_nulls_last.__doc__ = pysparkfuncs.asc_nulls_last.__doc__
def desc(col: "ColumnOrName") -> Column:
return _to_col(col).desc()
desc.__doc__ = pysparkfuncs.desc.__doc__
def desc_nulls_first(col: "ColumnOrName") -> Column:
return _to_col(col).desc_nulls_first()
desc_nulls_first.__doc__ = pysparkfuncs.desc_nulls_first.__doc__
def desc_nulls_last(col: "ColumnOrName") -> Column:
return _to_col(col).desc_nulls_last()
desc_nulls_last.__doc__ = pysparkfuncs.desc_nulls_last.__doc__
# Math Functions
def abs(col: "ColumnOrName") -> Column:
return _invoke_function_over_columns("abs", col)
abs.__doc__ = pysparkfuncs.abs.__doc__
def acos(col: "ColumnOrName") -> Column:
return _invoke_function_over_columns("acos", col)
acos.__doc__ = pysparkfuncs.acos.__doc__
def acosh(col: "ColumnOrName") -> Column:
return _invoke_function_over_columns("acosh", col)
acosh.__doc__ = pysparkfuncs.acosh.__doc__
def asin(col: "ColumnOrName") -> Column:
return _invoke_function_over_columns("asin", col)
asin.__doc__ = pysparkfuncs.asin.__doc__
def asinh(col: "ColumnOrName") -> Column:
return _invoke_function_over_columns("asinh", col)
asinh.__doc__ = pysparkfuncs.asinh.__doc__
def atan(col: "ColumnOrName") -> Column:
return _invoke_function_over_columns("atan", col)
atan.__doc__ = pysparkfuncs.atan.__doc__
def atan2(col1: Union["ColumnOrName", float], col2: Union["ColumnOrName", float]) -> Column:
return _invoke_binary_math_function("atan2", col1, col2)
atan2.__doc__ = pysparkfuncs.atan2.__doc__
def atanh(col: "ColumnOrName") -> Column:
return _invoke_function_over_columns("atanh", col)
atanh.__doc__ = pysparkfuncs.atanh.__doc__
def bin(col: "ColumnOrName") -> Column:
return _invoke_function_over_columns("bin", col)
bin.__doc__ = pysparkfuncs.bin.__doc__
def bround(col: "ColumnOrName", scale: int = 0) -> Column:
return _invoke_function("bround", _to_col(col), lit(scale))
bround.__doc__ = pysparkfuncs.bround.__doc__
def cbrt(col: "ColumnOrName") -> Column:
return _invoke_function_over_columns("cbrt", col)
cbrt.__doc__ = pysparkfuncs.cbrt.__doc__
def ceil(col: "ColumnOrName") -> Column:
return _invoke_function_over_columns("ceil", col)
ceil.__doc__ = pysparkfuncs.ceil.__doc__
ceiling = ceil
def conv(col: "ColumnOrName", fromBase: int, toBase: int) -> Column:
return _invoke_function("conv", _to_col(col), lit(fromBase), lit(toBase))
conv.__doc__ = pysparkfuncs.conv.__doc__
def cos(col: "ColumnOrName") -> Column:
return _invoke_function_over_columns("cos", col)
cos.__doc__ = pysparkfuncs.cos.__doc__
def cosh(col: "ColumnOrName") -> Column:
return _invoke_function_over_columns("cosh", col)
cosh.__doc__ = pysparkfuncs.cosh.__doc__
def cot(col: "ColumnOrName") -> Column:
return _invoke_function_over_columns("cot", col)
cot.__doc__ = pysparkfuncs.cot.__doc__
def csc(col: "ColumnOrName") -> Column:
return _invoke_function_over_columns("csc", col)
csc.__doc__ = pysparkfuncs.csc.__doc__
def degrees(col: "ColumnOrName") -> Column:
return _invoke_function_over_columns("degrees", col)
degrees.__doc__ = pysparkfuncs.degrees.__doc__
def e() -> Column:
return _invoke_function("e")
e.__doc__ = pysparkfuncs.e.__doc__
def exp(col: "ColumnOrName") -> Column:
return _invoke_function_over_columns("exp", col)
exp.__doc__ = pysparkfuncs.exp.__doc__
def expm1(col: "ColumnOrName") -> Column:
return _invoke_function_over_columns("expm1", col)
expm1.__doc__ = pysparkfuncs.expm1.__doc__
def factorial(col: "ColumnOrName") -> Column:
return _invoke_function_over_columns("factorial", col)
factorial.__doc__ = pysparkfuncs.factorial.__doc__
def floor(col: "ColumnOrName") -> Column:
return _invoke_function_over_columns("floor", col)
floor.__doc__ = pysparkfuncs.floor.__doc__
def hex(col: "ColumnOrName") -> Column:
return _invoke_function_over_columns("hex", col)
hex.__doc__ = pysparkfuncs.hex.__doc__
def hypot(col1: Union["ColumnOrName", float], col2: Union["ColumnOrName", float]) -> Column:
return _invoke_binary_math_function("hypot", col1, col2)
hypot.__doc__ = pysparkfuncs.hypot.__doc__
def log(arg1: Union["ColumnOrName", float], arg2: Optional["ColumnOrName"] = None) -> Column:
if arg2 is None:
# in this case, arg1 should be "ColumnOrName"
return _invoke_function("ln", _to_col(cast("ColumnOrName", arg1)))
else:
# in this case, arg1 should be a float
return _invoke_function("log", lit(cast(float, arg1)), _to_col(arg2))
log.__doc__ = pysparkfuncs.log.__doc__
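# Disambiguation sketch: with one argument the natural logarithm is used; with
# two, the first argument is the base:
#
#   log("value")       # ln(value)
#   log(2.0, "value")  # log(2.0, value), i.e. log base 2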
def log10(col: "ColumnOrName") -> Column:
return _invoke_function_over_columns("log10", col)
log10.__doc__ = pysparkfuncs.log10.__doc__
def log1p(col: "ColumnOrName") -> Column:
return _invoke_function_over_columns("log1p", col)
log1p.__doc__ = pysparkfuncs.log1p.__doc__
def ln(col: "ColumnOrName") -> Column:
return _invoke_function_over_columns("ln", col)
ln.__doc__ = pysparkfuncs.ln.__doc__
def log2(col: "ColumnOrName") -> Column:
return _invoke_function_over_columns("log2", col)
log2.__doc__ = pysparkfuncs.log2.__doc__
def negative(col: "ColumnOrName") -> Column:
return _invoke_function_over_columns("negative", col)
negative.__doc__ = pysparkfuncs.negative.__doc__
negate = negative
def pi() -> Column:
return _invoke_function("pi")
pi.__doc__ = pysparkfuncs.pi.__doc__
def positive(col: "ColumnOrName") -> Column:
return _invoke_function_over_columns("positive", col)
positive.__doc__ = pysparkfuncs.positive.__doc__
def pmod(dividend: Union["ColumnOrName", float], divisor: Union["ColumnOrName", float]) -> Column:
return _invoke_binary_math_function("pmod", dividend, divisor)
pmod.__doc__ = pysparkfuncs.pmod.__doc__
def width_bucket(
v: "ColumnOrName",
min: "ColumnOrName",
max: "ColumnOrName",
numBucket: Union["ColumnOrName", int],
) -> Column:
numBucket = lit(numBucket) if isinstance(numBucket, int) else numBucket
return _invoke_function_over_columns("width_bucket", v, min, max, numBucket)
width_bucket.__doc__ = pysparkfuncs.width_bucket.__doc__
def pow(col1: Union["ColumnOrName", float], col2: Union["ColumnOrName", float]) -> Column:
return _invoke_binary_math_function("power", col1, col2)
pow.__doc__ = pysparkfuncs.pow.__doc__
def radians(col: "ColumnOrName") -> Column:
return _invoke_function_over_columns("radians", col)
radians.__doc__ = pysparkfuncs.radians.__doc__
def rint(col: "ColumnOrName") -> Column:
return _invoke_function_over_columns("rint", col)
rint.__doc__ = pysparkfuncs.rint.__doc__
def round(col: "ColumnOrName", scale: int = 0) -> Column:
return _invoke_function("round", _to_col(col), lit(scale))
round.__doc__ = pysparkfuncs.round.__doc__
def sec(col: "ColumnOrName") -> Column:
return _invoke_function_over_columns("sec", col)
sec.__doc__ = pysparkfuncs.sec.__doc__
def shiftLeft(col: "ColumnOrName", numBits: int) -> Column:
warnings.warn("Deprecated in 3.4, use shiftleft instead.", FutureWarning)
return shiftleft(col, numBits)
shiftLeft.__doc__ = pysparkfuncs.shiftLeft.__doc__
def shiftleft(col: "ColumnOrName", numBits: int) -> Column:
return _invoke_function("shiftleft", _to_col(col), lit(numBits))
shiftleft.__doc__ = pysparkfuncs.shiftleft.__doc__
def shiftRight(col: "ColumnOrName", numBits: int) -> Column:
warnings.warn("Deprecated in 3.4, use shiftright instead.", FutureWarning)
return shiftright(col, numBits)
shiftRight.__doc__ = pysparkfuncs.shiftRight.__doc__
def shiftright(col: "ColumnOrName", numBits: int) -> Column:
return _invoke_function("shiftright", _to_col(col), lit(numBits))
shiftright.__doc__ = pysparkfuncs.shiftright.__doc__
def shiftRightUnsigned(col: "ColumnOrName", numBits: int) -> Column:
warnings.warn("Deprecated in 3.4, use shiftrightunsigned instead.", FutureWarning)
return shiftrightunsigned(col, numBits)
shiftRightUnsigned.__doc__ = pysparkfuncs.shiftRightUnsigned.__doc__
def shiftrightunsigned(col: "ColumnOrName", numBits: int) -> Column:
return _invoke_function("shiftrightunsigned", _to_col(col), lit(numBits))
shiftrightunsigned.__doc__ = pysparkfuncs.shiftrightunsigned.__doc__
def signum(col: "ColumnOrName") -> Column:
return _invoke_function_over_columns("signum", col)
signum.__doc__ = pysparkfuncs.signum.__doc__
sign = signum
def sin(col: "ColumnOrName") -> Column:
return _invoke_function_over_columns("sin", col)
sin.__doc__ = pysparkfuncs.sin.__doc__
def sinh(col: "ColumnOrName") -> Column:
return _invoke_function_over_columns("sinh", col)
sinh.__doc__ = pysparkfuncs.sinh.__doc__
def sqrt(col: "ColumnOrName") -> Column:
return _invoke_function_over_columns("sqrt", col)
sqrt.__doc__ = pysparkfuncs.sqrt.__doc__
def try_add(left: "ColumnOrName", right: "ColumnOrName") -> Column:
return _invoke_function_over_columns("try_add", left, right)
try_add.__doc__ = pysparkfuncs.try_add.__doc__
def try_avg(col: "ColumnOrName") -> Column:
return _invoke_function_over_columns("try_avg", col)
try_avg.__doc__ = pysparkfuncs.try_avg.__doc__
def try_divide(left: "ColumnOrName", right: "ColumnOrName") -> Column:
return _invoke_function_over_columns("try_divide", left, right)
try_divide.__doc__ = pysparkfuncs.try_divide.__doc__
def try_multiply(left: "ColumnOrName", right: "ColumnOrName") -> Column:
return _invoke_function_over_columns("try_multiply", left, right)
try_multiply.__doc__ = pysparkfuncs.try_multiply.__doc__
def try_subtract(left: "ColumnOrName", right: "ColumnOrName") -> Column:
return _invoke_function_over_columns("try_subtract", left, right)
try_subtract.__doc__ = pysparkfuncs.try_subtract.__doc__
def try_sum(col: "ColumnOrName") -> Column:
return _invoke_function_over_columns("try_sum", col)
try_sum.__doc__ = pysparkfuncs.try_sum.__doc__
def tan(col: "ColumnOrName") -> Column:
return _invoke_function_over_columns("tan", col)
tan.__doc__ = pysparkfuncs.tan.__doc__
def tanh(col: "ColumnOrName") -> Column:
return _invoke_function_over_columns("tanh", col)
tanh.__doc__ = pysparkfuncs.tanh.__doc__
def toDegrees(col: "ColumnOrName") -> Column:
warnings.warn("Deprecated in 3.4, use degrees instead.", FutureWarning)
return degrees(col)
toDegrees.__doc__ = pysparkfuncs.toDegrees.__doc__
def toRadians(col: "ColumnOrName") -> Column:
warnings.warn("Deprecated in 3.4, use radians instead.", FutureWarning)
return radians(col)
toRadians.__doc__ = pysparkfuncs.toRadians.__doc__
def unhex(col: "ColumnOrName") -> Column:
return _invoke_function_over_columns("unhex", col)
unhex.__doc__ = pysparkfuncs.unhex.__doc__
def approxCountDistinct(col: "ColumnOrName", rsd: Optional[float] = None) -> Column:
warnings.warn("Deprecated in 3.4, use approx_count_distinct instead.", FutureWarning)
return approx_count_distinct(col, rsd)
approxCountDistinct.__doc__ = pysparkfuncs.approxCountDistinct.__doc__
def approx_count_distinct(col: "ColumnOrName", rsd: Optional[float] = None) -> Column:
if rsd is None:
return _invoke_function("approx_count_distinct", _to_col(col))
else:
return _invoke_function("approx_count_distinct", _to_col(col), lit(rsd))
approx_count_distinct.__doc__ = pysparkfuncs.approx_count_distinct.__doc__
def avg(col: "ColumnOrName") -> Column:
return _invoke_function_over_columns("avg", col)
avg.__doc__ = pysparkfuncs.avg.__doc__
def collect_list(col: "ColumnOrName") -> Column:
return _invoke_function_over_columns("collect_list", col)
collect_list.__doc__ = pysparkfuncs.collect_list.__doc__
def array_agg(col: "ColumnOrName") -> Column:
return _invoke_function_over_columns("array_agg", col)
array_agg.__doc__ = pysparkfuncs.array_agg.__doc__
def collect_set(col: "ColumnOrName") -> Column:
return _invoke_function_over_columns("collect_set", col)
collect_set.__doc__ = pysparkfuncs.collect_set.__doc__
def corr(col1: "ColumnOrName", col2: "ColumnOrName") -> Column:
return _invoke_function_over_columns("corr", col1, col2)
corr.__doc__ = pysparkfuncs.corr.__doc__
def count(col: "ColumnOrName") -> Column:
return _invoke_function_over_columns("count", col)
count.__doc__ = pysparkfuncs.count.__doc__
def countDistinct(col: "ColumnOrName", *cols: "ColumnOrName") -> Column:
return count_distinct(col, *cols)
countDistinct.__doc__ = pysparkfuncs.countDistinct.__doc__
def count_distinct(col: "ColumnOrName", *cols: "ColumnOrName") -> Column:
_exprs = [_to_col(c)._expr for c in [col] + list(cols)]
return Column(UnresolvedFunction("count", _exprs, is_distinct=True))
count_distinct.__doc__ = pysparkfuncs.count_distinct.__doc__
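# Note: this cannot go through _invoke_function because the DISTINCT flag lives
# on UnresolvedFunction itself; e.g. count_distinct("a", "b") produces the
# equivalent of `count(DISTINCT a, b)` on the server.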
def covar_pop(col1: "ColumnOrName", col2: "ColumnOrName") -> Column:
return _invoke_function_over_columns("covar_pop", col1, col2)
covar_pop.__doc__ = pysparkfuncs.covar_pop.__doc__
def covar_samp(col1: "ColumnOrName", col2: "ColumnOrName") -> Column:
return _invoke_function_over_columns("covar_samp", col1, col2)
covar_samp.__doc__ = pysparkfuncs.covar_samp.__doc__
def first(col: "ColumnOrName", ignorenulls: bool = False) -> Column:
return _invoke_function("first", _to_col(col), lit(ignorenulls))
first.__doc__ = pysparkfuncs.first.__doc__
def grouping(col: "ColumnOrName") -> Column:
return _invoke_function_over_columns("grouping", col)
grouping.__doc__ = pysparkfuncs.grouping.__doc__
def grouping_id(*cols: "ColumnOrName") -> Column:
return _invoke_function_over_columns("grouping_id", *cols)
grouping_id.__doc__ = pysparkfuncs.grouping_id.__doc__
def count_min_sketch(
col: "ColumnOrName",
eps: "ColumnOrName",
confidence: "ColumnOrName",
seed: "ColumnOrName",
) -> Column:
return _invoke_function_over_columns("count_min_sketch", col, eps, confidence, seed)
count_min_sketch.__doc__ = pysparkfuncs.count_min_sketch.__doc__
def kurtosis(col: "ColumnOrName") -> Column:
return _invoke_function_over_columns("kurtosis", col)
kurtosis.__doc__ = pysparkfuncs.kurtosis.__doc__
def last(col: "ColumnOrName", ignorenulls: bool = False) -> Column:
return _invoke_function("last", _to_col(col), lit(ignorenulls))
last.__doc__ = pysparkfuncs.last.__doc__
def max(col: "ColumnOrName") -> Column:
return _invoke_function_over_columns("max", col)
max.__doc__ = pysparkfuncs.max.__doc__
def max_by(col: "ColumnOrName", ord: "ColumnOrName") -> Column:
return _invoke_function_over_columns("max_by", col, ord)
max_by.__doc__ = pysparkfuncs.max_by.__doc__
def mean(col: "ColumnOrName") -> Column:
return avg(col)
mean.__doc__ = pysparkfuncs.mean.__doc__
def median(col: "ColumnOrName") -> Column:
return _invoke_function_over_columns("median", col)
median.__doc__ = pysparkfuncs.median.__doc__
def min(col: "ColumnOrName") -> Column:
return _invoke_function_over_columns("min", col)
min.__doc__ = pysparkfuncs.min.__doc__
def min_by(col: "ColumnOrName", ord: "ColumnOrName") -> Column:
return _invoke_function_over_columns("min_by", col, ord)
min_by.__doc__ = pysparkfuncs.min_by.__doc__
def mode(col: "ColumnOrName") -> Column:
return _invoke_function_over_columns("mode", col)
mode.__doc__ = pysparkfuncs.mode.__doc__
def percentile(
col: "ColumnOrName",
    percentage: Union[Column, float, List[float], Tuple[float, ...]],
frequency: Union[Column, int] = 1,
) -> Column:
if isinstance(percentage, Column):
_percentage = percentage
elif isinstance(percentage, (list, tuple)):
# Convert tuple to list
_percentage = lit(list(percentage))
else:
# Probably scalar
_percentage = lit(percentage)
if isinstance(frequency, int):
_frequency = lit(frequency)
elif isinstance(frequency, Column):
_frequency = frequency
else:
raise PySparkTypeError(
error_class="NOT_COLUMN_OR_INT",
message_parameters={
"arg_name": "frequency",
"arg_type": type(frequency).__name__,
},
)
return _invoke_function("percentile", _to_col(col), _percentage, _frequency)
percentile.__doc__ = pysparkfuncs.percentile.__doc__
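# Usage sketch (assumes a session `spark`):
#
#   df = spark.range(10)
#   df.agg(percentile("id", 0.5))           # scalar percentage
#   df.agg(percentile("id", [0.25, 0.75]))  # list/tuple becomes an array literal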
def percentile_approx(
col: "ColumnOrName",
    percentage: Union[Column, float, List[float], Tuple[float, ...]],
accuracy: Union[Column, float] = 10000,
) -> Column:
if isinstance(percentage, Column):
percentage_col = percentage
elif isinstance(percentage, (list, tuple)):
# Convert tuple to list
percentage_col = lit(list(percentage))
else:
# Probably scalar
percentage_col = lit(percentage)
return _invoke_function("percentile_approx", _to_col(col), percentage_col, lit(accuracy))
percentile_approx.__doc__ = pysparkfuncs.percentile_approx.__doc__
def approx_percentile(
col: "ColumnOrName",
    percentage: Union[Column, float, List[float], Tuple[float, ...]],
accuracy: Union[Column, float] = 10000,
) -> Column:
if isinstance(percentage, Column):
percentage_col = percentage
elif isinstance(percentage, (list, tuple)):
# Convert tuple to list
percentage_col = lit(list(percentage))
else:
# Probably scalar
percentage_col = lit(percentage)
return _invoke_function("approx_percentile", _to_col(col), percentage_col, lit(accuracy))
approx_percentile.__doc__ = pysparkfuncs.approx_percentile.__doc__
def product(col: "ColumnOrName") -> Column:
return _invoke_function_over_columns("product", col)
product.__doc__ = pysparkfuncs.product.__doc__
def skewness(col: "ColumnOrName") -> Column:
return _invoke_function_over_columns("skewness", col)
skewness.__doc__ = pysparkfuncs.skewness.__doc__
def stddev(col: "ColumnOrName") -> Column:
return stddev_samp(col)
stddev.__doc__ = pysparkfuncs.stddev.__doc__
std = stddev
def stddev_samp(col: "ColumnOrName") -> Column:
return _invoke_function_over_columns("stddev_samp", col)
stddev_samp.__doc__ = pysparkfuncs.stddev_samp.__doc__
def stddev_pop(col: "ColumnOrName") -> Column:
return _invoke_function_over_columns("stddev_pop", col)
stddev_pop.__doc__ = pysparkfuncs.stddev_pop.__doc__
def sum(col: "ColumnOrName") -> Column:
return _invoke_function_over_columns("sum", col)
sum.__doc__ = pysparkfuncs.sum.__doc__
def sumDistinct(col: "ColumnOrName") -> Column:
warnings.warn("Deprecated in 3.4, use sum_distinct instead.", FutureWarning)
return sum_distinct(col)
sumDistinct.__doc__ = pysparkfuncs.sumDistinct.__doc__
def sum_distinct(col: "ColumnOrName") -> Column:
return Column(UnresolvedFunction("sum", [_to_col(col)._expr], is_distinct=True))
sum_distinct.__doc__ = pysparkfuncs.sum_distinct.__doc__
def var_pop(col: "ColumnOrName") -> Column:
return _invoke_function_over_columns("var_pop", col)
var_pop.__doc__ = pysparkfuncs.var_pop.__doc__
def regr_avgx(y: "ColumnOrName", x: "ColumnOrName") -> Column:
return _invoke_function_over_columns("regr_avgx", y, x)
regr_avgx.__doc__ = pysparkfuncs.regr_avgx.__doc__
def regr_avgy(y: "ColumnOrName", x: "ColumnOrName") -> Column:
return _invoke_function_over_columns("regr_avgy", y, x)
regr_avgy.__doc__ = pysparkfuncs.regr_avgy.__doc__
def regr_count(y: "ColumnOrName", x: "ColumnOrName") -> Column:
return _invoke_function_over_columns("regr_count", y, x)
regr_count.__doc__ = pysparkfuncs.regr_count.__doc__
def regr_intercept(y: "ColumnOrName", x: "ColumnOrName") -> Column:
return _invoke_function_over_columns("regr_intercept", y, x)
regr_intercept.__doc__ = pysparkfuncs.regr_intercept.__doc__
def regr_r2(y: "ColumnOrName", x: "ColumnOrName") -> Column:
return _invoke_function_over_columns("regr_r2", y, x)
regr_r2.__doc__ = pysparkfuncs.regr_r2.__doc__
def regr_slope(y: "ColumnOrName", x: "ColumnOrName") -> Column:
return _invoke_function_over_columns("regr_slope", y, x)
regr_slope.__doc__ = pysparkfuncs.regr_slope.__doc__
def regr_sxx(y: "ColumnOrName", x: "ColumnOrName") -> Column:
return _invoke_function_over_columns("regr_sxx", y, x)
regr_sxx.__doc__ = pysparkfuncs.regr_sxx.__doc__
def regr_sxy(y: "ColumnOrName", x: "ColumnOrName") -> Column:
return _invoke_function_over_columns("regr_sxy", y, x)
regr_sxy.__doc__ = pysparkfuncs.regr_sxy.__doc__
def regr_syy(y: "ColumnOrName", x: "ColumnOrName") -> Column:
return _invoke_function_over_columns("regr_syy", y, x)
regr_syy.__doc__ = pysparkfuncs.regr_syy.__doc__
def var_samp(col: "ColumnOrName") -> Column:
return _invoke_function_over_columns("var_samp", col)
var_samp.__doc__ = pysparkfuncs.var_samp.__doc__
def variance(col: "ColumnOrName") -> Column:
return var_samp(col)
variance.__doc__ = pysparkfuncs.variance.__doc__
def every(col: "ColumnOrName") -> Column:
return _invoke_function_over_columns("bool_and", col)
every.__doc__ = pysparkfuncs.every.__doc__
def bool_and(col: "ColumnOrName") -> Column:
return _invoke_function_over_columns("bool_and", col)
bool_and.__doc__ = pysparkfuncs.bool_and.__doc__
def some(col: "ColumnOrName") -> Column:
return _invoke_function_over_columns("bool_or", col)
some.__doc__ = pysparkfuncs.some.__doc__
def bool_or(col: "ColumnOrName") -> Column:
return _invoke_function_over_columns("bool_or", col)
bool_or.__doc__ = pysparkfuncs.bool_or.__doc__
def bit_and(col: "ColumnOrName") -> Column:
return _invoke_function_over_columns("bit_and", col)
bit_and.__doc__ = pysparkfuncs.bit_and.__doc__
def bit_or(col: "ColumnOrName") -> Column:
return _invoke_function_over_columns("bit_or", col)
bit_or.__doc__ = pysparkfuncs.bit_or.__doc__
def bit_xor(col: "ColumnOrName") -> Column:
return _invoke_function_over_columns("bit_xor", col)
bit_xor.__doc__ = pysparkfuncs.bit_xor.__doc__
# Window Functions
def cume_dist() -> Column:
return _invoke_function("cume_dist")
cume_dist.__doc__ = pysparkfuncs.cume_dist.__doc__
def dense_rank() -> Column:
return _invoke_function("dense_rank")
dense_rank.__doc__ = pysparkfuncs.dense_rank.__doc__
def lag(col: "ColumnOrName", offset: int = 1, default: Optional[Any] = None) -> Column:
if default is None:
return _invoke_function("lag", _to_col(col), lit(offset))
else:
return _invoke_function("lag", _to_col(col), lit(offset), lit(default))
lag.__doc__ = pysparkfuncs.lag.__doc__
def lead(col: "ColumnOrName", offset: int = 1, default: Optional[Any] = None) -> Column:
if default is None:
return _invoke_function("lead", _to_col(col), lit(offset))
else:
return _invoke_function("lead", _to_col(col), lit(offset), lit(default))
lead.__doc__ = pysparkfuncs.lead.__doc__
def nth_value(col: "ColumnOrName", offset: int, ignoreNulls: Optional[bool] = None) -> Column:
if ignoreNulls is None:
return _invoke_function("nth_value", _to_col(col), lit(offset))
else:
return _invoke_function("nth_value", _to_col(col), lit(offset), lit(ignoreNulls))
nth_value.__doc__ = pysparkfuncs.nth_value.__doc__
def any_value(col: "ColumnOrName", ignoreNulls: Optional[Union[bool, Column]] = None) -> Column:
if ignoreNulls is None:
return _invoke_function_over_columns("any_value", col)
else:
ignoreNulls = lit(ignoreNulls) if isinstance(ignoreNulls, bool) else ignoreNulls
return _invoke_function_over_columns("any_value", col, ignoreNulls)
any_value.__doc__ = pysparkfuncs.any_value.__doc__
def first_value(col: "ColumnOrName", ignoreNulls: Optional[Union[bool, Column]] = None) -> Column:
if ignoreNulls is None:
return _invoke_function_over_columns("first_value", col)
else:
ignoreNulls = lit(ignoreNulls) if isinstance(ignoreNulls, bool) else ignoreNulls
return _invoke_function_over_columns("first_value", col, ignoreNulls)
first_value.__doc__ = pysparkfuncs.first_value.__doc__
def last_value(col: "ColumnOrName", ignoreNulls: Optional[Union[bool, Column]] = None) -> Column:
if ignoreNulls is None:
return _invoke_function_over_columns("last_value", col)
else:
ignoreNulls = lit(ignoreNulls) if isinstance(ignoreNulls, bool) else ignoreNulls
return _invoke_function_over_columns("last_value", col, ignoreNulls)
last_value.__doc__ = pysparkfuncs.last_value.__doc__
def count_if(col: "ColumnOrName") -> Column:
return _invoke_function_over_columns("count_if", col)
count_if.__doc__ = pysparkfuncs.count_if.__doc__
def histogram_numeric(col: "ColumnOrName", nBins: "ColumnOrName") -> Column:
return _invoke_function_over_columns("histogram_numeric", col, nBins)
histogram_numeric.__doc__ = pysparkfuncs.histogram_numeric.__doc__
def ntile(n: int) -> Column:
return _invoke_function("ntile", lit(n))
ntile.__doc__ = pysparkfuncs.ntile.__doc__
def percent_rank() -> Column:
return _invoke_function("percent_rank")
percent_rank.__doc__ = pysparkfuncs.percent_rank.__doc__
def rank() -> Column:
return _invoke_function("rank")
rank.__doc__ = pysparkfuncs.rank.__doc__
def row_number() -> Column:
return _invoke_function("row_number")
row_number.__doc__ = pysparkfuncs.row_number.__doc__
def aggregate(
col: "ColumnOrName",
initialValue: "ColumnOrName",
merge: Callable[[Column, Column], Column],
finish: Optional[Callable[[Column], Column]] = None,
) -> Column:
if finish is not None:
return _invoke_higher_order_function("aggregate", [col, initialValue], [merge, finish])
else:
return _invoke_higher_order_function("aggregate", [col, initialValue], [merge])
aggregate.__doc__ = pysparkfuncs.aggregate.__doc__
def reduce(
col: "ColumnOrName",
initialValue: "ColumnOrName",
merge: Callable[[Column, Column], Column],
finish: Optional[Callable[[Column], Column]] = None,
) -> Column:
if finish is not None:
return _invoke_higher_order_function("reduce", [col, initialValue], [merge, finish])
else:
return _invoke_higher_order_function("reduce", [col, initialValue], [merge])
reduce.__doc__ = pysparkfuncs.reduce.__doc__
def array(
    *cols: Union["ColumnOrName", List["ColumnOrName"], Tuple["ColumnOrName", ...]]
) -> Column:
if len(cols) == 1 and isinstance(cols[0], (list, set, tuple)):
cols = cols[0] # type: ignore[assignment]
return _invoke_function_over_columns("array", *cols) # type: ignore[arg-type]
array.__doc__ = pysparkfuncs.array.__doc__
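# Both call shapes are accepted (sketch):
#
#   array("a", "b", "c")    # varargs
#   array(["a", "b", "c"])  # a single list/set/tuple is unpacked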
def array_append(col: "ColumnOrName", value: Any) -> Column:
return _invoke_function("array_append", _to_col(col), lit(value))
array_append.__doc__ = pysparkfuncs.array_append.__doc__
def array_contains(col: "ColumnOrName", value: Any) -> Column:
return _invoke_function("array_contains", _to_col(col), lit(value))
array_contains.__doc__ = pysparkfuncs.array_contains.__doc__
def array_distinct(col: "ColumnOrName") -> Column:
return _invoke_function_over_columns("array_distinct", col)
array_distinct.__doc__ = pysparkfuncs.array_distinct.__doc__
def array_except(col1: "ColumnOrName", col2: "ColumnOrName") -> Column:
return _invoke_function_over_columns("array_except", col1, col2)
array_except.__doc__ = pysparkfuncs.array_except.__doc__
def array_insert(arr: "ColumnOrName", pos: Union["ColumnOrName", int], value: Any) -> Column:
_pos = lit(pos) if isinstance(pos, int) else _to_col(pos)
return _invoke_function("array_insert", _to_col(arr), _pos, lit(value))
array_insert.__doc__ = pysparkfuncs.array_insert.__doc__
def array_intersect(col1: "ColumnOrName", col2: "ColumnOrName") -> Column:
return _invoke_function_over_columns("array_intersect", col1, col2)
array_intersect.__doc__ = pysparkfuncs.array_intersect.__doc__
def array_compact(col: "ColumnOrName") -> Column:
return _invoke_function_over_columns("array_compact", col)
array_compact.__doc__ = pysparkfuncs.array_compact.__doc__
def array_join(
col: "ColumnOrName", delimiter: str, null_replacement: Optional[str] = None
) -> Column:
if null_replacement is None:
return _invoke_function("array_join", _to_col(col), lit(delimiter))
else:
return _invoke_function("array_join", _to_col(col), lit(delimiter), lit(null_replacement))
array_join.__doc__ = pysparkfuncs.array_join.__doc__
def array_max(col: "ColumnOrName") -> Column:
return _invoke_function_over_columns("array_max", col)
array_max.__doc__ = pysparkfuncs.array_max.__doc__
def array_min(col: "ColumnOrName") -> Column:
return _invoke_function_over_columns("array_min", col)
array_min.__doc__ = pysparkfuncs.array_min.__doc__
def array_size(col: "ColumnOrName") -> Column:
return _invoke_function_over_columns("array_size", col)
array_size.__doc__ = pysparkfuncs.array_size.__doc__
def cardinality(col: "ColumnOrName") -> Column:
return _invoke_function_over_columns("cardinality", col)
cardinality.__doc__ = pysparkfuncs.cardinality.__doc__
def array_position(col: "ColumnOrName", value: Any) -> Column:
return _invoke_function("array_position", _to_col(col), lit(value))
array_position.__doc__ = pysparkfuncs.array_position.__doc__
def array_prepend(col: "ColumnOrName", value: Any) -> Column:
return _invoke_function("array_prepend", _to_col(col), lit(value))
array_prepend.__doc__ = pysparkfuncs.array_prepend.__doc__
def array_remove(col: "ColumnOrName", element: Any) -> Column:
return _invoke_function("array_remove", _to_col(col), lit(element))
array_remove.__doc__ = pysparkfuncs.array_remove.__doc__
def array_repeat(col: "ColumnOrName", count: Union["ColumnOrName", int]) -> Column:
_count = lit(count) if isinstance(count, int) else _to_col(count)
return _invoke_function("array_repeat", _to_col(col), _count)
array_repeat.__doc__ = pysparkfuncs.array_repeat.__doc__
def array_sort(
col: "ColumnOrName", comparator: Optional[Callable[[Column, Column], Column]] = None
) -> Column:
if comparator is None:
return _invoke_function_over_columns("array_sort", col)
else:
return _invoke_higher_order_function("array_sort", [col], [comparator])
array_sort.__doc__ = pysparkfuncs.array_sort.__doc__
def array_union(col1: "ColumnOrName", col2: "ColumnOrName") -> Column:
return _invoke_function_over_columns("array_union", col1, col2)
array_union.__doc__ = pysparkfuncs.array_union.__doc__
def arrays_overlap(a1: "ColumnOrName", a2: "ColumnOrName") -> Column:
return _invoke_function_over_columns("arrays_overlap", a1, a2)
arrays_overlap.__doc__ = pysparkfuncs.arrays_overlap.__doc__
def arrays_zip(*cols: "ColumnOrName") -> Column:
return _invoke_function_over_columns("arrays_zip", *cols)
arrays_zip.__doc__ = pysparkfuncs.arrays_zip.__doc__
def concat(*cols: "ColumnOrName") -> Column:
return _invoke_function_over_columns("concat", *cols)
concat.__doc__ = pysparkfuncs.concat.__doc__
def create_map(
*cols: Union["ColumnOrName", List["ColumnOrName"], Tuple["ColumnOrName", ...]]
) -> Column:
if len(cols) == 1 and isinstance(cols[0], (list, set, tuple)):
cols = cols[0] # type: ignore[assignment]
return _invoke_function_over_columns("map", *cols) # type: ignore[arg-type]
create_map.__doc__ = pysparkfuncs.create_map.__doc__
def element_at(col: "ColumnOrName", extraction: Any) -> Column:
return _invoke_function("element_at", _to_col(col), lit(extraction))
element_at.__doc__ = pysparkfuncs.element_at.__doc__
def try_element_at(col: "ColumnOrName", extraction: "ColumnOrName") -> Column:
return _invoke_function_over_columns("try_element_at", col, extraction)
try_element_at.__doc__ = pysparkfuncs.try_element_at.__doc__
def exists(col: "ColumnOrName", f: Callable[[Column], Column]) -> Column:
return _invoke_higher_order_function("exists", [col], [f])
exists.__doc__ = pysparkfuncs.exists.__doc__
def explode(col: "ColumnOrName") -> Column:
return _invoke_function_over_columns("explode", col)
explode.__doc__ = pysparkfuncs.explode.__doc__
def explode_outer(col: "ColumnOrName") -> Column:
return _invoke_function_over_columns("explode_outer", col)
explode_outer.__doc__ = pysparkfuncs.explode_outer.__doc__
def filter(
col: "ColumnOrName",
f: Union[Callable[[Column], Column], Callable[[Column, Column], Column]],
) -> Column:
return _invoke_higher_order_function("filter", [col], [f])
filter.__doc__ = pysparkfuncs.filter.__doc__
def flatten(col: "ColumnOrName") -> Column:
return _invoke_function_over_columns("flatten", col)
flatten.__doc__ = pysparkfuncs.flatten.__doc__
def forall(col: "ColumnOrName", f: Callable[[Column], Column]) -> Column:
return _invoke_higher_order_function("forall", [col], [f])
forall.__doc__ = pysparkfuncs.forall.__doc__
# TODO: support options
def from_csv(
col: "ColumnOrName",
schema: Union[Column, str],
options: Optional[Dict[str, str]] = None,
) -> Column:
if isinstance(schema, Column):
_schema = schema
elif isinstance(schema, str):
_schema = lit(schema)
else:
raise PySparkTypeError(
error_class="NOT_COLUMN_OR_STR",
message_parameters={"arg_name": "schema", "arg_type": type(schema).__name__},
)
if options is None:
return _invoke_function("from_csv", _to_col(col), _schema)
else:
return _invoke_function("from_csv", _to_col(col), _schema, _options_to_col(options))
from_csv.__doc__ = pysparkfuncs.from_csv.__doc__
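# Usage sketch (assumes a session `spark`); options travel as the map column
# built by _options_to_col:
#
#   df = spark.createDataFrame([("1;2",)], ["c"])
#   df.select(from_csv("c", "a INT, b INT", {"sep": ";"}))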
def from_json(
col: "ColumnOrName",
schema: Union[ArrayType, StructType, Column, str],
options: Optional[Dict[str, str]] = None,
) -> Column:
if isinstance(schema, Column):
_schema = schema
elif isinstance(schema, DataType):
_schema = lit(schema.json())
elif isinstance(schema, str):
_schema = lit(schema)
else:
raise PySparkTypeError(
error_class="NOT_COLUMN_OR_DATATYPE_OR_STR",
message_parameters={"arg_name": "schema", "arg_type": type(schema).__name__},
)
if options is None:
return _invoke_function("from_json", _to_col(col), _schema)
else:
return _invoke_function("from_json", _to_col(col), _schema, _options_to_col(options))
from_json.__doc__ = pysparkfuncs.from_json.__doc__
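# Usage sketch: given a DataFrame `df` with a JSON string column `js`, a
# DataType schema is serialized to its JSON form before being sent, so these
# are equivalent:
#
#   from pyspark.sql.types import StructType, StructField, IntegerType
#   df.select(from_json("js", StructType([StructField("a", IntegerType())])))
#   df.select(from_json("js", "a INT"))  # DDL string passed as a literal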
def get(col: "ColumnOrName", index: Union["ColumnOrName", int]) -> Column:
index = lit(index) if isinstance(index, int) else index
return _invoke_function_over_columns("get", col, index)
get.__doc__ = pysparkfuncs.get.__doc__
def get_json_object(col: "ColumnOrName", path: str) -> Column:
return _invoke_function("get_json_object", _to_col(col), lit(path))
get_json_object.__doc__ = pysparkfuncs.get_json_object.__doc__
def json_array_length(col: "ColumnOrName") -> Column:
return _invoke_function_over_columns("json_array_length", col)
json_array_length.__doc__ = pysparkfuncs.json_array_length.__doc__
def json_object_keys(col: "ColumnOrName") -> Column:
return _invoke_function_over_columns("json_object_keys", col)
json_object_keys.__doc__ = pysparkfuncs.json_object_keys.__doc__
def inline(col: "ColumnOrName") -> Column:
return _invoke_function_over_columns("inline", col)
inline.__doc__ = pysparkfuncs.inline.__doc__
def inline_outer(col: "ColumnOrName") -> Column:
return _invoke_function_over_columns("inline_outer", col)
inline_outer.__doc__ = pysparkfuncs.inline_outer.__doc__
def json_tuple(col: "ColumnOrName", *fields: str) -> Column:
return _invoke_function("json_tuple", _to_col(col), *[lit(field) for field in fields])
json_tuple.__doc__ = pysparkfuncs.json_tuple.__doc__
def map_concat(
*cols: Union["ColumnOrName", List["ColumnOrName"], Tuple["ColumnOrName", ...]]
) -> Column:
if len(cols) == 1 and isinstance(cols[0], (list, set, tuple)):
cols = cols[0] # type: ignore[assignment]
return _invoke_function_over_columns("map_concat", *cols) # type: ignore[arg-type]
map_concat.__doc__ = pysparkfuncs.map_concat.__doc__
def map_contains_key(col: "ColumnOrName", value: Any) -> Column:
return array_contains(map_keys(col), lit(value))
map_contains_key.__doc__ = pysparkfuncs.map_contains_key.__doc__
def map_entries(col: "ColumnOrName") -> Column:
return _invoke_function_over_columns("map_entries", col)
map_entries.__doc__ = pysparkfuncs.map_entries.__doc__
def map_filter(col: "ColumnOrName", f: Callable[[Column, Column], Column]) -> Column:
return _invoke_higher_order_function("map_filter", [col], [f])
map_filter.__doc__ = pysparkfuncs.map_filter.__doc__
def map_from_arrays(col1: "ColumnOrName", col2: "ColumnOrName") -> Column:
return _invoke_function_over_columns("map_from_arrays", col1, col2)
map_from_arrays.__doc__ = pysparkfuncs.map_from_arrays.__doc__
def map_from_entries(col: "ColumnOrName") -> Column:
return _invoke_function_over_columns("map_from_entries", col)
map_from_entries.__doc__ = pysparkfuncs.map_from_entries.__doc__
def map_keys(col: "ColumnOrName") -> Column:
return _invoke_function_over_columns("map_keys", col)
map_keys.__doc__ = pysparkfuncs.map_keys.__doc__
def map_values(col: "ColumnOrName") -> Column:
return _invoke_function_over_columns("map_values", col)
map_values.__doc__ = pysparkfuncs.map_values.__doc__
def map_zip_with(
col1: "ColumnOrName",
col2: "ColumnOrName",
f: Callable[[Column, Column, Column], Column],
) -> Column:
return _invoke_higher_order_function("map_zip_with", [col1, col2], [f])
map_zip_with.__doc__ = pysparkfuncs.map_zip_with.__doc__
def str_to_map(
text: "ColumnOrName",
pairDelim: Optional["ColumnOrName"] = None,
keyValueDelim: Optional["ColumnOrName"] = None,
) -> Column:
_pairDelim = lit(",") if pairDelim is None else _to_col(pairDelim)
_keyValueDelim = lit(":") if keyValueDelim is None else _to_col(keyValueDelim)
return _invoke_function("str_to_map", _to_col(text), _pairDelim, _keyValueDelim)
str_to_map.__doc__ = pysparkfuncs.str_to_map.__doc__
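# Default delimiters are "," between pairs and ":" between key and value, so
# (sketch) str_to_map(lit("a:1,b:2")) yields the map {"a": "1", "b": "2"}.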
def posexplode(col: "ColumnOrName") -> Column:
return _invoke_function_over_columns("posexplode", col)
posexplode.__doc__ = pysparkfuncs.posexplode.__doc__
def posexplode_outer(col: "ColumnOrName") -> Column:
return _invoke_function_over_columns("posexplode_outer", col)
posexplode_outer.__doc__ = pysparkfuncs.posexplode_outer.__doc__
def reverse(col: "ColumnOrName") -> Column:
return _invoke_function_over_columns("reverse", col)
reverse.__doc__ = pysparkfuncs.reverse.__doc__
def sequence(
start: "ColumnOrName", stop: "ColumnOrName", step: Optional["ColumnOrName"] = None
) -> Column:
if step is None:
return _invoke_function_over_columns("sequence", start, stop)
else:
return _invoke_function_over_columns("sequence", start, stop, step)
sequence.__doc__ = pysparkfuncs.sequence.__doc__
def schema_of_csv(csv: "ColumnOrName", options: Optional[Dict[str, str]] = None) -> Column:
if isinstance(csv, Column):
_csv = csv
elif isinstance(csv, str):
_csv = lit(csv)
else:
raise PySparkTypeError(
error_class="NOT_COLUMN_OR_STR",
message_parameters={"arg_name": "csv", "arg_type": type(csv).__name__},
)
if options is None:
return _invoke_function("schema_of_csv", _csv)
else:
return _invoke_function("schema_of_csv", _csv, _options_to_col(options))
schema_of_csv.__doc__ = pysparkfuncs.schema_of_csv.__doc__
def schema_of_json(json: "ColumnOrName", options: Optional[Dict[str, str]] = None) -> Column:
if isinstance(json, Column):
_json = json
elif isinstance(json, str):
_json = lit(json)
else:
raise PySparkTypeError(
error_class="NOT_COLUMN_OR_STR",
message_parameters={"arg_name": "json", "arg_type": type(json).__name__},
)
if options is None:
return _invoke_function("schema_of_json", _json)
else:
return _invoke_function("schema_of_json", _json, _options_to_col(options))
schema_of_json.__doc__ = pysparkfuncs.schema_of_json.__doc__
def shuffle(col: "ColumnOrName") -> Column:
return _invoke_function_over_columns("shuffle", col)
shuffle.__doc__ = pysparkfuncs.shuffle.__doc__
def size(col: "ColumnOrName") -> Column:
return _invoke_function_over_columns("size", col)
size.__doc__ = pysparkfuncs.size.__doc__
def slice(
col: "ColumnOrName", start: Union["ColumnOrName", int], length: Union["ColumnOrName", int]
) -> Column:
if isinstance(start, (Column, str)):
_start = start
elif isinstance(start, int):
_start = lit(start)
else:
raise PySparkTypeError(
error_class="NOT_COLUMN_OR_INT_OR_STR",
message_parameters={"arg_name": "start", "arg_type": type(start).__name__},
)
if isinstance(length, (Column, str)):
_length = length
elif isinstance(length, int):
_length = lit(length)
else:
raise PySparkTypeError(
error_class="NOT_COLUMN_OR_INT_OR_STR",
message_parameters={"arg_name": "length", "arg_type": type(length).__name__},
)
return _invoke_function_over_columns("slice", col, _start, _length)
slice.__doc__ = pysparkfuncs.slice.__doc__
def sort_array(col: "ColumnOrName", asc: bool = True) -> Column:
return _invoke_function("sort_array", _to_col(col), lit(asc))
sort_array.__doc__ = pysparkfuncs.sort_array.__doc__
def struct(
*cols: Union["ColumnOrName", List["ColumnOrName"], Tuple["ColumnOrName", ...]]
) -> Column:
if len(cols) == 1 and isinstance(cols[0], (list, set, tuple)):
cols = cols[0] # type: ignore[assignment]
return _invoke_function_over_columns("struct", *cols) # type: ignore[arg-type]
struct.__doc__ = pysparkfuncs.struct.__doc__
def named_struct(*cols: "ColumnOrName") -> Column:
return _invoke_function_over_columns("named_struct", *cols)
named_struct.__doc__ = pysparkfuncs.named_struct.__doc__
def to_csv(col: "ColumnOrName", options: Optional[Dict[str, str]] = None) -> Column:
if options is None:
return _invoke_function("to_csv", _to_col(col))
else:
return _invoke_function("to_csv", _to_col(col), _options_to_col(options))
to_csv.__doc__ = pysparkfuncs.to_csv.__doc__
def to_json(col: "ColumnOrName", options: Optional[Dict[str, str]] = None) -> Column:
if options is None:
return _invoke_function("to_json", _to_col(col))
else:
return _invoke_function("to_json", _to_col(col), _options_to_col(options))
to_json.__doc__ = pysparkfuncs.to_json.__doc__
def transform(
col: "ColumnOrName",
f: Union[Callable[[Column], Column], Callable[[Column, Column], Column]],
) -> Column:
return _invoke_higher_order_function("transform", [col], [f])
transform.__doc__ = pysparkfuncs.transform.__doc__
def transform_keys(col: "ColumnOrName", f: Callable[[Column, Column], Column]) -> Column:
return _invoke_higher_order_function("transform_keys", [col], [f])
transform_keys.__doc__ = pysparkfuncs.transform_keys.__doc__
def transform_values(col: "ColumnOrName", f: Callable[[Column, Column], Column]) -> Column:
return _invoke_higher_order_function("transform_values", [col], [f])
transform_values.__doc__ = pysparkfuncs.transform_values.__doc__
def zip_with(
left: "ColumnOrName",
right: "ColumnOrName",
f: Callable[[Column, Column], Column],
) -> Column:
return _invoke_higher_order_function("zip_with", [left, right], [f])
zip_with.__doc__ = pysparkfuncs.zip_with.__doc__
# String/Binary functions
def upper(col: "ColumnOrName") -> Column:
return _invoke_function_over_columns("upper", col)
upper.__doc__ = pysparkfuncs.upper.__doc__
def lower(col: "ColumnOrName") -> Column:
return _invoke_function_over_columns("lower", col)
lower.__doc__ = pysparkfuncs.lower.__doc__
def ascii(col: "ColumnOrName") -> Column:
return _invoke_function_over_columns("ascii", col)
ascii.__doc__ = pysparkfuncs.ascii.__doc__
def base64(col: "ColumnOrName") -> Column:
return _invoke_function_over_columns("base64", col)
base64.__doc__ = pysparkfuncs.base64.__doc__
def unbase64(col: "ColumnOrName") -> Column:
return _invoke_function_over_columns("unbase64", col)
unbase64.__doc__ = pysparkfuncs.unbase64.__doc__
def ltrim(col: "ColumnOrName") -> Column:
return _invoke_function_over_columns("ltrim", col)
ltrim.__doc__ = pysparkfuncs.ltrim.__doc__
def rtrim(col: "ColumnOrName") -> Column:
return _invoke_function_over_columns("rtrim", col)
rtrim.__doc__ = pysparkfuncs.rtrim.__doc__
def trim(col: "ColumnOrName") -> Column:
return _invoke_function_over_columns("trim", col)
trim.__doc__ = pysparkfuncs.trim.__doc__
def concat_ws(sep: str, *cols: "ColumnOrName") -> Column:
return _invoke_function("concat_ws", lit(sep), *[_to_col(c) for c in cols])
concat_ws.__doc__ = pysparkfuncs.concat_ws.__doc__
def decode(col: "ColumnOrName", charset: str) -> Column:
return _invoke_function("decode", _to_col(col), lit(charset))
decode.__doc__ = pysparkfuncs.decode.__doc__
def encode(col: "ColumnOrName", charset: str) -> Column:
return _invoke_function("encode", _to_col(col), lit(charset))
encode.__doc__ = pysparkfuncs.encode.__doc__
def format_number(col: "ColumnOrName", d: int) -> Column:
return _invoke_function("format_number", _to_col(col), lit(d))
format_number.__doc__ = pysparkfuncs.format_number.__doc__
def format_string(format: str, *cols: "ColumnOrName") -> Column:
return _invoke_function("format_string", lit(format), *[_to_col(c) for c in cols])
format_string.__doc__ = pysparkfuncs.format_string.__doc__
def instr(str: "ColumnOrName", substr: str) -> Column:
return _invoke_function("instr", _to_col(str), lit(substr))
instr.__doc__ = pysparkfuncs.instr.__doc__
def overlay(
src: "ColumnOrName",
replace: "ColumnOrName",
pos: Union["ColumnOrName", int],
len: Union["ColumnOrName", int] = -1,
) -> Column:
if not isinstance(pos, (int, str, Column)):
raise PySparkTypeError(
error_class="NOT_COLUMN_OR_INT_OR_STR",
message_parameters={"arg_name": "pos", "arg_type": type(pos).__name__},
)
if len is not None and not isinstance(len, (int, str, Column)):
raise PySparkTypeError(
error_class="NOT_COLUMN_OR_INT_OR_STR",
message_parameters={"arg_name": "len", "arg_type": type(len).__name__},
)
if isinstance(pos, int):
pos = lit(pos)
if isinstance(len, int):
len = lit(len)
return _invoke_function_over_columns("overlay", src, replace, pos, len)
overlay.__doc__ = pysparkfuncs.overlay.__doc__
def sentences(
string: "ColumnOrName",
language: Optional["ColumnOrName"] = None,
country: Optional["ColumnOrName"] = None,
) -> Column:
_language = lit("") if language is None else _to_col(language)
_country = lit("") if country is None else _to_col(country)
return _invoke_function("sentences", _to_col(string), _language, _country)
sentences.__doc__ = pysparkfuncs.sentences.__doc__
def substring(str: "ColumnOrName", pos: int, len: int) -> Column:
return _invoke_function("substring", _to_col(str), lit(pos), lit(len))
substring.__doc__ = pysparkfuncs.substring.__doc__
def substring_index(str: "ColumnOrName", delim: str, count: int) -> Column:
return _invoke_function("substring_index", _to_col(str), lit(delim), lit(count))
substring_index.__doc__ = pysparkfuncs.substring_index.__doc__
def levenshtein(
left: "ColumnOrName", right: "ColumnOrName", threshold: Optional[int] = None
) -> Column:
if threshold is None:
return _invoke_function_over_columns("levenshtein", left, right)
else:
return _invoke_function("levenshtein", _to_col(left), _to_col(right), lit(threshold))
levenshtein.__doc__ = pysparkfuncs.levenshtein.__doc__
def locate(substr: str, str: "ColumnOrName", pos: int = 1) -> Column:
return _invoke_function("locate", lit(substr), _to_col(str), lit(pos))
locate.__doc__ = pysparkfuncs.locate.__doc__
def lpad(col: "ColumnOrName", len: int, pad: str) -> Column:
return _invoke_function("lpad", _to_col(col), lit(len), lit(pad))
lpad.__doc__ = pysparkfuncs.lpad.__doc__
def rpad(col: "ColumnOrName", len: int, pad: str) -> Column:
return _invoke_function("rpad", _to_col(col), lit(len), lit(pad))
rpad.__doc__ = pysparkfuncs.rpad.__doc__
def repeat(col: "ColumnOrName", n: int) -> Column:
return _invoke_function("repeat", _to_col(col), lit(n))
repeat.__doc__ = pysparkfuncs.repeat.__doc__
def split(str: "ColumnOrName", pattern: str, limit: int = -1) -> Column:
return _invoke_function("split", _to_col(str), lit(pattern), lit(limit))
split.__doc__ = pysparkfuncs.split.__doc__
def rlike(str: "ColumnOrName", regexp: "ColumnOrName") -> Column:
return _invoke_function_over_columns("rlike", str, regexp)
rlike.__doc__ = pysparkfuncs.rlike.__doc__
def regexp(str: "ColumnOrName", regexp: "ColumnOrName") -> Column:
return _invoke_function_over_columns("regexp", str, regexp)
regexp.__doc__ = pysparkfuncs.regexp.__doc__
def regexp_like(str: "ColumnOrName", regexp: "ColumnOrName") -> Column:
return _invoke_function_over_columns("regexp_like", str, regexp)
regexp_like.__doc__ = pysparkfuncs.regexp_like.__doc__
def regexp_count(str: "ColumnOrName", regexp: "ColumnOrName") -> Column:
return _invoke_function_over_columns("regexp_count", str, regexp)
regexp_count.__doc__ = pysparkfuncs.regexp_count.__doc__
def regexp_extract(str: "ColumnOrName", pattern: str, idx: int) -> Column:
return _invoke_function("regexp_extract", _to_col(str), lit(pattern), lit(idx))
regexp_extract.__doc__ = pysparkfuncs.regexp_extract.__doc__
def regexp_extract_all(
str: "ColumnOrName", regexp: "ColumnOrName", idx: Optional[Union[int, Column]] = None
) -> Column:
if idx is None:
return _invoke_function_over_columns("regexp_extract_all", str, regexp)
else:
if isinstance(idx, int):
idx = lit(idx)
return _invoke_function_over_columns("regexp_extract_all", str, regexp, idx)
regexp_extract_all.__doc__ = pysparkfuncs.regexp_extract_all.__doc__
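# Usage sketch (illustrative data, session ``spark`` assumed): the pattern
# must be a Column (hence ``lit``); when ``idx`` is omitted the server-side
# default group index of 1 is used.
#
#   >>> df = spark.createDataFrame([("100-200",)], ["str"])
#   >>> df.select(regexp_extract_all("str", lit(r"(\d+)")).alias("d")).collect()
#   [Row(d=['100', '200'])]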
def regexp_replace(
string: "ColumnOrName", pattern: Union[str, Column], replacement: Union[str, Column]
) -> Column:
if isinstance(pattern, str):
pattern = lit(pattern)
if isinstance(replacement, str):
replacement = lit(replacement)
return _invoke_function("regexp_replace", _to_col(string), pattern, replacement)
regexp_replace.__doc__ = pysparkfuncs.regexp_replace.__doc__
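# Usage sketch (illustrative data, session ``spark`` assumed): ``pattern``
# and ``replacement`` may be plain strings, as here, or Columns read from
# the row itself.
#
#   >>> df = spark.createDataFrame([("100-200",)], ["str"])
#   >>> df.select(regexp_replace("str", r"(\d+)", "--").alias("d")).collect()
#   [Row(d='-----')]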
def regexp_substr(str: "ColumnOrName", regexp: "ColumnOrName") -> Column:
return _invoke_function_over_columns("regexp_substr", str, regexp)
regexp_substr.__doc__ = pysparkfuncs.regexp_substr.__doc__
def regexp_instr(
str: "ColumnOrName", regexp: "ColumnOrName", idx: Optional[Union[int, Column]] = None
) -> Column:
if idx is None:
return _invoke_function_over_columns("regexp_instr", str, regexp)
else:
if isinstance(idx, int):
idx = lit(idx)
return _invoke_function_over_columns("regexp_instr", str, regexp, idx)
regexp_instr.__doc__ = pysparkfuncs.regexp_instr.__doc__
def initcap(col: "ColumnOrName") -> Column:
return _invoke_function_over_columns("initcap", col)
initcap.__doc__ = pysparkfuncs.initcap.__doc__
def soundex(col: "ColumnOrName") -> Column:
return _invoke_function_over_columns("soundex", col)
soundex.__doc__ = pysparkfuncs.soundex.__doc__
def length(col: "ColumnOrName") -> Column:
return _invoke_function_over_columns("length", col)
length.__doc__ = pysparkfuncs.length.__doc__
def octet_length(col: "ColumnOrName") -> Column:
return _invoke_function_over_columns("octet_length", col)
octet_length.__doc__ = pysparkfuncs.octet_length.__doc__
def bit_length(col: "ColumnOrName") -> Column:
return _invoke_function_over_columns("bit_length", col)
bit_length.__doc__ = pysparkfuncs.bit_length.__doc__
def translate(srcCol: "ColumnOrName", matching: str, replace: str) -> Column:
return _invoke_function("translate", _to_col(srcCol), lit(matching), lit(replace))
translate.__doc__ = pysparkfuncs.translate.__doc__
def to_binary(col: "ColumnOrName", format: Optional["ColumnOrName"] = None) -> Column:
if format is not None:
return _invoke_function_over_columns("to_binary", col, format)
else:
return _invoke_function_over_columns("to_binary", col)
to_binary.__doc__ = pysparkfuncs.to_binary.__doc__
def to_char(col: "ColumnOrName", format: "ColumnOrName") -> Column:
return _invoke_function_over_columns("to_char", col, format)
to_char.__doc__ = pysparkfuncs.to_char.__doc__
def to_varchar(col: "ColumnOrName", format: "ColumnOrName") -> Column:
return _invoke_function_over_columns("to_varchar", col, format)
to_varchar.__doc__ = pysparkfuncs.to_varchar.__doc__
def to_number(col: "ColumnOrName", format: "ColumnOrName") -> Column:
return _invoke_function_over_columns("to_number", col, format)
to_number.__doc__ = pysparkfuncs.to_number.__doc__
def replace(
src: "ColumnOrName", search: "ColumnOrName", replace: Optional["ColumnOrName"] = None
) -> Column:
if replace is not None:
return _invoke_function_over_columns("replace", src, search, replace)
else:
return _invoke_function_over_columns("replace", src, search)
replace.__doc__ = pysparkfuncs.replace.__doc__
def split_part(src: "ColumnOrName", delimiter: "ColumnOrName", partNum: "ColumnOrName") -> Column:
return _invoke_function_over_columns("split_part", src, delimiter, partNum)
split_part.__doc__ = pysparkfuncs.split_part.__doc__
def substr(
str: "ColumnOrName", pos: "ColumnOrName", len: Optional["ColumnOrName"] = None
) -> Column:
if len is not None:
return _invoke_function_over_columns("substr", str, pos, len)
else:
return _invoke_function_over_columns("substr", str, pos)
substr.__doc__ = pysparkfuncs.substr.__doc__
def parse_url(
url: "ColumnOrName", partToExtract: "ColumnOrName", key: Optional["ColumnOrName"] = None
) -> Column:
if key is not None:
return _invoke_function_over_columns("parse_url", url, partToExtract, key)
else:
return _invoke_function_over_columns("parse_url", url, partToExtract)
parse_url.__doc__ = pysparkfuncs.parse_url.__doc__
def printf(format: "ColumnOrName", *cols: "ColumnOrName") -> Column:
return _invoke_function("printf", lit(format), *[_to_col(c) for c in cols])
printf.__doc__ = pysparkfuncs.printf.__doc__
def url_decode(str: "ColumnOrName") -> Column:
return _invoke_function_over_columns("url_decode", str)
url_decode.__doc__ = pysparkfuncs.url_decode.__doc__
def url_encode(str: "ColumnOrName") -> Column:
return _invoke_function_over_columns("url_encode", str)
url_encode.__doc__ = pysparkfuncs.url_encode.__doc__
def position(
substr: "ColumnOrName", str: "ColumnOrName", start: Optional["ColumnOrName"] = None
) -> Column:
if start is not None:
return _invoke_function_over_columns("position", substr, str, start)
else:
return _invoke_function_over_columns("position", substr, str)
position.__doc__ = pysparkfuncs.position.__doc__
def endswith(str: "ColumnOrName", suffix: "ColumnOrName") -> Column:
return _invoke_function_over_columns("endswith", str, suffix)
endswith.__doc__ = pysparkfuncs.endswith.__doc__
def startswith(str: "ColumnOrName", prefix: "ColumnOrName") -> Column:
return _invoke_function_over_columns("startswith", str, prefix)
startswith.__doc__ = pysparkfuncs.startswith.__doc__
def char(col: "ColumnOrName") -> Column:
return _invoke_function_over_columns("char", col)
char.__doc__ = pysparkfuncs.char.__doc__
def try_to_binary(col: "ColumnOrName", format: Optional["ColumnOrName"] = None) -> Column:
if format is not None:
return _invoke_function_over_columns("try_to_binary", col, format)
else:
return _invoke_function_over_columns("try_to_binary", col)
try_to_binary.__doc__ = pysparkfuncs.try_to_binary.__doc__
def try_to_number(col: "ColumnOrName", format: "ColumnOrName") -> Column:
return _invoke_function_over_columns("try_to_number", col, format)
try_to_number.__doc__ = pysparkfuncs.try_to_number.__doc__
def btrim(str: "ColumnOrName", trim: Optional["ColumnOrName"] = None) -> Column:
if trim is not None:
return _invoke_function_over_columns("btrim", str, trim)
else:
return _invoke_function_over_columns("btrim", str)
btrim.__doc__ = pysparkfuncs.btrim.__doc__
def char_length(str: "ColumnOrName") -> Column:
return _invoke_function_over_columns("char_length", str)
char_length.__doc__ = pysparkfuncs.char_length.__doc__
def character_length(str: "ColumnOrName") -> Column:
return _invoke_function_over_columns("character_length", str)
character_length.__doc__ = pysparkfuncs.character_length.__doc__
def chr(col: "ColumnOrName") -> Column:
return _invoke_function_over_columns("chr", col)
chr.__doc__ = pysparkfuncs.chr.__doc__
def contains(left: "ColumnOrName", right: "ColumnOrName") -> Column:
return _invoke_function_over_columns("contains", left, right)
contains.__doc__ = pysparkfuncs.contains.__doc__
def elt(*inputs: "ColumnOrName") -> Column:
return _invoke_function("elt", *[_to_col(input) for input in inputs])
elt.__doc__ = pysparkfuncs.elt.__doc__
def find_in_set(str: "ColumnOrName", str_array: "ColumnOrName") -> Column:
return _invoke_function_over_columns("find_in_set", str, str_array)
find_in_set.__doc__ = pysparkfuncs.find_in_set.__doc__
def like(
str: "ColumnOrName", pattern: "ColumnOrName", escapeChar: Optional["Column"] = None
) -> Column:
if escapeChar is not None:
return _invoke_function_over_columns("like", str, pattern, escapeChar)
else:
return _invoke_function_over_columns("like", str, pattern)
like.__doc__ = pysparkfuncs.like.__doc__
def ilike(
str: "ColumnOrName", pattern: "ColumnOrName", escapeChar: Optional["Column"] = None
) -> Column:
if escapeChar is not None:
return _invoke_function_over_columns("ilike", str, pattern, escapeChar)
else:
return _invoke_function_over_columns("ilike", str, pattern)
ilike.__doc__ = pysparkfuncs.ilike.__doc__
def lcase(str: "ColumnOrName") -> Column:
return _invoke_function_over_columns("lcase", str)
lcase.__doc__ = pysparkfuncs.lcase.__doc__
def ucase(str: "ColumnOrName") -> Column:
return _invoke_function_over_columns("ucase", str)
ucase.__doc__ = pysparkfuncs.ucase.__doc__
def left(str: "ColumnOrName", len: "ColumnOrName") -> Column:
return _invoke_function_over_columns("left", str, len)
left.__doc__ = pysparkfuncs.left.__doc__
def right(str: "ColumnOrName", len: "ColumnOrName") -> Column:
return _invoke_function_over_columns("right", str, len)
right.__doc__ = pysparkfuncs.right.__doc__
def mask(
col: "ColumnOrName",
upperChar: Optional["ColumnOrName"] = None,
lowerChar: Optional["ColumnOrName"] = None,
digitChar: Optional["ColumnOrName"] = None,
otherChar: Optional["ColumnOrName"] = None,
) -> Column:
_upperChar = lit("X") if upperChar is None else upperChar
_lowerChar = lit("x") if lowerChar is None else lowerChar
_digitChar = lit("n") if digitChar is None else digitChar
_otherChar = lit(None) if otherChar is None else otherChar
return _invoke_function_over_columns(
"mask", col, _upperChar, _lowerChar, _digitChar, _otherChar
)
mask.__doc__ = pysparkfuncs.mask.__doc__
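# Usage sketch (illustrative data, session ``spark`` assumed): with no
# arguments, upper-case characters become 'X', lower-case 'x', digits 'n',
# and other characters are kept as-is.
#
#   >>> df = spark.createDataFrame([("AbCD123-@$#",)], ["data"])
#   >>> df.select(mask(df.data).alias("r")).collect()
#   [Row(r='XxXXnnn-@$#')]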
# Date/Timestamp Functions
# TODO(SPARK-41455): Resolve dtypes inconsistencies for:
# to_timestamp, from_utc_timestamp, to_utc_timestamp,
# timestamp_seconds, current_timestamp, date_trunc
def curdate() -> Column:
return _invoke_function("curdate")
curdate.__doc__ = pysparkfuncs.curdate.__doc__
def current_date() -> Column:
return _invoke_function("current_date")
current_date.__doc__ = pysparkfuncs.current_date.__doc__
def current_timestamp() -> Column:
return _invoke_function("current_timestamp")
current_timestamp.__doc__ = pysparkfuncs.current_timestamp.__doc__
def now() -> Column:
    # `now` is implemented as an alias of `current_timestamp` on the server side.
    return _invoke_function("current_timestamp")
now.__doc__ = pysparkfuncs.now.__doc__
def current_timezone() -> Column:
return _invoke_function("current_timezone")
current_timezone.__doc__ = pysparkfuncs.current_timezone.__doc__
def localtimestamp() -> Column:
return _invoke_function("localtimestamp")
localtimestamp.__doc__ = pysparkfuncs.localtimestamp.__doc__
def date_format(date: "ColumnOrName", format: str) -> Column:
return _invoke_function("date_format", _to_col(date), lit(format))
date_format.__doc__ = pysparkfuncs.date_format.__doc__
def year(col: "ColumnOrName") -> Column:
return _invoke_function_over_columns("year", col)
year.__doc__ = pysparkfuncs.year.__doc__
def quarter(col: "ColumnOrName") -> Column:
return _invoke_function_over_columns("quarter", col)
quarter.__doc__ = pysparkfuncs.quarter.__doc__
def month(col: "ColumnOrName") -> Column:
return _invoke_function_over_columns("month", col)
month.__doc__ = pysparkfuncs.month.__doc__
def dayofweek(col: "ColumnOrName") -> Column:
return _invoke_function_over_columns("dayofweek", col)
dayofweek.__doc__ = pysparkfuncs.dayofweek.__doc__
def dayofmonth(col: "ColumnOrName") -> Column:
return _invoke_function_over_columns("dayofmonth", col)
dayofmonth.__doc__ = pysparkfuncs.dayofmonth.__doc__
def day(col: "ColumnOrName") -> Column:
return _invoke_function_over_columns("day", col)
day.__doc__ = pysparkfuncs.day.__doc__
def dayofyear(col: "ColumnOrName") -> Column:
return _invoke_function_over_columns("dayofyear", col)
dayofyear.__doc__ = pysparkfuncs.dayofyear.__doc__
def hour(col: "ColumnOrName") -> Column:
return _invoke_function_over_columns("hour", col)
hour.__doc__ = pysparkfuncs.hour.__doc__
def minute(col: "ColumnOrName") -> Column:
return _invoke_function_over_columns("minute", col)
minute.__doc__ = pysparkfuncs.minute.__doc__
def second(col: "ColumnOrName") -> Column:
return _invoke_function_over_columns("second", col)
second.__doc__ = pysparkfuncs.second.__doc__
def weekofyear(col: "ColumnOrName") -> Column:
return _invoke_function_over_columns("weekofyear", col)
weekofyear.__doc__ = pysparkfuncs.weekofyear.__doc__
def weekday(col: "ColumnOrName") -> Column:
return _invoke_function_over_columns("weekday", col)
weekday.__doc__ = pysparkfuncs.weekday.__doc__
def extract(field: "ColumnOrName", source: "ColumnOrName") -> Column:
return _invoke_function_over_columns("extract", field, source)
extract.__doc__ = pysparkfuncs.extract.__doc__
def date_part(field: "ColumnOrName", source: "ColumnOrName") -> Column:
return _invoke_function_over_columns("date_part", field, source)
date_part.__doc__ = pysparkfuncs.date_part.__doc__
def datepart(field: "ColumnOrName", source: "ColumnOrName") -> Column:
return _invoke_function_over_columns("datepart", field, source)
datepart.__doc__ = pysparkfuncs.datepart.__doc__
def make_date(year: "ColumnOrName", month: "ColumnOrName", day: "ColumnOrName") -> Column:
return _invoke_function_over_columns("make_date", year, month, day)
make_date.__doc__ = pysparkfuncs.make_date.__doc__
def date_add(start: "ColumnOrName", days: Union["ColumnOrName", int]) -> Column:
days = lit(days) if isinstance(days, int) else days
return _invoke_function_over_columns("date_add", start, days)
date_add.__doc__ = pysparkfuncs.date_add.__doc__
def dateadd(start: "ColumnOrName", days: Union["ColumnOrName", int]) -> Column:
days = lit(days) if isinstance(days, int) else days
return _invoke_function_over_columns("dateadd", start, days)
dateadd.__doc__ = pysparkfuncs.dateadd.__doc__
def date_sub(start: "ColumnOrName", days: Union["ColumnOrName", int]) -> Column:
days = lit(days) if isinstance(days, int) else days
return _invoke_function_over_columns("date_sub", start, days)
date_sub.__doc__ = pysparkfuncs.date_sub.__doc__
def datediff(end: "ColumnOrName", start: "ColumnOrName") -> Column:
return _invoke_function_over_columns("datediff", end, start)
datediff.__doc__ = pysparkfuncs.datediff.__doc__
def date_diff(end: "ColumnOrName", start: "ColumnOrName") -> Column:
return _invoke_function_over_columns("date_diff", end, start)
date_diff.__doc__ = pysparkfuncs.date_diff.__doc__
def date_from_unix_date(days: "ColumnOrName") -> Column:
return _invoke_function_over_columns("date_from_unix_date", days)
date_from_unix_date.__doc__ = pysparkfuncs.date_from_unix_date.__doc__
def add_months(start: "ColumnOrName", months: Union["ColumnOrName", int]) -> Column:
months = lit(months) if isinstance(months, int) else months
return _invoke_function_over_columns("add_months", start, months)
add_months.__doc__ = pysparkfuncs.add_months.__doc__
def months_between(date1: "ColumnOrName", date2: "ColumnOrName", roundOff: bool = True) -> Column:
return _invoke_function("months_between", _to_col(date1), _to_col(date2), lit(roundOff))
months_between.__doc__ = pysparkfuncs.months_between.__doc__
def to_date(col: "ColumnOrName", format: Optional[str] = None) -> Column:
if format is None:
return _invoke_function_over_columns("to_date", col)
else:
return _invoke_function("to_date", _to_col(col), lit(format))
to_date.__doc__ = pysparkfuncs.to_date.__doc__
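# Usage sketch (illustrative data, session ``spark`` assumed): without a
# format the default parser is used; an explicit pattern is passed as a
# literal.
#
#   >>> df = spark.createDataFrame([("1997-02-28",)], ["t"])
#   >>> df.select(to_date(df.t).alias("date")).collect()
#   [Row(date=datetime.date(1997, 2, 28))]
#   >>> df.select(to_date(df.t, "yyyy-MM-dd").alias("date")).collect()
#   [Row(date=datetime.date(1997, 2, 28))]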
def unix_date(col: "ColumnOrName") -> Column:
return _invoke_function_over_columns("unix_date", col)
unix_date.__doc__ = pysparkfuncs.unix_date.__doc__
def unix_micros(col: "ColumnOrName") -> Column:
return _invoke_function_over_columns("unix_micros", col)
unix_micros.__doc__ = pysparkfuncs.unix_micros.__doc__
def unix_millis(col: "ColumnOrName") -> Column:
return _invoke_function_over_columns("unix_millis", col)
unix_millis.__doc__ = pysparkfuncs.unix_millis.__doc__
def unix_seconds(col: "ColumnOrName") -> Column:
return _invoke_function_over_columns("unix_seconds", col)
unix_seconds.__doc__ = pysparkfuncs.unix_seconds.__doc__
@overload
def to_timestamp(col: "ColumnOrName") -> Column:
...
@overload
def to_timestamp(col: "ColumnOrName", format: str) -> Column:
...
def to_timestamp(col: "ColumnOrName", format: Optional[str] = None) -> Column:
if format is None:
return _invoke_function_over_columns("to_timestamp", col)
else:
return _invoke_function("to_timestamp", _to_col(col), lit(format))
to_timestamp.__doc__ = pysparkfuncs.to_timestamp.__doc__
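# Usage sketch (illustrative data, session ``spark`` assumed): the two
# overloads mirror ``to_date``; omit ``format`` to use the default parser.
#
#   >>> df = spark.createDataFrame([("1997-02-28 10:30:00",)], ["t"])
#   >>> df.select(to_timestamp(df.t).alias("dt")).collect()
#   [Row(dt=datetime.datetime(1997, 2, 28, 10, 30))]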
def try_to_timestamp(col: "ColumnOrName", format: Optional["ColumnOrName"] = None) -> Column:
if format is not None:
return _invoke_function_over_columns("try_to_timestamp", col, format)
else:
return _invoke_function_over_columns("try_to_timestamp", col)
try_to_timestamp.__doc__ = pysparkfuncs.try_to_timestamp.__doc__
def xpath(xml: "ColumnOrName", path: "ColumnOrName") -> Column:
return _invoke_function_over_columns("xpath", xml, path)
xpath.__doc__ = pysparkfuncs.xpath.__doc__
def xpath_boolean(xml: "ColumnOrName", path: "ColumnOrName") -> Column:
return _invoke_function_over_columns("xpath_boolean", xml, path)
xpath_boolean.__doc__ = pysparkfuncs.xpath_boolean.__doc__
def xpath_double(xml: "ColumnOrName", path: "ColumnOrName") -> Column:
return _invoke_function_over_columns("xpath_double", xml, path)
xpath_double.__doc__ = pysparkfuncs.xpath_double.__doc__
def xpath_number(xml: "ColumnOrName", path: "ColumnOrName") -> Column:
return _invoke_function_over_columns("xpath_number", xml, path)
xpath_number.__doc__ = pysparkfuncs.xpath_number.__doc__
def xpath_float(xml: "ColumnOrName", path: "ColumnOrName") -> Column:
return _invoke_function_over_columns("xpath_float", xml, path)
xpath_float.__doc__ = pysparkfuncs.xpath_float.__doc__
def xpath_int(xml: "ColumnOrName", path: "ColumnOrName") -> Column:
return _invoke_function_over_columns("xpath_int", xml, path)
xpath_int.__doc__ = pysparkfuncs.xpath_int.__doc__
def xpath_long(xml: "ColumnOrName", path: "ColumnOrName") -> Column:
return _invoke_function_over_columns("xpath_long", xml, path)
xpath_long.__doc__ = pysparkfuncs.xpath_long.__doc__
def xpath_short(xml: "ColumnOrName", path: "ColumnOrName") -> Column:
return _invoke_function_over_columns("xpath_short", xml, path)
xpath_short.__doc__ = pysparkfuncs.xpath_short.__doc__
def xpath_string(xml: "ColumnOrName", path: "ColumnOrName") -> Column:
return _invoke_function_over_columns("xpath_string", xml, path)
xpath_string.__doc__ = pysparkfuncs.xpath_string.__doc__
def trunc(date: "ColumnOrName", format: str) -> Column:
return _invoke_function("trunc", _to_col(date), lit(format))
trunc.__doc__ = pysparkfuncs.trunc.__doc__
def date_trunc(format: str, timestamp: "ColumnOrName") -> Column:
return _invoke_function("date_trunc", lit(format), _to_col(timestamp))
date_trunc.__doc__ = pysparkfuncs.date_trunc.__doc__
def next_day(date: "ColumnOrName", dayOfWeek: str) -> Column:
return _invoke_function("next_day", _to_col(date), lit(dayOfWeek))
next_day.__doc__ = pysparkfuncs.next_day.__doc__
def last_day(date: "ColumnOrName") -> Column:
return _invoke_function_over_columns("last_day", date)
last_day.__doc__ = pysparkfuncs.last_day.__doc__
def from_unixtime(timestamp: "ColumnOrName", format: str = "yyyy-MM-dd HH:mm:ss") -> Column:
return _invoke_function("from_unixtime", _to_col(timestamp), lit(format))
from_unixtime.__doc__ = pysparkfuncs.from_unixtime.__doc__
@overload
def unix_timestamp(timestamp: "ColumnOrName", format: str = ...) -> Column:
...
@overload
def unix_timestamp() -> Column:
...
def unix_timestamp(
timestamp: Optional["ColumnOrName"] = None, format: str = "yyyy-MM-dd HH:mm:ss"
) -> Column:
if timestamp is None:
return _invoke_function("unix_timestamp")
return _invoke_function("unix_timestamp", _to_col(timestamp), lit(format))
unix_timestamp.__doc__ = pysparkfuncs.unix_timestamp.__doc__
def from_utc_timestamp(timestamp: "ColumnOrName", tz: "ColumnOrName") -> Column:
    # A plain string is treated as a literal timezone value, not a column name.
    if isinstance(tz, str):
        tz = lit(tz)
    return _invoke_function_over_columns("from_utc_timestamp", timestamp, tz)
from_utc_timestamp.__doc__ = pysparkfuncs.from_utc_timestamp.__doc__
def to_utc_timestamp(timestamp: "ColumnOrName", tz: "ColumnOrName") -> Column:
    # A plain string is treated as a literal timezone value, not a column name.
    if isinstance(tz, str):
        tz = lit(tz)
    return _invoke_function_over_columns("to_utc_timestamp", timestamp, tz)
to_utc_timestamp.__doc__ = pysparkfuncs.to_utc_timestamp.__doc__
def timestamp_seconds(col: "ColumnOrName") -> Column:
return _invoke_function_over_columns("timestamp_seconds", col)
timestamp_seconds.__doc__ = pysparkfuncs.timestamp_seconds.__doc__
def timestamp_millis(col: "ColumnOrName") -> Column:
return _invoke_function_over_columns("timestamp_millis", col)
timestamp_millis.__doc__ = pysparkfuncs.timestamp_millis.__doc__
def timestamp_micros(col: "ColumnOrName") -> Column:
return _invoke_function_over_columns("timestamp_micros", col)
timestamp_micros.__doc__ = pysparkfuncs.timestamp_micros.__doc__
def window(
timeColumn: "ColumnOrName",
windowDuration: str,
slideDuration: Optional[str] = None,
startTime: Optional[str] = None,
) -> Column:
if windowDuration is None or not isinstance(windowDuration, str):
raise PySparkTypeError(
error_class="NOT_STR",
message_parameters={
"arg_name": "windowDuration",
"arg_type": type(windowDuration).__name__,
},
)
if slideDuration is not None and not isinstance(slideDuration, str):
raise PySparkTypeError(
error_class="NOT_STR",
message_parameters={
"arg_name": "slideDuration",
"arg_type": type(slideDuration).__name__,
},
)
if startTime is not None and not isinstance(startTime, str):
raise PySparkTypeError(
error_class="NOT_STR",
message_parameters={"arg_name": "startTime", "arg_type": type(startTime).__name__},
)
time_col = _to_col(timeColumn)
if slideDuration is not None and startTime is not None:
return _invoke_function(
"window", time_col, lit(windowDuration), lit(slideDuration), lit(startTime)
)
    elif slideDuration is not None:
        return _invoke_function("window", time_col, lit(windowDuration), lit(slideDuration))
    elif startTime is not None:
        # Without an explicit slideDuration the window tumbles, so the slide
        # interval equals the window duration.
        return _invoke_function(
            "window", time_col, lit(windowDuration), lit(windowDuration), lit(startTime)
        )
else:
return _invoke_function("window", time_col, lit(windowDuration))
window.__doc__ = pysparkfuncs.window.__doc__
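# Usage sketch (illustrative data, session ``spark`` assumed): one duration
# gives tumbling windows, a second one gives sliding windows; the result is
# a struct column with ``start`` and ``end`` fields.
#
#   >>> df = spark.createDataFrame([("2016-03-11 09:00:07", 1)], ["ts", "v"])
#   >>> tumbling = df.groupBy(window("ts", "5 seconds")).agg({"v": "sum"})
#   >>> sliding = df.groupBy(window("ts", "10 seconds", "5 seconds")).agg({"v": "sum"})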
def window_time(
windowColumn: "ColumnOrName",
) -> Column:
return _invoke_function("window_time", _to_col(windowColumn))
window_time.__doc__ = pysparkfuncs.window_time.__doc__
def session_window(timeColumn: "ColumnOrName", gapDuration: Union[Column, str]) -> Column:
if gapDuration is None or not isinstance(gapDuration, (Column, str)):
raise PySparkTypeError(
error_class="NOT_COLUMN_OR_STR",
message_parameters={"arg_name": "gapDuration", "arg_type": type(gapDuration).__name__},
)
time_col = _to_col(timeColumn)
if isinstance(gapDuration, Column):
return _invoke_function("session_window", time_col, gapDuration)
else:
return _invoke_function("session_window", time_col, lit(gapDuration))
session_window.__doc__ = pysparkfuncs.session_window.__doc__
def to_unix_timestamp(
timestamp: "ColumnOrName",
format: Optional["ColumnOrName"] = None,
) -> Column:
if format is not None:
return _invoke_function_over_columns("to_unix_timestamp", timestamp, format)
else:
return _invoke_function_over_columns("to_unix_timestamp", timestamp)
to_unix_timestamp.__doc__ = pysparkfuncs.to_unix_timestamp.__doc__
def to_timestamp_ltz(
timestamp: "ColumnOrName",
format: Optional["ColumnOrName"] = None,
) -> Column:
if format is not None:
return _invoke_function_over_columns("to_timestamp_ltz", timestamp, format)
else:
return _invoke_function_over_columns("to_timestamp_ltz", timestamp)
to_timestamp_ltz.__doc__ = pysparkfuncs.to_timestamp_ltz.__doc__
def to_timestamp_ntz(
timestamp: "ColumnOrName",
format: Optional["ColumnOrName"] = None,
) -> Column:
if format is not None:
return _invoke_function_over_columns("to_timestamp_ntz", timestamp, format)
else:
return _invoke_function_over_columns("to_timestamp_ntz", timestamp)
to_timestamp_ntz.__doc__ = pysparkfuncs.to_timestamp_ntz.__doc__
# Partition Transformation Functions
def bucket(numBuckets: Union[Column, int], col: "ColumnOrName") -> Column:
if isinstance(numBuckets, int):
_numBuckets = lit(numBuckets)
elif isinstance(numBuckets, Column):
_numBuckets = numBuckets
else:
raise PySparkTypeError(
error_class="NOT_COLUMN_OR_INT",
message_parameters={
"arg_name": "numBuckets",
"arg_type": type(numBuckets).__name__,
},
)
return _invoke_function("bucket", _numBuckets, _to_col(col))
bucket.__doc__ = pysparkfuncs.bucket.__doc__
def years(col: "ColumnOrName") -> Column:
return _invoke_function_over_columns("years", col)
years.__doc__ = pysparkfuncs.years.__doc__
def months(col: "ColumnOrName") -> Column:
return _invoke_function_over_columns("months", col)
months.__doc__ = pysparkfuncs.months.__doc__
def days(col: "ColumnOrName") -> Column:
return _invoke_function_over_columns("days", col)
days.__doc__ = pysparkfuncs.days.__doc__
def hours(col: "ColumnOrName") -> Column:
return _invoke_function_over_columns("hours", col)
hours.__doc__ = pysparkfuncs.hours.__doc__
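# Usage sketch: the partition transforms above are only valid in
# ``DataFrameWriterV2.partitionedBy``; the catalog and table name below are
# illustrative and must support these transforms (e.g. an Iceberg catalog).
#
#   >>> (df.writeTo("catalog.db.events")
#   ...     .partitionedBy(bucket(16, "id"), years("ts"))
#   ...     .createOrReplace())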
def convert_timezone(
sourceTz: Optional[Column], targetTz: Column, sourceTs: "ColumnOrName"
) -> Column:
if sourceTz is None:
return _invoke_function_over_columns("convert_timezone", targetTz, sourceTs)
else:
return _invoke_function_over_columns("convert_timezone", sourceTz, targetTz, sourceTs)
convert_timezone.__doc__ = pysparkfuncs.convert_timezone.__doc__
def make_dt_interval(
days: Optional["ColumnOrName"] = None,
hours: Optional["ColumnOrName"] = None,
mins: Optional["ColumnOrName"] = None,
secs: Optional["ColumnOrName"] = None,
) -> Column:
_days = lit(0) if days is None else _to_col(days)
_hours = lit(0) if hours is None else _to_col(hours)
_mins = lit(0) if mins is None else _to_col(mins)
    # A decimal literal keeps fractional-second precision on the server side.
    _secs = lit(decimal.Decimal(0)) if secs is None else _to_col(secs)
return _invoke_function_over_columns("make_dt_interval", _days, _hours, _mins, _secs)
make_dt_interval.__doc__ = pysparkfuncs.make_dt_interval.__doc__
def make_interval(
years: Optional["ColumnOrName"] = None,
months: Optional["ColumnOrName"] = None,
weeks: Optional["ColumnOrName"] = None,
days: Optional["ColumnOrName"] = None,
hours: Optional["ColumnOrName"] = None,
mins: Optional["ColumnOrName"] = None,
secs: Optional["ColumnOrName"] = None,
) -> Column:
_years = lit(0) if years is None else _to_col(years)
_months = lit(0) if months is None else _to_col(months)
_weeks = lit(0) if weeks is None else _to_col(weeks)
_days = lit(0) if days is None else _to_col(days)
_hours = lit(0) if hours is None else _to_col(hours)
_mins = lit(0) if mins is None else _to_col(mins)
    # A decimal literal keeps fractional-second precision on the server side.
    _secs = lit(decimal.Decimal(0)) if secs is None else _to_col(secs)
return _invoke_function_over_columns(
"make_interval", _years, _months, _weeks, _days, _hours, _mins, _secs
)
make_interval.__doc__ = pysparkfuncs.make_interval.__doc__
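# Usage sketch (illustrative data, session ``spark`` assumed): any unit that
# is not supplied defaults to zero.
#
#   >>> df = spark.createDataFrame([(100, 11)], ["year", "month"])
#   >>> df.select(make_interval(df.year, df.month).alias("r")).show(truncate=False)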
def make_timestamp(
years: "ColumnOrName",
months: "ColumnOrName",
days: "ColumnOrName",
hours: "ColumnOrName",
mins: "ColumnOrName",
secs: "ColumnOrName",
timezone: Optional["ColumnOrName"] = None,
) -> Column:
if timezone is not None:
return _invoke_function_over_columns(
"make_timestamp", years, months, days, hours, mins, secs, timezone
)
else:
return _invoke_function_over_columns(
"make_timestamp", years, months, days, hours, mins, secs
)
make_timestamp.__doc__ = pysparkfuncs.make_timestamp.__doc__
def make_timestamp_ltz(
years: "ColumnOrName",
months: "ColumnOrName",
days: "ColumnOrName",
hours: "ColumnOrName",
mins: "ColumnOrName",
secs: "ColumnOrName",
timezone: Optional["ColumnOrName"] = None,
) -> Column:
if timezone is not None:
return _invoke_function_over_columns(
"make_timestamp_ltz", years, months, days, hours, mins, secs, timezone
)
else:
return _invoke_function_over_columns(
"make_timestamp_ltz", years, months, days, hours, mins, secs
)
make_timestamp_ltz.__doc__ = pysparkfuncs.make_timestamp_ltz.__doc__
def make_timestamp_ntz(
years: "ColumnOrName",
months: "ColumnOrName",
days: "ColumnOrName",
hours: "ColumnOrName",
mins: "ColumnOrName",
secs: "ColumnOrName",
) -> Column:
return _invoke_function_over_columns(
"make_timestamp_ntz", years, months, days, hours, mins, secs
)
make_timestamp_ntz.__doc__ = pysparkfuncs.make_timestamp_ntz.__doc__
def make_ym_interval(
years: Optional["ColumnOrName"] = None,
months: Optional["ColumnOrName"] = None,
) -> Column:
_years = lit(0) if years is None else _to_col(years)
_months = lit(0) if months is None else _to_col(months)
return _invoke_function_over_columns("make_ym_interval", _years, _months)
make_ym_interval.__doc__ = pysparkfuncs.make_ym_interval.__doc__
# Misc Functions
def current_catalog() -> Column:
return _invoke_function("current_catalog")
current_catalog.__doc__ = pysparkfuncs.current_catalog.__doc__
def current_database() -> Column:
return _invoke_function("current_database")
current_database.__doc__ = pysparkfuncs.current_database.__doc__
def current_schema() -> Column:
return _invoke_function("current_schema")
current_schema.__doc__ = pysparkfuncs.current_schema.__doc__
def current_user() -> Column:
return _invoke_function("current_user")
current_user.__doc__ = pysparkfuncs.current_user.__doc__
def user() -> Column:
return _invoke_function("user")
user.__doc__ = pysparkfuncs.user.__doc__
def assert_true(col: "ColumnOrName", errMsg: Optional[Union[Column, str]] = None) -> Column:
if errMsg is None:
return _invoke_function_over_columns("assert_true", col)
if not isinstance(errMsg, (str, Column)):
raise PySparkTypeError(
error_class="NOT_COLUMN_OR_STR",
message_parameters={"arg_name": "errMsg", "arg_type": type(errMsg).__name__},
)
_err_msg = lit(errMsg) if isinstance(errMsg, str) else _to_col(errMsg)
return _invoke_function("assert_true", _to_col(col), _err_msg)
assert_true.__doc__ = pysparkfuncs.assert_true.__doc__
def raise_error(errMsg: Union[Column, str]) -> Column:
if not isinstance(errMsg, (str, Column)):
raise PySparkTypeError(
error_class="NOT_COLUMN_OR_STR",
message_parameters={"arg_name": "errMsg", "arg_type": type(errMsg).__name__},
)
_err_msg = lit(errMsg) if isinstance(errMsg, str) else _to_col(errMsg)
return _invoke_function("raise_error", _err_msg)
raise_error.__doc__ = pysparkfuncs.raise_error.__doc__
def crc32(col: "ColumnOrName") -> Column:
return _invoke_function_over_columns("crc32", col)
crc32.__doc__ = pysparkfuncs.crc32.__doc__
def hash(*cols: "ColumnOrName") -> Column:
return _invoke_function_over_columns("hash", *cols)
hash.__doc__ = pysparkfuncs.hash.__doc__
def xxhash64(*cols: "ColumnOrName") -> Column:
return _invoke_function_over_columns("xxhash64", *cols)
xxhash64.__doc__ = pysparkfuncs.xxhash64.__doc__
def md5(col: "ColumnOrName") -> Column:
return _invoke_function_over_columns("md5", col)
md5.__doc__ = pysparkfuncs.md5.__doc__
def sha1(col: "ColumnOrName") -> Column:
return _invoke_function_over_columns("sha1", col)
sha1.__doc__ = pysparkfuncs.sha1.__doc__
def sha2(col: "ColumnOrName", numBits: int) -> Column:
return _invoke_function("sha2", _to_col(col), lit(numBits))
sha2.__doc__ = pysparkfuncs.sha2.__doc__
def hll_sketch_agg(col: "ColumnOrName", lgConfigK: Optional[Union[int, Column]] = None) -> Column:
if lgConfigK is None:
return _invoke_function_over_columns("hll_sketch_agg", col)
else:
_lgConfigK = lit(lgConfigK) if isinstance(lgConfigK, int) else lgConfigK
return _invoke_function_over_columns("hll_sketch_agg", col, _lgConfigK)
hll_sketch_agg.__doc__ = pysparkfuncs.hll_sketch_agg.__doc__
def hll_union_agg(col: "ColumnOrName", allowDifferentLgConfigK: Optional[bool] = None) -> Column:
if allowDifferentLgConfigK is None:
return _invoke_function_over_columns("hll_union_agg", col)
else:
_allowDifferentLgConfigK = (
lit(allowDifferentLgConfigK)
if isinstance(allowDifferentLgConfigK, bool)
else allowDifferentLgConfigK
)
return _invoke_function_over_columns("hll_union_agg", col, _allowDifferentLgConfigK)
hll_union_agg.__doc__ = pysparkfuncs.hll_union_agg.__doc__
def hll_sketch_estimate(col: "ColumnOrName") -> Column:
return _invoke_function("hll_sketch_estimate", _to_col(col))
hll_sketch_estimate.__doc__ = pysparkfuncs.hll_sketch_estimate.__doc__
def hll_union(
col1: "ColumnOrName", col2: "ColumnOrName", allowDifferentLgConfigK: Optional[bool] = None
) -> Column:
if allowDifferentLgConfigK is not None:
return _invoke_function(
"hll_union", _to_col(col1), _to_col(col2), lit(allowDifferentLgConfigK)
)
else:
return _invoke_function("hll_union", _to_col(col1), _to_col(col2))
hll_union.__doc__ = pysparkfuncs.hll_union.__doc__
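# Usage sketch (illustrative data, session ``spark`` assumed): a typical
# pipeline aggregates values into an HLL sketch and then estimates the
# distinct count from it.
#
#   >>> df = spark.createDataFrame([1, 2, 2, 3], "INT")
#   >>> df.agg(hll_sketch_estimate(hll_sketch_agg("value")).alias("cnt")).collect()
#   [Row(cnt=3)]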
# Predicate Functions
def ifnull(col1: "ColumnOrName", col2: "ColumnOrName") -> Column:
return _invoke_function_over_columns("ifnull", col1, col2)
ifnull.__doc__ = pysparkfuncs.ifnull.__doc__
def isnotnull(col: "ColumnOrName") -> Column:
return _invoke_function_over_columns("isnotnull", col)
isnotnull.__doc__ = pysparkfuncs.isnotnull.__doc__
def equal_null(col1: "ColumnOrName", col2: "ColumnOrName") -> Column:
return _invoke_function_over_columns("equal_null", col1, col2)
equal_null.__doc__ = pysparkfuncs.equal_null.__doc__
def nullif(col1: "ColumnOrName", col2: "ColumnOrName") -> Column:
return _invoke_function_over_columns("nullif", col1, col2)
nullif.__doc__ = pysparkfuncs.nullif.__doc__
def nvl(col1: "ColumnOrName", col2: "ColumnOrName") -> Column:
return _invoke_function_over_columns("nvl", col1, col2)
nvl.__doc__ = pysparkfuncs.nvl.__doc__
def nvl2(col1: "ColumnOrName", col2: "ColumnOrName", col3: "ColumnOrName") -> Column:
return _invoke_function_over_columns("nvl2", col1, col2, col3)
nvl2.__doc__ = pysparkfuncs.nvl2.__doc__
def uuid() -> Column:
    return _invoke_function("uuid")
uuid.__doc__ = pysparkfuncs.uuid.__doc__
def aes_encrypt(
input: "ColumnOrName",
key: "ColumnOrName",
mode: Optional["ColumnOrName"] = None,
padding: Optional["ColumnOrName"] = None,
iv: Optional["ColumnOrName"] = None,
aad: Optional["ColumnOrName"] = None,
) -> Column:
_mode = lit("GCM") if mode is None else _to_col(mode)
_padding = lit("DEFAULT") if padding is None else _to_col(padding)
_iv = lit("") if iv is None else _to_col(iv)
_aad = lit("") if aad is None else _to_col(aad)
return _invoke_function_over_columns("aes_encrypt", input, key, _mode, _padding, _iv, _aad)
aes_encrypt.__doc__ = pysparkfuncs.aes_encrypt.__doc__
def aes_decrypt(
input: "ColumnOrName",
key: "ColumnOrName",
mode: Optional["ColumnOrName"] = None,
padding: Optional["ColumnOrName"] = None,
aad: Optional["ColumnOrName"] = None,
) -> Column:
_mode = lit("GCM") if mode is None else _to_col(mode)
_padding = lit("DEFAULT") if padding is None else _to_col(padding)
_aad = lit("") if aad is None else _to_col(aad)
return _invoke_function_over_columns("aes_decrypt", input, key, _mode, _padding, _aad)
aes_decrypt.__doc__ = pysparkfuncs.aes_decrypt.__doc__
def try_aes_decrypt(
input: "ColumnOrName",
key: "ColumnOrName",
mode: Optional["ColumnOrName"] = None,
padding: Optional["ColumnOrName"] = None,
aad: Optional["ColumnOrName"] = None,
) -> Column:
_mode = lit("GCM") if mode is None else _to_col(mode)
_padding = lit("DEFAULT") if padding is None else _to_col(padding)
_aad = lit("") if aad is None else _to_col(aad)
return _invoke_function_over_columns("try_aes_decrypt", input, key, _mode, _padding, _aad)
try_aes_decrypt.__doc__ = pysparkfuncs.try_aes_decrypt.__doc__
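# Usage sketch: the key below is an illustrative 16-byte value, not a real
# secret. The default mode is GCM with a random IV, so ciphertexts differ
# between runs, but an encrypt/decrypt round trip is stable;
# ``try_aes_decrypt`` returns NULL instead of failing on a bad key or input.
#
#   >>> key = lit("abcdefghijklmnop")
#   >>> spark.range(1).select(
#   ...     aes_decrypt(aes_encrypt(lit("Spark"), key), key).cast("STRING").alias("m")
#   ... ).collect()
#   [Row(m='Spark')]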
def sha(col: "ColumnOrName") -> Column:
return _invoke_function_over_columns("sha", col)
sha.__doc__ = pysparkfuncs.sha.__doc__
def input_file_block_length() -> Column:
    return _invoke_function("input_file_block_length")
input_file_block_length.__doc__ = pysparkfuncs.input_file_block_length.__doc__
def input_file_block_start() -> Column:
    return _invoke_function("input_file_block_start")
input_file_block_start.__doc__ = pysparkfuncs.input_file_block_start.__doc__
def reflect(*cols: "ColumnOrName") -> Column:
return _invoke_function_over_columns("reflect", *cols)
reflect.__doc__ = pysparkfuncs.reflect.__doc__
def java_method(*cols: "ColumnOrName") -> Column:
return _invoke_function_over_columns("java_method", *cols)
java_method.__doc__ = pysparkfuncs.java_method.__doc__
def version() -> Column:
    return _invoke_function("version")
version.__doc__ = pysparkfuncs.version.__doc__
def typeof(col: "ColumnOrName") -> Column:
return _invoke_function_over_columns("typeof", col)
typeof.__doc__ = pysparkfuncs.typeof.__doc__
def stack(*cols: "ColumnOrName") -> Column:
return _invoke_function_over_columns("stack", *cols)
stack.__doc__ = pysparkfuncs.stack.__doc__
def random(
seed: Optional["ColumnOrName"] = None,
) -> Column:
if seed is not None:
return _invoke_function_over_columns("random", seed)
else:
return _invoke_function_over_columns("random")
random.__doc__ = pysparkfuncs.random.__doc__
def bitmap_bit_position(col: "ColumnOrName") -> Column:
return _invoke_function_over_columns("bitmap_bit_position", col)
bitmap_bit_position.__doc__ = pysparkfuncs.bitmap_bit_position.__doc__
def bitmap_bucket_number(col: "ColumnOrName") -> Column:
return _invoke_function_over_columns("bitmap_bucket_number", col)
bitmap_bucket_number.__doc__ = pysparkfuncs.bitmap_bucket_number.__doc__
def bitmap_construct_agg(col: "ColumnOrName") -> Column:
return _invoke_function_over_columns("bitmap_construct_agg", col)
bitmap_construct_agg.__doc__ = pysparkfuncs.bitmap_construct_agg.__doc__
def bitmap_count(col: "ColumnOrName") -> Column:
return _invoke_function_over_columns("bitmap_count", col)
bitmap_count.__doc__ = pysparkfuncs.bitmap_count.__doc__
def bitmap_or_agg(col: "ColumnOrName") -> Column:
return _invoke_function_over_columns("bitmap_or_agg", col)
bitmap_or_agg.__doc__ = pysparkfuncs.bitmap_or_agg.__doc__
# Call Functions
def call_udf(udfName: str, *cols: "ColumnOrName") -> Column:
return _invoke_function(udfName, *[_to_col(c) for c in cols])
call_udf.__doc__ = pysparkfuncs.call_udf.__doc__
def unwrap_udt(col: "ColumnOrName") -> Column:
return _invoke_function("unwrap_udt", _to_col(col))
unwrap_udt.__doc__ = pysparkfuncs.unwrap_udt.__doc__
def udf(
f: Optional[Union[Callable[..., Any], "DataTypeOrString"]] = None,
returnType: "DataTypeOrString" = StringType(),
useArrow: Optional[bool] = None,
) -> Union["UserDefinedFunctionLike", Callable[[Callable[..., Any]], "UserDefinedFunctionLike"]]:
    if f is None or isinstance(f, (str, DataType)):
        # If a DataType (or its DDL string form) was passed as the first
        # positional argument, `udf` is being used as a decorator, so treat
        # that argument as the returnType.
        return_type = f or returnType
return functools.partial(
_create_py_udf,
returnType=return_type,
useArrow=useArrow,
)
else:
return _create_py_udf(f=f, returnType=returnType, useArrow=useArrow)
udf.__doc__ = pysparkfuncs.udf.__doc__
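# Usage sketch: ``udf`` works both as a decorator (optionally parameterized
# with a return type) and as a plain function call; the example below is
# illustrative.
#
#   >>> @udf(returnType="int")
#   ... def slen(s):
#   ...     return len(s)
#   >>> spark.range(1).select(slen(lit("abc")).alias("n")).collect()
#   [Row(n=3)]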
def udtf(
cls: Optional[Type] = None,
*,
returnType: Optional[Union[StructType, str]] = None,
useArrow: Optional[bool] = None,
) -> Union["UserDefinedTableFunction", Callable[[Type], "UserDefinedTableFunction"]]:
if cls is None:
return functools.partial(_create_py_udtf, returnType=returnType, useArrow=useArrow)
else:
return _create_py_udtf(cls=cls, returnType=returnType, useArrow=useArrow)
udtf.__doc__ = pysparkfuncs.udtf.__doc__
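# Usage sketch: a UDTF is declared as a class with an ``eval`` method that
# yields output rows; the class and schema below are illustrative.
#
#   >>> @udtf(returnType="c1: int, c2: int")
#   ... class PlusOne:
#   ...     def eval(self, x: int):
#   ...         yield x, x + 1
#   >>> PlusOne(lit(1)).collect()
#   [Row(c1=1, c2=2)]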
def call_function(funcName: str, *cols: "ColumnOrName") -> Column:
expressions = [_to_col(c)._expr for c in cols]
return Column(CallFunction(funcName, expressions))
call_function.__doc__ = pysparkfuncs.call_function.__doc__
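# Usage sketch: ``call_function`` invokes any builtin or registered function
# by name, which is useful when the name is only known at runtime.
#
#   >>> spark.range(1).select(call_function("lower", lit("ABC")).alias("v")).collect()
#   [Row(v='abc')]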
def _test() -> None:
import sys
import doctest
from pyspark.sql import SparkSession as PySparkSession
import pyspark.sql.connect.functions
globs = pyspark.sql.connect.functions.__dict__.copy()
    # Spark Connect does not support SparkContext, but the doctest for
    # monotonically_increasing_id depends on it, so drop that docstring to
    # skip the example.
    del pyspark.sql.connect.functions.monotonically_increasing_id.__doc__
globs["spark"] = (
PySparkSession.builder.appName("sql.connect.functions tests")
.remote("local[4]")
.getOrCreate()
)
(failure_count, test_count) = doctest.testmod(
pyspark.sql.connect.functions,
globs=globs,
optionflags=doctest.ELLIPSIS
| doctest.NORMALIZE_WHITESPACE
| doctest.IGNORE_EXCEPTION_DETAIL,
)
globs["spark"].stop()
if failure_count:
sys.exit(-1)
if __name__ == "__main__":
_test()