# def _transform()
#
# in core/src/main/python/synapse/ml/cyber/anomaly/complement_access.py [0:0]


    def _transform(self, df: DataFrame) -> DataFrame:
        """Generate a dataframe which consists of a sample from the complement set.

        For each input row, ``complementset_factor`` candidate tuples are drawn
        uniformly at random from the per-partition [min, max] index range of
        every column in ``indexed_col_names_arr``; tuples that already occur in
        ``df`` are then removed with a left-anti join, leaving a sample from
        the complement set.

        Parameters
        ----------
        df: a given dataframe containing the columns in 'indexed_col_names_arr'

        Returns
        -------
        dataframe which consists of a sample from the complement set
        """
        complementset_factor = self.complementset_factor

        # A factor of zero means "sample nothing": return an empty frame
        # with the same schema as df.
        if complementset_factor == 0:
            return DataFrameUtils.make_empty(df)

        the_partition_key = self.partition_key
        indexed_col_names_arr = self.indexed_col_names_arr

        # When no partition key is configured, add a constant dummy column so
        # the per-partition groupBy/join logic below works uniformly; it is
        # dropped again before returning.
        if the_partition_key is None:
            partition_key = '__dummy_partition_key__'
            df = df.withColumn(partition_key, f.lit(0)).cache()
        else:
            partition_key = the_partition_key
            df = df.cache()

        # One small dataframe per indexed column, holding the min and max
        # observed index for that column within each partition. These bounds
        # define the range the random candidates are drawn from.
        limits_dfs = [df.select(
            partition_key, curr_col_name
        ).distinct().groupBy(partition_key).agg(
            f.min(curr_col_name).alias(ComplementAccessTransformer._min_index_token(curr_col_name)),
            f.max(curr_col_name).alias(ComplementAccessTransformer._max_index_token(curr_col_name))
        ).orderBy(
            partition_key
        ) for curr_col_name in indexed_col_names_arr]

        def make_randint(factor):
            # Build a UDF that, given parallel arrays of per-column min and max
            # bounds, returns `factor` random tuples (one struct field per
            # indexed column), each component drawn with random.randint —
            # i.e. both bounds inclusive.
            # NOTE(review): random is not seeded here, so the sample is
            # nondeterministic across runs — confirm that is intended.
            schema = t.ArrayType(
                t.StructType([t.StructField(
                    curr_col_name, t.IntegerType()
                ) for curr_col_name in indexed_col_names_arr])
            )

            @f.udf(schema)
            def randint(min_index_arr, max_index_arr):
                return [tuple([random.randint(min_index, max_index) for min_index, max_index
                               in zip(min_index_arr, max_index_arr)]) for _ in range(factor)]

            return randint

        pre_complement_candidates_df = df.cache()

        # Attach every column's min/max bounds to each row via joins on the
        # partition key.
        for limits_df in limits_dfs:
            pre_complement_candidates_df = pre_complement_candidates_df.join(limits_df, partition_key).cache()

        cols = [f.col(partition_key)] + [f.col(curr_col_name) for curr_col_name in indexed_col_names_arr]
        randint = make_randint(complementset_factor)

        # Generate `complementset_factor` random tuples per row, explode them
        # into individual rows, unpack the struct fields back into the
        # original column names, and dedupe. Candidates may still collide
        # with tuples present in df; those are filtered out below.
        complement_candidates_df = pre_complement_candidates_df.withColumn(
            ComplementAccessTransformer._tuple_token(),
            f.explode(randint(
                f.array([f.col(ComplementAccessTransformer._min_index_token(curr_col_name)) for curr_col_name
                         in indexed_col_names_arr]),
                f.array([f.col(ComplementAccessTransformer._max_index_token(curr_col_name)) for curr_col_name
                         in indexed_col_names_arr])
            ))
        ).select(
            *([partition_key] + [f.col('{0}.{1}'.format(
                ComplementAccessTransformer._tuple_token(),
                curr_col_name
            )).alias(curr_col_name) for curr_col_name in indexed_col_names_arr])
        ).distinct().orderBy(*cols)

        # The set of tuples that actually occur in the input.
        tuples_df = df.select(*cols).distinct().orderBy(*cols)

        # Keep only candidates NOT present in the input (left-anti join), so
        # the result is drawn from the complement set.
        res_df = complement_candidates_df.join(
            tuples_df,
            [partition_key] + indexed_col_names_arr,
            how='left_anti'
        ).select(*cols).orderBy(*cols)

        # Remove the synthetic partition column if we added one above.
        if the_partition_key is None:
            res_df = res_df.drop(partition_key)

        return res_df