backend/app/services/observability/Models.scala (184 lines of code) (raw):
package services.observability
import extraction.Extractor
import play.api.libs.json.{Format, Json}
import model.manifest.Blob
import org.apache.commons.codec.digest.DigestUtils
import org.joda.time.{DateTime, DateTimeZone}
import services.index.IngestionData
import services.observability.ExtractorType.ExtractorType
import services.observability.IngestionEventType.{IngestionEventType, RunExtractor}
import services.observability.EventStatus.EventStatus
import play.api.libs.json.JodaWrites.jodaDateWrites
import play.api.libs.json.JodaReads.jodaDateReads
import utils.Logging
import java.security.MessageDigest
import scala.util.{Failure, Try, Success => TrySuccess}
/** Joda `DateTime` JSON reader and writer, both built from the same date pattern. */
object JodaReadWrites {
  // One pattern is used for both reading and writing.
  private val datePattern = "yyyy-MM-dd'T'HH:mm:ss.SSSZ"

  /** Writes a `DateTime` using `datePattern`. */
  val dateWrites: play.api.libs.json.Writes[DateTime] = jodaDateWrites(datePattern)

  /** Reads a `DateTime` using `datePattern`. */
  val dateReads: play.api.libs.json.Reads[DateTime] = jodaDateReads(datePattern)
}
/** Enumeration of ingestion event types, with a Play JSON format for its values. */
object IngestionEventType extends Enumeration {
  type IngestionEventType = Value

  // Declaration order determines the enumeration ids; keep it stable.
  val HashComplete,
      WorkspaceUpload,
      BlobCopy,
      ManifestExists,
      MimeTypeDetected,
      IngestFile,
      InitialElasticIngest,
      RunExtractor = Value

  implicit val format: Format[IngestionEventType] = Json.formatEnum(this)
}
/** Enumeration of extractor types, with a Play JSON format for its values. */
object ExtractorType extends Enumeration {
  type ExtractorType = Value

  // Declaration order determines the enumeration ids; keep it stable.
  val OlmEmailExtractor, ZipExtractor, RarExtractor, DocumentBodyExtractor,
      PstEmailExtractor, EmlEmailExtractor, MsgEmailExtractor, MBoxEmailExtractor,
      CsvTableExtractor, ExcelTableExtractor, OcrMyPdfExtractor, OcrMyPdfImageExtractor,
      TesseractPdfOcrExtractor, ImageOcrExtractor, UnknownExtractor, TranscriptionExtractor = Value

  /**
   * Looks up a value by its name without throwing.
   *
   * @param s the name to look up
   * @return the value whose `toString` equals `s`, or `UnknownExtractor` when none matches
   */
  def withNameCustom(s: String): Value =
    values.find(candidate => candidate.toString == s).getOrElse(UnknownExtractor)

  implicit val format: Format[ExtractorType] = Json.formatEnum(this)
}
/** Enumeration of event statuses (`Started`, `Success`, `Failure`), with a Play JSON format. */
object EventStatus extends Enumeration {
  type EventStatus = Value

  // Declaration order determines the enumeration ids; keep it stable.
  val Started,
      Success,
      Failure = Value

  implicit val format: Format[EventStatus] = Json.formatEnum(this)
}
/**
 * An error attached to an ingestion event.
 *
 * @param message    the error message
 * @param stackTrace an optional stack trace; defaults to `None`
 */
case class IngestionError(
  message: String,
  stackTrace: Option[String] = None
)
/**
 * A list of ingestion errors paired with the event type they belong to.
 *
 * @param eventType the ingestion event type
 * @param errors    the errors associated with that event type
 */
case class IngestionErrorsWithEventType(
  eventType: IngestionEventType,
  errors: List[IngestionError]
)
/** JSON format and parsing helpers for [[IngestionError]]. */
object IngestionError extends Logging {
  implicit val format: play.api.libs.json.OFormat[IngestionError] = Json.format[IngestionError]

  /**
   * Pairs each entry of `errors` with the entry of `eventTypes` at the same index and parses it.
   *
   * `null` error entries are skipped. An entry whose JSON does not decode to a list of
   * [[IngestionError]], or whose event type name is not a known `IngestionEventType`, is logged
   * and skipped, so one bad entry does not abort parsing of the rest.
   *
   * @param errors     JSON strings, each expected to hold an array of ingestion errors; entries may be `null`
   * @param eventTypes event type names, positionally aligned with `errors`
   * @return the successfully parsed entries, in input order; pairs beyond the shorter array are dropped
   */
  def parseIngestionErrors(errors: Array[String], eventTypes: Array[String]): List[IngestionErrorsWithEventType] =
    errors.toList.zip(eventTypes.toList).flatMap { case (rawErrors, rawEventType) =>
      // Option(...) turns a null entry into None so it is skipped.
      Option(rawErrors).flatMap { json =>
        // The event type lookup is inside the Try on purpose: an unknown name is logged and
        // skipped like malformed JSON, instead of escaping as a NoSuchElementException.
        Try {
          IngestionErrorsWithEventType(
            IngestionEventType.withName(rawEventType),
            Json.parse(json).as[List[IngestionError]]
          )
        } match {
          case TrySuccess(parsed) =>
            Some(parsed)
          case Failure(exception) =>
            logger.error("Failed to parse ingestion errors. Returning empty list", exception)
            None
        }
      }
    }
}
/** Play JSON format for [[IngestionErrorsWithEventType]]. */
object IngestionErrorsWithEventType {
  implicit val format: play.api.libs.json.OFormat[IngestionErrorsWithEventType] =
    Json.format[IngestionErrorsWithEventType]
}
/**
 * Optional details attached to an ingestion event. Every field defaults to `None`.
 *
 * @param errors        errors recorded for the event
 * @param extractors    extractor types associated with the event
 * @param blob          the blob involved
 * @param ingestionData ingestion data for the event
 * @param extractorName a single extractor type the event relates to
 * @param workspaceName name of the workspace
 * @param mimeTypes     MIME types as a single string
 */
case class EventDetails(
  errors: Option[List[IngestionError]] = None,
  extractors: Option[List[ExtractorType]] = None,
  blob: Option[Blob] = None,
  ingestionData: Option[IngestionData] = None,
  extractorName: Option[ExtractorType] = None,
  workspaceName: Option[String] = None,
  mimeTypes: Option[String] = None
)
/** JSON format and convenience constructors for [[EventDetails]]. */
object EventDetails {
  implicit val detailsFormat: play.api.libs.json.OFormat[EventDetails] = Json.format[EventDetails]

  /** Details holding a single error built from `message` and an optional `stackTrace`. */
  def errorDetails(message: String, stackTrace: Option[String] = None): Option[EventDetails] = {
    val error = IngestionError(message, stackTrace)
    Some(EventDetails(errors = Some(List(error))))
  }

  /**
   * Details holding a single error plus the extractor it relates to.
   * The extractor name is resolved with `ExtractorType.withNameCustom`.
   */
  def extractorErrorDetails(extractorName: String, message: String, stackTrace: String): Option[EventDetails] = {
    val error = IngestionError(message, Some(stackTrace))
    val extractorType = ExtractorType.withNameCustom(extractorName)
    Some(EventDetails(errors = Some(List(error)), extractorName = Some(extractorType)))
  }

  /**
   * Details holding the ingestion data, the types of the given extractors (resolved by name
   * with `ExtractorType.withNameCustom`) and the data's MIME types joined with commas.
   */
  def ingestionDataDetails(data: IngestionData, extractors: List[Extractor]): Option[EventDetails] = {
    val extractorTypes = extractors.map(extractor => ExtractorType.withNameCustom(extractor.name))
    val joinedMimeTypes = data.mimeTypes.map(_.mimeType).mkString(",")
    Some(
      EventDetails(
        extractors = Some(extractorTypes),
        ingestionData = Some(data),
        mimeTypes = Some(joinedMimeTypes)
      )
    )
  }

  /** Details holding only the extractor type resolved from `extractorName`. */
  def extractorDetails(extractorName: String): Option[EventDetails] = {
    val extractorType = ExtractorType.withNameCustom(extractorName)
    Some(EventDetails(extractorName = Some(extractorType)))
  }
}
/**
 * Identifiers attached to an ingestion event.
 *
 * @param blobId   id of the blob
 * @param ingestId id of the ingest
 */
case class EventMetadata(
  blobId: String,
  ingestId: String
)
/** Play JSON format for [[EventMetadata]]. */
object EventMetadata {
  implicit val format: play.api.libs.json.OFormat[EventMetadata] =
    Json.format[EventMetadata]
}
/**
 * A single ingestion event.
 *
 * @param metadata  blob and ingest identifiers for the event
 * @param eventType the type of the event
 * @param status    the event status; defaults to `EventStatus.Success`
 * @param details   optional extra details; defaults to `None`
 */
case class IngestionEvent(
  metadata: EventMetadata,
  eventType: IngestionEventType,
  status: EventStatus = EventStatus.Success,
  details: Option[EventDetails] = None
)
/** JSON formats and convenience constructors for [[IngestionEvent]]. */
object IngestionEvent {
  implicit val metaDataFormat: play.api.libs.json.OFormat[EventMetadata] = Json.format[EventMetadata]
  implicit val ingestionEventFormat: play.api.libs.json.OFormat[IngestionEvent] = Json.format[IngestionEvent]

  /**
   * Builds a `WorkspaceUpload` event.
   *
   * @param blobId        id of the uploaded blob
   * @param ingestUri     stored in the metadata's `ingestId` field
   * @param workspaceName recorded in the event details
   * @param status        status of the event
   */
  def workspaceUploadEvent(blobId: String, ingestUri: String, workspaceName: String, status: EventStatus): IngestionEvent = {
    val uploadDetails = EventDetails(workspaceName = Some(workspaceName))
    IngestionEvent(
      metadata = EventMetadata(blobId, ingestUri),
      eventType = IngestionEventType.WorkspaceUpload,
      status = status,
      details = Some(uploadDetails)
    )
  }
}
/**
 * Metadata describing a blob within an ingest.
 *
 * @param ingestId id of the ingest
 * @param blobId   id of the blob
 * @param path     path of the blob
 * @param fileSize size of the file
 */
case class BlobMetadata(
  ingestId: String,
  blobId: String,
  path: String,
  fileSize: Long
)
/** Play JSON format for [[BlobMetadata]]. */
object BlobMetadata {
  implicit val blobMetaDataFormat: play.api.libs.json.OFormat[BlobMetadata] =
    Json.format[BlobMetadata]
}
/**
 * One status update for an extractor. Both fields are optional.
 *
 * @param eventTime time of the update, if present
 * @param status    status of the update, if present
 */
case class ExtractorStatusUpdate(
  eventTime: Option[DateTime],
  status: Option[EventStatus]
)
/** Play JSON format for [[ExtractorStatusUpdate]], using the date reader/writer from `JodaReadWrites`. */
object ExtractorStatusUpdate {
  implicit val dateWrites: play.api.libs.json.Writes[DateTime] = JodaReadWrites.dateWrites
  implicit val dateReads: play.api.libs.json.Reads[DateTime] = JodaReadWrites.dateReads

  implicit val format: play.api.libs.json.OFormat[ExtractorStatusUpdate] =
    Json.format[ExtractorStatusUpdate]
}
/**
 * The status of one ingestion event at a point in time.
 *
 * @param eventTime   time of the event
 * @param eventType   type of the event
 * @param eventStatus status of the event
 */
case class IngestionEventStatus(
  eventTime: DateTime,
  eventType: IngestionEventType,
  eventStatus: EventStatus
)
/** JSON format and parsing helpers for [[IngestionEventStatus]]. */
object IngestionEventStatus {
  implicit val dateWrites: play.api.libs.json.Writes[DateTime] = JodaReadWrites.dateWrites
  implicit val dateReads: play.api.libs.json.Reads[DateTime] = JodaReadWrites.dateReads
  implicit val format: play.api.libs.json.OFormat[IngestionEventStatus] = Json.format[IngestionEventStatus]

  /**
   * Combines three positionally aligned arrays into event statuses, dropping `RunExtractor` events.
   *
   * Names are resolved with `withName`, which throws for a name that is not a member of the
   * respective enumeration.
   *
   * @param eventTimes    time of each event
   * @param eventTypes    `IngestionEventType` name of each event
   * @param eventStatuses `EventStatus` name of each event
   * @return the parsed statuses, excluding those whose type is `RunExtractor`
   */
  def parseEventStatus(eventTimes: Array[DateTime], eventTypes: Array[String], eventStatuses: Array[String]): List[IngestionEventStatus] = {
    val parsedStatuses = eventTimes.toList.zip(eventTypes).zip(eventStatuses).map {
      case ((time, typeName), statusName) =>
        IngestionEventStatus(time, IngestionEventType.withName(typeName), EventStatus.withName(statusName))
    }
    // discard extractor events as we have a separate ExtractorStatus field, and 'RunExtractor' by itself without context of which extractor isn't very helpful
    parsedStatuses.filterNot(_.eventType == RunExtractor)
  }
}
/**
 * The status updates recorded for one extractor type.
 *
 * @param extractorType the extractor the updates belong to
 * @param statusUpdates the updates for that extractor
 */
case class ExtractorStatus(
  extractorType: ExtractorType,
  statusUpdates: List[ExtractorStatusUpdate]
)
/** JSON format and parsing helpers for [[ExtractorStatus]]. */
object ExtractorStatus {
  implicit val format: play.api.libs.json.OFormat[ExtractorStatus] = Json.format[ExtractorStatus]

  /**
   * Builds extractor statuses from three positionally aligned arrays.
   *
   * Each entry of `extractorEventTimes` and `extractorStatuses` is a comma-separated string;
   * the two are split and paired item by item. The literal item `"null"` becomes `None`.
   * Times are converted with `PostgresHelpers.postgresEpochToDateTime` after `toDouble`.
   *
   * Extractor names are resolved with `ExtractorType.withNameCustom`, so an unrecognised name
   * maps to `UnknownExtractor` rather than throwing, in line with the other extractor-name
   * lookups in this file. Status names still use `EventStatus.withName` and throw if unknown.
   *
   * @param extractors          extractor type names
   * @param extractorEventTimes per extractor, comma-separated event times
   * @param extractorStatuses   per extractor, comma-separated `EventStatus` names
   * @return one [[ExtractorStatus]] per extractor, in input order
   */
  def parseDbStatusEvents(extractors: Array[String], extractorEventTimes: Array[String], extractorStatuses: Array[String]): List[ExtractorStatus] = {
    val statusUpdatesParsed: List[List[ExtractorStatusUpdate]] =
      extractorEventTimes.toList.zip(extractorStatuses.toList).map { case (times, statuses) =>
        times.split(",").toList.zip(statuses.split(",").toList).map { case (time, status) =>
          // The string "null" marks a missing value in the comma-separated input.
          val eventTime = if (time == "null") None else Some(PostgresHelpers.postgresEpochToDateTime(time.toDouble))
          val parsedStatus = if (status == "null") None else Some(EventStatus.withName(status))
          ExtractorStatusUpdate(eventTime, parsedStatus)
        }
      }

    extractors.toList.zip(statusUpdatesParsed).map { case (extractor, statusUpdates) =>
      // withNameCustom falls back to UnknownExtractor instead of throwing NoSuchElementException.
      ExtractorStatus(ExtractorType.withNameCustom(extractor), statusUpdates)
    }
  }
}
/**
 * Status record for a blob: its identifiers, paths, size, workspace, timings,
 * per-event and per-extractor statuses, errors and MIME types.
 *
 * @param metadata          blob and ingest identifiers
 * @param paths             paths of the blob
 * @param fileSize          file size, if known
 * @param workspaceName     workspace name, if any
 * @param ingestStart       time the ingest started
 * @param mostRecentEvent   time of the most recent event
 * @param eventStatuses     statuses of the ingestion events
 * @param extractorStatuses statuses per extractor
 * @param errors            errors grouped by event type
 * @param mimeTypes         MIME types as a single string, if any
 * @param infiniteLoop      infinite-loop flag for the blob
 */
case class BlobStatus(
  metadata: EventMetadata,
  paths: List[String],
  fileSize: Option[Long],
  workspaceName: Option[String],
  ingestStart: DateTime,
  mostRecentEvent: DateTime,
  eventStatuses: List[IngestionEventStatus],
  extractorStatuses: List[ExtractorStatus],
  errors: List[IngestionErrorsWithEventType],
  mimeTypes: Option[String],
  infiniteLoop: Boolean
)
/** JSON format, parsing and anonymisation helpers for [[BlobStatus]]. */
object BlobStatus {
  implicit val dateWrites: play.api.libs.json.Writes[DateTime] = JodaReadWrites.dateWrites
  implicit val dateReads: play.api.libs.json.Reads[DateTime] = JodaReadWrites.dateReads
  implicit val format: play.api.libs.json.OFormat[BlobStatus] = Json.format[BlobStatus]

  /**
   * Converts an array of paths into a list, dropping `null` entries.
   *
   * @param paths the paths, possibly containing `null` entries; a `null` array is treated as empty
   * @return the non-null paths, or `List("unknown filename")` when there are none
   */
  def parsePathsArray(paths: Array[String]): List[String] = {
    val nonNullPaths = Option(paths).fold(List.empty[String])(array => array.toList.filter(path => path != null))
    if (nonNullPaths.isEmpty) List("unknown filename") else nonNullPaths
  }

  /**
   * Aims to remove all information from blob status that is likely to identify the user who uploaded it or the file itself
   *
   * @param status the status to anonymise
   * @return a copy with `paths` replaced by `List("[REDACTED]")` and `workspaceName` replaced by its MD5 hex digest
   */
  private def anonymise(status: BlobStatus): BlobStatus =
    status.copy(
      paths = List("[REDACTED]"),
      workspaceName = status.workspaceName.map(DigestUtils.md5Hex)
    )

  /**
   * Anonymises every status whose `ingestStart` is more than 14 days before the current time.
   *
   * Each status is checked independently, so the result does not depend on input order.
   *
   * @param sortedStatuses the statuses to process
   * @return the statuses in the same order, with the older ones anonymised
   */
  def anonymiseEventsOlderThanTwoWeeks(sortedStatuses: List[BlobStatus]): List[BlobStatus] = {
    // Computed once so every status in this call is compared against the same cutoff.
    val cutoff = new DateTime().minusDays(14)
    sortedStatuses.map { status =>
      if (status.ingestStart.isBefore(cutoff)) anonymise(status) else status
    }
  }
}