backend/app/services/observability/PostgresClient.scala (239 lines of code) (raw):

package services.observability

import org.joda.time.{DateTime, DateTimeZone}
import play.api.libs.json.Json
import scalikejdbc._
import services.PostgresConfig

import scala.util.{Failure, Success, Try}
import utils.Logging
import utils.attempt.{PostgresReadFailure, PostgresWriteFailure, Failure => GiantFailure}

/**
 * Storage interface for ingestion observability data (ingestion events and blob metadata).
 *
 * Every operation reports failure as a `Left` rather than by throwing.
 */
trait PostgresClient {
  /** Records a single ingestion event. */
  def insertEvent(event: IngestionEvent): Either[GiantFailure, Unit]

  /** Records the metadata of a blob within an ingest. */
  def insertMetadata(metaData: BlobMetadata): Either[GiantFailure, Unit]

  /**
   * Returns one [[BlobStatus]] per blob/ingest pair for the requested ingest(s).
   *
   * @param ingestId         the ingest id to look up
   * @param ingestIdIsPrefix when true, `ingestId` is treated as a prefix of the ingest ids to match
   */
  def getEvents(ingestId: String, ingestIdIsPrefix: Boolean): Either[GiantFailure, List[BlobStatus]]

  /** Deletes all ingestion events and metadata rows for a blob, returning the number of rows removed. */
  def deleteBlobIngestionEventsAndMetadata(blobId: String): Either[GiantFailure, Long]
}

/** No-op implementation: every write succeeds without doing anything and every read is empty. */
class PostgresClientDoNothing extends PostgresClient {
  override def insertEvent(event: IngestionEvent): Either[GiantFailure, Unit] = Right(())

  override def insertMetadata(metaData: BlobMetadata): Either[GiantFailure, Unit] = Right(())

  override def getEvents(ingestId: String, ingestIdIsPrefix: Boolean): Either[GiantFailure, List[BlobStatus]] =
    Right(List())

  override def deleteBlobIngestionEventsAndMetadata(blobId: String): Either[GiantFailure, Long] = Right(0)
}

object PostgresHelpers {
  /**
   * Converts an epoch value expressed in (possibly fractional) seconds into a UTC [[DateTime]].
   * The value is multiplied by 1000 and truncated to whole milliseconds.
   */
  def postgresEpochToDateTime(epoch: Double): DateTime =
    new DateTime((epoch * 1000).toLong, DateTimeZone.UTC)
}

/**
 * [[PostgresClient]] backed by ScalikeJDBC.
 *
 * Constructing an instance loads the PostgreSQL JDBC driver and sets up the ScalikeJDBC
 * singleton connection pool as a side effect.
 */
class PostgresClientImpl(postgresConfig: PostgresConfig) extends PostgresClient with Logging {
  val dbHost: String = s"jdbc:postgresql://${postgresConfig.host}:${postgresConfig.port}/giant"

  // initialize JDBC driver & connection pool
  Class.forName("org.postgresql.Driver")
  ConnectionPool.singleton(dbHost, postgresConfig.username, postgresConfig.password)

  implicit val session: AutoSession.type = AutoSession

  import EventDetails.detailsFormat

  /**
   * Inserts a row into `blob_metadata`, stamped with the database's current time.
   *
   * @return `Right(())` on success, `Left(PostgresWriteFailure)` (after logging a warning) otherwise
   */
  override def insertMetadata(metaData: BlobMetadata): Either[GiantFailure, Unit] = {
    Try {
      sql"""
        INSERT INTO blob_metadata (
          ingest_id,
          blob_id,
          file_size,
          path,
          insert_time
        ) VALUES (
          ${metaData.ingestId},
          ${metaData.blobId},
          ${metaData.fileSize},
          ${metaData.path},
          now()
        );""".execute()
    } match {
      case Success(_) => Right(())
      case Failure(exception) =>
        logger.warn(s"""
          An exception occurred while inserting blob metadata
          blobId: ${metaData.blobId}, ingestId: ${metaData.ingestId} path: ${metaData.path}
          exception: ${exception.getMessage()}""", exception
        )
        Left(PostgresWriteFailure(exception))
    }
  }

  /**
   * Inserts a row into `ingestion_events`, stamped with the database's current time.
   * Missing event details are stored as an empty JSON object.
   *
   * @return `Right(())` on success, `Left(PostgresWriteFailure)` (after logging a warning) otherwise
   */
  override def insertEvent(event: IngestionEvent): Either[GiantFailure, Unit] = {
    Try {
      val detailsJson = event.details.map(Json.toJson(_).toString).getOrElse("{}")
      sql"""
        INSERT INTO ingestion_events (
          blob_id,
          ingest_id,
          type,
          status,
          details,
          event_time
        ) VALUES (
          ${event.metadata.blobId},
          ${event.metadata.ingestId},
          ${event.eventType.toString()},
          ${event.status.toString()},
          $detailsJson::JSONB,
          now()
        );""".execute()
    } match {
      case Success(_) => Right(())
      case Failure(exception) =>
        // Pass the exception to the logger as well, so the stack trace is kept (as insertMetadata does).
        logger.warn(s"""
          An exception occurred while inserting ingestion event
          blobId: ${event.metadata.blobId}, ingestId: ${event.metadata.ingestId} eventType: ${event.eventType.toString()}
          exception: ${exception.getMessage()}""", exception
        )
        Left(PostgresWriteFailure(exception))
    }
  }

  /**
   * Merges the ingestion events for each blob into a single row, containing the success/failure
   * status of each extractor that was expected to run on the ingestion.
   *
   * The subqueries are as follows:
   *   - problem_blobs - blobs with more than 100 ingestion events, assumed to be stuck in an infinite loop
   *   - blob_extractors - get the extractors expected to run for each blob
   *   - extractor_statuses - get the success/failure status for the extractors identified in blob_extractors
   *
   * @param ingestId         the ingest id to look up
   * @param ingestIdIsPrefix when true, matches every ingest id that starts with `ingestId`;
   *                         otherwise only the exact ingest id is matched
   * @return the blob statuses, most recently started first, or a `PostgresReadFailure`
   */
  override def getEvents(ingestId: String, ingestIdIsPrefix: Boolean): Either[PostgresReadFailure, List[BlobStatus]] = {
    // The id is always compared with LIKE, so LIKE metacharacters (% and _) must be escaped in
    // both modes - otherwise an exact lookup for e.g. "my_ingest" would also match "myXingest".
    val ingestIdPattern =
      if (ingestIdIsPrefix) LikeConditionEscapeUtil.beginsWith(ingestId)
      else LikeConditionEscapeUtil.escape(ingestId)

    Try {
      val results = sql"""
        WITH problem_blobs AS (
          -- assume that blobs with more than 100 ingestion_events are failing to be ingested in an infinite loop
          SELECT blob_id from ingestion_events
          WHERE ingest_id LIKE ${ingestIdPattern}
          group by 1
          having count(*) > 100
        ),
        blob_extractors AS (
          -- get all the extractors expected for a given blob
          SELECT ingest_id, blob_id, jsonb_array_elements_text(details -> 'extractors') as extractor
          from ingestion_events
          WHERE ingest_id LIKE ${ingestIdPattern}
            AND type = ${IngestionEventType.MimeTypeDetected.toString}
            AND blob_id NOT IN (SELECT blob_id FROM problem_blobs)
        ),
        extractor_statuses as (
          -- Aggregate all the status updates for the relevant extractors for a given blob
          SELECT
            blob_extractors.blob_id,
            blob_extractors.ingest_id,
            blob_extractors.extractor,
            -- As the same status update may happen multiple times if a blob is reingested, it's useful to have the time
            -- this field is destined to be converted to a string so use epoch time (seconds) to make getting it back into
            -- a date a bit less of a faff
            ARRAY_AGG(EXTRACT(EPOCH from ingestion_events.event_time)) AS extractor_event_times,
            ARRAY_AGG(ingestion_events.status) AS extractor_event_statuses
          FROM blob_extractors
          LEFT JOIN ingestion_events
            ON blob_extractors.blob_id = ingestion_events.blob_id
            AND blob_extractors.ingest_id = ingestion_events.ingest_id
            -- there is no index on extractorName but we aren't expecting too many events for the same blob_id/ingest_id
            AND blob_extractors.extractor = ingestion_events.details ->> 'extractorName'
            AND ingestion_events.type = 'RunExtractor'
          -- A file may be uploaded multiple times within different ingests - use group by to merge them together
          GROUP BY 1,2,3
        )
        SELECT
          ie.blob_id,
          ie.ingest_id,
          ie.ingest_start,
          ie.most_recent_event,
          ie.event_types,
          ie.event_times,
          ie.event_statuses,
          ie.errors,
          ie.workspace_name AS "workspaceName",
          ie.mime_types AS "mimeTypes",
          ie.infinite_loop AS "infiniteLoop",
          ARRAY_AGG(DISTINCT blob_metadata.path ) AS paths,
          (ARRAY_AGG(blob_metadata.file_size))[1] as "fileSize",
          ARRAY_REMOVE(ARRAY_AGG(extractor_statuses.extractor), NULL) AS extractors,
          -- You can't array_agg arrays of varying cardinality so here we convert to string
          ARRAY_REMOVE(ARRAY_AGG(ARRAY_TO_STRING(extractor_statuses.extractor_event_times, ',','null')), NULL) AS "extractorEventTimes",
          ARRAY_REMOVE(ARRAY_AGG(ARRAY_TO_STRING(extractor_statuses.extractor_event_statuses, ',','null')), NULL) AS "extractorStatuses"
        FROM (
          SELECT
            blob_id,
            ingest_id,
            MIN(EXTRACT(EPOCH FROM event_time)) AS ingest_start,
            MAX(EXTRACT(EPOCH FROM event_time)) AS most_recent_event,
            ARRAY_AGG(type) as event_types,
            ARRAY_AGG(EXTRACT(EPOCH from event_time)) as event_times,
            ARRAY_AGG(status) as event_statuses,
            ARRAY_AGG(details -> 'errors') as errors,
            (ARRAY_AGG(details ->> 'workspaceName') FILTER (WHERE details ->> 'workspaceName' IS NOT NULL))[1] as workspace_name,
            (ARRAY_AGG(details ->> 'mimeTypes') FILTER (WHERE details ->> 'mimeTypes' IS NOT NULL))[1] as mime_types,
            FALSE AS infinite_loop
          FROM ingestion_events
          WHERE ingest_id LIKE ${ingestIdPattern}
            AND blob_id NOT IN (SELECT blob_id FROM problem_blobs)
          GROUP BY 1,2
          UNION
          -- blobs in the ingestion that are failing in an infinite loop
          SELECT DISTINCT
            blob_id,
            ingest_id,
            MIN(EXTRACT(EPOCH FROM event_time)) AS ingest_start,
            MAX(EXTRACT(EPOCH FROM event_time)) AS most_recent_event,
            array[]::text[] AS event_types,
            array[]::numeric[] AS event_times,
            array[]::text[] AS event_statuses,
            array['[]'::jsonb] AS errors,
            NULL AS workspace_name,
            NULL AS mime_types,
            TRUE AS infinite_loop
          FROM ingestion_events
          WHERE ingest_id LIKE ${ingestIdPattern}
            AND blob_id IN (SELECT blob_id FROM problem_blobs)
          GROUP BY 1,2
        ) AS ie
        LEFT JOIN blob_metadata USING(ingest_id, blob_id)
        LEFT JOIN extractor_statuses
          ON extractor_statuses.blob_id = ie.blob_id
          AND extractor_statuses.ingest_id = ie.ingest_id
        GROUP BY 1,2,3,4,5,6,7,8,9,10,11
        ORDER by ingest_start desc
      """.map(rs => toBlobStatus(rs)).list()
      results
    } match {
      case Success(results) => Right(results)
      case Failure(exception) =>
        Left(PostgresReadFailure(exception, s"getEvents failed: ${exception.getMessage}"))
    }
  }

  /** Builds a [[BlobStatus]] from one row of the query issued by `getEvents`. */
  private def toBlobStatus(rs: WrappedResultSet): BlobStatus = {
    // event_types is needed twice: to pair with the event statuses and with the errors.
    val eventTypes = rs.array("event_types").getArray.asInstanceOf[Array[String]]
    BlobStatus(
      EventMetadata(
        rs.string("blob_id"),
        rs.string("ingest_id")
      ),
      BlobStatus.parsePathsArray(rs.array("paths").getArray().asInstanceOf[Array[String]]),
      rs.longOpt("fileSize"),
      rs.stringOpt("workspaceName"),
      PostgresHelpers.postgresEpochToDateTime(rs.double("ingest_start")),
      PostgresHelpers.postgresEpochToDateTime(rs.double("most_recent_event")),
      IngestionEventStatus.parseEventStatus(
        rs.array("event_times").getArray.asInstanceOf[Array[java.math.BigDecimal]]
          .map(t => PostgresHelpers.postgresEpochToDateTime(t.doubleValue)),
        eventTypes,
        rs.array("event_statuses").getArray.asInstanceOf[Array[String]]
      ),
      rs.arrayOpt("extractors").map { extractors =>
        ExtractorStatus.parseDbStatusEvents(
          extractors.getArray().asInstanceOf[Array[String]],
          rs.array("extractorEventTimes").getArray().asInstanceOf[Array[String]],
          rs.array("extractorStatuses").getArray().asInstanceOf[Array[String]]
        )
      }.getOrElse(List()),
      IngestionError.parseIngestionErrors(
        rs.array("errors").getArray.asInstanceOf[Array[String]],
        eventTypes
      ),
      rs.stringOpt("mimeTypes"),
      rs.boolean("infiniteLoop")
    )
  }

  /**
   * Deletes every `ingestion_events` and `blob_metadata` row for the blob inside a single
   * local transaction.
   *
   * @return the total number of rows deleted across both tables, or a `PostgresWriteFailure`
   */
  override def deleteBlobIngestionEventsAndMetadata(blobId: String): Either[GiantFailure, Long] = {
    Try {
      // The transaction's session deliberately shadows the class-level AutoSession by name.
      DB.localTx { implicit session =>
        val numIngestionDeleted = sql"DELETE FROM ingestion_events WHERE blob_id = ${blobId}".executeUpdate()
        val numBlobMetadataDeleted = sql"DELETE FROM blob_metadata WHERE blob_id = ${blobId}".executeUpdate()
        numIngestionDeleted + numBlobMetadataDeleted
      }
    } match {
      case Success(numRowsDeleted) => Right(numRowsDeleted)
      case Failure(exception) => Left(PostgresWriteFailure(exception))
    }
  }
}