def transform()

in auditlog/src/main/scala/com/gerritforge/analytics/auditlog/spark/AuditLogsTransformer.scala [40:58]


  /** Turns raw audit events into aggregated, faceted event counts.
    *
    * Events are first restricted to the requested time window, serialized to
    * JSON and re-read as a tabular DataFrame, then enriched column-by-column
    * (user identity, time bucket, command/sub-command, user type, project)
    * before the per-facet event counts are rolled up.
    *
    * @param auditEventsRDD  raw audit events to aggregate
    * @param timeAggregation granularity label for the time bucket column
    *                        (semantics defined by `withTimeBucketColumn`)
    * @param timeRange       inclusive window to keep; defaults to all time
    * @return dataset of aggregated audit events, one row per facet combination
    */
  def transform(
      auditEventsRDD: RDD[AuditEvent],
      timeAggregation: String,
      timeRange: TimeRange = TimeRange.always
  ): Dataset[AggregatedAuditEvent] = {
    import spark.implicits._

    // Keep only events inside the requested window before any further work.
    val eventsInRange =
      auditEventsRDD.filterWithinRange(TimeRange(timeRange.since, timeRange.until))

    // Round-trip through JSON so the column-wise enrichment helpers below
    // can operate on a schema-inferred DataFrame.
    val eventsDataFrame = eventsInRange.toJsonString.toJsonTableDataFrame

    // Attach the faceting columns: who issued the event, when (bucketed),
    // what command (and sub-command), what kind of user, and which project.
    val enrichedEvents = eventsDataFrame
      .hydrateWithUserIdentifierColumn(USER_IDENTIFIER_FIELD, broadcastUserIdentifiers.value)
      .withTimeBucketColumn(TIME_BUCKET_FIELD, timeAggregation)
      .withCommandColumns(COMMAND_FIELD, COMMAND_ARGS_FIELD)
      .withSubCommandColumns(SUB_COMMAND_FIELD)
      .withUserTypeColumn(USER_TYPE_FIELD, broadcastAdditionalUsersInfo.value)
      .withProjectColumn(PROJECT_FIELD, broadcastGerritProjects.value)

    // Roll up the number of events per unique facet combination and map
    // each row onto the typed result case class.
    enrichedEvents
      .aggregateNumEventsColumn(NUM_EVENTS_FIELD, FACETING_FIELDS)
      .as[AggregatedAuditEvent]
  }