in core/src/main/python/synapse/ml/cyber/anomaly/complement_access.py [0:0]
def _transform(self, df: DataFrame) -> DataFrame:
    """ generate a dataframe which consists of a sample from the complement set

    For every row of ``df``, ``complementset_factor`` candidate tuples are drawn. Each
    component is a uniformly random integer in the inclusive [min, max] range of the
    corresponding indexed column within the row's partition. Candidates that already
    occur in ``df`` for the same partition are removed, leaving only unseen tuples.

    Parameters
    ----------
    df: a given dataframe containing the columns in 'indexed_col_names_arr'
        (and the partition key column, when 'partition_key' is set)

    Returns
    -------
    dataframe which consists of a sample from the complement set: distinct rows of the
    partition key (only when 'partition_key' is set) and the indexed columns, sorted by
    those columns. When 'complementset_factor' is 0, DataFrameUtils.make_empty(df) is
    returned instead.
    """
    complementset_factor = self.complementset_factor
    if complementset_factor == 0:
        return DataFrameUtils.make_empty(df)

    the_partition_key = self.partition_key
    indexed_col_names_arr = list(self.indexed_col_names_arr)
    min_token = ComplementAccessTransformer._min_index_token
    max_token = ComplementAccessTransformer._max_index_token

    if the_partition_key is None:
        # no partitioning requested: treat all rows as one artificial partition
        partition_key = '__dummy_partition_key__'
        df = df.withColumn(partition_key, f.lit(0))
    else:
        partition_key = the_partition_key

    # df feeds the range aggregation, the candidate generation and the anti-join, so it
    # is cached once; every intermediate below is consumed exactly once and is not cached.
    df = df.cache()

    key_col_names = [partition_key] + indexed_col_names_arr
    cols = [f.col(col_name) for col_name in key_col_names]

    # Per-partition [min, max] of every indexed column, computed in a single aggregation.
    # min/max are insensitive to duplicates (no distinct() needed) and any ordering
    # would be discarded by the join (no orderBy needed).
    limit_exprs = []
    for col_name in indexed_col_names_arr:
        limit_exprs.append(f.min(col_name).alias(min_token(col_name)))
        limit_exprs.append(f.max(col_name).alias(max_token(col_name)))
    if limit_exprs:
        limits_df = df.groupBy(partition_key).agg(*limit_exprs)
        pre_complement_candidates_df = df.join(limits_df, partition_key)
    else:
        pre_complement_candidates_df = df

    def make_randint(factor):
        # builds a UDF returning `factor` random tuples, one field per indexed column
        schema = t.ArrayType(
            t.StructType([
                t.StructField(col_name, t.IntegerType()) for col_name in indexed_col_names_arr
            ])
        )

        # NOTE(review): draws from Python's unseeded `random` and is not marked
        # nondeterministic, so the sample may differ each time the result is evaluated.
        @f.udf(schema)
        def randint(min_index_arr, max_index_arr):
            bounds = list(zip(min_index_arr, max_index_arr))
            # min/max come back null when a column is entirely null within a partition;
            # there is no range to sample from (random.randint would raise), so emit none.
            if any(lo is None or hi is None for lo, hi in bounds):
                return []
            return [tuple(random.randint(lo, hi) for lo, hi in bounds) for _ in range(factor)]

        return randint

    randint = make_randint(complementset_factor)
    tuple_token = ComplementAccessTransformer._tuple_token()
    complement_candidates_df = pre_complement_candidates_df.withColumn(
        tuple_token,
        f.explode(randint(
            f.array([f.col(min_token(col_name)) for col_name in indexed_col_names_arr]),
            f.array([f.col(max_token(col_name)) for col_name in indexed_col_names_arr])
        ))
    ).select(
        partition_key,
        *[f.col('{0}.{1}'.format(tuple_token, col_name)).alias(col_name)
          for col_name in indexed_col_names_arr]
    ).distinct()

    # Drop candidates that already occur in df. Duplicates on the right-hand side of an
    # anti-join do not affect its result, so the observed tuples need no distinct().
    tuples_df = df.select(*cols)
    res_df = complement_candidates_df.join(
        tuples_df,
        key_col_names,
        how='left_anti'
    ).select(*cols).orderBy(*cols)

    if the_partition_key is None:
        res_df = res_df.drop(partition_key)
    return res_df