def localOutlierFactor()

in spark/common/src/main/scala/org/apache/sedona/stats/outlierDetection/LocalOutlierFactor.scala [53:150]


  /**
   * Annotates each row of the input dataframe with its Local Outlier Factor (LOF):
   * the mean local reachability density (lrd) of a row's k nearest neighbors divided
   * by the row's own lrd. Values substantially greater than 1 indicate outliers.
   *
   * @param dataframe dataframe containing at least one geometry column
   * @param k number of nearest neighbors used in the computation; must be >= 1
   * @param geometry name of the geometry column; when null it is inferred from the schema
   * @param handleTies when true, temporarily enables the
   *                   spark.sedona.join.knn.includeTieBreakers session config for the
   *                   duration of the query construction, restoring its prior value after
   * @param useSphere when true, distances use ST_DistanceSphere instead of ST_Distance
   * @param resultColumnName name of the appended LOF column
   * @return the original columns of `dataframe` plus `resultColumnName`
   * @throws IllegalArgumentException if k < 1
   */
  def localOutlierFactor(
      dataframe: DataFrame,
      k: Int = 20,
      geometry: String = null,
      handleTies: Boolean = false,
      useSphere: Boolean = false,
      resultColumnName: String = "lof"): DataFrame = {

    if (k < 1)
      throw new IllegalArgumentException("k must be a positive integer")

    // When tie handling is requested, flip the KNN tie-breaker config on and remember
    // the previous value; None means the config was never touched and needs no restore.
    val priorTieBreakerSetting: Option[String] =
      if (handleTies) {
        val conf = SparkSession.getActiveSession.get.conf
        val prior = conf.get("spark.sedona.join.knn.includeTieBreakers", "false")
        conf.set("spark.sedona.join.knn.includeTieBreakers", true)
        Some(prior)
      } else None

    try {
      // Planar vs. spherical distance, both as a Column function and as the string
      // literal embedded in the ST_KNN SQL expression below.
      val distanceFunction: (Column, Column) => Column =
        if (useSphere) ST_DistanceSphere else ST_Distance
      val useSpheroidString = if (useSphere) "True" else "False" // for the SQL expression

      val geometryColumn =
        if (geometry == null) getGeometryColumnName(dataframe.schema) else geometry

      val KNNFunction = "ST_KNN"

      // Preserve the original row under CONTENTS_COLUMN_NAME and derive a stable row id
      // by hashing its JSON form; normalize the geometry column name to "geometry".
      val formattedDataframe = dataframe
        .withColumn(CONTENTS_COLUMN_NAME, f.struct("*"))
        .withColumn(ID_COLUMN_NAME, f.sha2(f.to_json(f.col(CONTENTS_COLUMN_NAME)), 256))
        .withColumnRenamed(geometryColumn, "geometry")

      // For every row: its k-distance (max distance among its k nearest neighbors)
      // and the ids of those neighbors.
      val kDistanceDf = formattedDataframe
        .alias("l")
        .join(
          formattedDataframe.alias("r"),
          // k + 1 because we are not counting the row matching to itself
          f.expr(f"$KNNFunction(l.geometry, r.geometry, $k + 1, $useSpheroidString)") && f.col(
            f"l.$ID_COLUMN_NAME") =!= f.col(f"r.$ID_COLUMN_NAME"))
        .groupBy(f"l.$ID_COLUMN_NAME")
        .agg(
          f.first("l.geometry").alias("geometry"),
          f.first(f"l.$CONTENTS_COLUMN_NAME").alias(CONTENTS_COLUMN_NAME),
          f.max(distanceFunction(f.col("l.geometry"), f.col("r.geometry"))).alias("k_distance"),
          f.collect_list(f"r.$ID_COLUMN_NAME").alias("neighbors"))
        // checkpoint to cut the lineage; kDistanceDf is reused twice in the self-join below
        .checkpoint()

      // Local reachability density: lrd(a) = 1 / mean over neighbors b of
      // reach-dist(a, b) = max(k_distance(b), dist(a, b)).
      val lrdDf = kDistanceDf
        .alias("A")
        .select(
          f.col(ID_COLUMN_NAME).alias("a_id"),
          f.col(CONTENTS_COLUMN_NAME),
          f.col("geometry").alias("a_geometry"),
          f.explode(f.col("neighbors")).alias("n_id"))
        .join(
          kDistanceDf.select(
            f.col(ID_COLUMN_NAME).alias("b_id"),
            f.col("geometry").alias("b_geometry"),
            f.col("k_distance").alias("b_k_distance")),
          f.expr("n_id = b_id"))
        .select(
          f.col("a_id"),
          f.col("b_id"),
          f.col(CONTENTS_COLUMN_NAME),
          f.array_max(
            f.array(
              f.col("b_k_distance"),
              distanceFunction(f.col("a_geometry"), f.col("b_geometry"))))
            .alias("rd"))
        .groupBy("a_id")
        .agg(
          // + 1e-10 to avoid division by zero, matches sklearn impl
          (f.lit(1.0) / (f.mean("rd") + 1e-10)).alias("lrd"),
          f.collect_list(f.col("b_id")).alias("neighbors"),
          f.first(CONTENTS_COLUMN_NAME).alias(CONTENTS_COLUMN_NAME))

      // LOF(a) = mean(lrd(b) for neighbors b) / lrd(a); finally re-expand the preserved
      // original columns alongside the result column.
      lrdDf
        .select(
          f.col("a_id"),
          f.col("lrd").alias("a_lrd"),
          f.col(CONTENTS_COLUMN_NAME),
          f.explode(f.col("neighbors")).alias("n_id"))
        .join(
          lrdDf.select(f.col("a_id").alias("b_id"), f.col("lrd").alias("b_lrd")),
          f.expr("n_id = b_id"))
        .groupBy("a_id")
        .agg(
          f.first(CONTENTS_COLUMN_NAME).alias(CONTENTS_COLUMN_NAME),
          (f.sum("b_lrd") / (f.count("b_lrd") * f.first("a_lrd"))).alias(resultColumnName))
        .select(f.col(f"$CONTENTS_COLUMN_NAME.*"), f.col(resultColumnName))
    } finally {
      // Restore the prior tie-breaker setting even when query construction throws,
      // so the session config is never left mutated by a failed call.
      priorTieBreakerSetting.foreach(prior =>
        SparkSession.getActiveSession.get.conf
          .set("spark.sedona.join.knn.includeTieBreakers", prior))
    }
  }