in core/src/main/python/synapse/ml/cyber/utils/spark_utils.py
from typing import List, Union

from pyspark.sql import DataFrame, Window, functions as f, types as t

def zip_with_index(
df: DataFrame,
start_index: int = 0,
col_name: str = 'rowId',
partition_col: Union[List[str], str] = [],
order_by_col: Union[List[str], str] = []) -> DataFrame:
""" add an index to the given dataframe
Parameters
----------
df : dataframe
the dataframe to add the index to
start_index : int
the value to start the count from
col_name : str
the name of the index column which will be added as last column in the output data frame
partition_col : Union[List[str], str]
optional column name or list of columns names that define a partitioning to assign indices independently to,
e.g., assign sequential indices separately to each distinct tenant
order_by_col : Union[List[str], str]
optional order by column name or list of columns that are used for sorting
the data frame or partitions before indexing
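
    Examples
    --------
    A minimal sketch (assumes an active SparkSession bound to ``spark``)::

        df = spark.createDataFrame(
            [('tenantA', 10), ('tenantA', 5), ('tenantB', 7)],
            ['tenant', 'score'])
        # each tenant gets its own zero-based sequence, ordered by score
        zip_with_index(df, partition_col='tenant', order_by_col='score').show()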
"""
if df is None:
raise ValueError("df cannot be None")
if col_name is None:
raise ValueError("col_name cannot be None")
if partition_col is None:
raise ValueError("partition_col cannot be None")
if order_by_col is None:
raise ValueError("order_by_col cannot be None")
    # normalize singular string arguments to lists
partition_col = partition_col if isinstance(partition_col, list) else [partition_col]
order_by_col = order_by_col if isinstance(order_by_col, list) else [order_by_col]
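    # a partitioning is given: use a window so row_number() restarts for each
    # distinct partition, yielding an independent index sequence per partition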
    if len(partition_col) > 0:
        partition_columns = [f.col(cn) for cn in partition_col]
        window = Window.partitionBy(*partition_columns)
        if len(order_by_col) > 0:
            order_by_columns = [f.col(cn) for cn in order_by_col]
            window = window.orderBy(*order_by_columns)
        else:
            # row_number() requires an ordered window; when no sort columns
            # are given, fall back to an arbitrary but valid ordering
            window = window.orderBy(f.monotonically_increasing_id())
        return df.withColumn(col_name, f.row_number().over(window) - 1 + start_index)
else:
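        # no partitioning: fall back to RDD.zipWithIndex, which assigns one
        # global, gap-free sequence across the whole dataframe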
if len(order_by_col) > 0:
order_by_columns = [f.col(cn) for cn in order_by_col]
df = df.orderBy(*order_by_columns)
        output_schema = t.StructType(df.schema.fields + [t.StructField(col_name, t.LongType(), True)])
        return (
            df.rdd.zipWithIndex()
            .map(lambda row: list(row[0]) + [row[1] + start_index])
            .toDF(output_schema)
        )
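

# A minimal usage sketch (illustrative, not part of the original module); it
# assumes pyspark is installed and exercises the non-partitioned path, where
# the function falls back to RDD.zipWithIndex for one global index sequence.
if __name__ == "__main__":
    from pyspark.sql import SparkSession

    spark = SparkSession.builder.master("local[1]").appName("zip_with_index demo").getOrCreate()
    demo_df = spark.createDataFrame([("a",), ("c",), ("b",)], ["letter"])
    # sorted by `letter`, then indexed from 100: a -> 100, b -> 101, c -> 102
    zip_with_index(demo_df, start_index=100, order_by_col="letter").show()
    spark.stop()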