# zip_with_index — excerpt from core/src/main/python/synapse/ml/cyber/utils/spark_utils.py


    def zip_with_index(
            df: DataFrame,
            start_index: int = 0,
            col_name: str = 'rowId',
            partition_col: Union[List[str], str] = (),
            order_by_col: Union[List[str], str] = ()) -> DataFrame:
        """ add an index to the given dataframe

        The index column is appended as the last column of the output frame.
        When ``partition_col`` is given, indices restart from ``start_index``
        within each partition; otherwise a single global sequence is assigned.

        Parameters
        ----------
        df : dataframe
            the dataframe to add the index to
        start_index : int
            the value to start the count from
        col_name : str
            the name of the index column which will be added as last column in the output data frame
        partition_col : Union[List[str], str]
            optional column name or list of columns names that define a partitioning to assign indices independently to,
            e.g., assign sequential indices separately to each distinct tenant
        order_by_col : Union[List[str], str]
            optional order by column name or list of columns that are used for sorting
            the data frame or partitions before indexing

        Returns
        -------
        DataFrame
            the input dataframe with the extra index column appended

        Raises
        ------
        ValueError
            if df, col_name, partition_col or order_by_col is None
        """
        if df is None:
            raise ValueError("df cannot be None")
        if col_name is None:
            raise ValueError("col_name cannot be None")
        if partition_col is None:
            raise ValueError("partition_col cannot be None")
        if order_by_col is None:
            raise ValueError("order_by_col cannot be None")

        # Coalesce singular column-name strings into lists; copying any other
        # sequence avoids the shared-mutable-default pitfall and also accepts
        # tuples, which the previous isinstance(..., list) check mis-wrapped.
        partition_col = [partition_col] if isinstance(partition_col, str) else list(partition_col)
        order_by_col = [order_by_col] if isinstance(order_by_col, str) else list(order_by_col)

        if partition_col:
            window = Window.partitionBy(*[f.col(cn) for cn in partition_col])

            if order_by_col:
                window = window.orderBy(*[f.col(cn) for cn in order_by_col])

            # row_number() is 1-based; shift so the first row gets start_index.
            return df.withColumn(col_name, f.row_number().over(window) - 1 + start_index)
        else:
            if order_by_col:
                df = df.orderBy(*[f.col(cn) for cn in order_by_col])

            # No partitioning: assign one global sequence via the RDD's
            # zipWithIndex, appending a nullable long column to the schema.
            output_schema = t.StructType(df.schema.fields + [t.StructField(col_name, t.LongType(), True)])
            return df.rdd.zipWithIndex().map(lambda line: (list(line[0]) + [line[1] + start_index])).toDF(output_schema)