def getEvents()

in backend/app/services/observability/PostgresClient.scala [101:248]

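getEvents rolls all of the ingestion events for an ingest (or ingest-id prefix) up into a single row per blob, including the status of every extractor that was expected to run, and maps each row onto a BlobStatus. Blobs with an abnormally large number of events are assumed to be stuck in an ingestion loop and are reported separately with a reduced set of fields.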

    def getEvents(ingestId: String, ingestIdIsPrefix: Boolean): Either[PostgresReadFailure, List[BlobStatus]] = {
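        // Any exception thrown while building or running the query is captured by the surrounding Try and
        // surfaced as a PostgresReadFailure rather than propagating to the caller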
        Try {
          /**
            * The aim of this query is to merge the ingestion events for each blob into a single row, containing the
            * success/failure status of each extractor that was expected to run on the ingestion.
            *
            * The subqueries (CTEs) are as follows:
            *   problem_blobs      - blobs with so many ingestion events that they are assumed to be stuck in an infinite loop
            *   blob_extractors    - the extractors expected to run for each blob
            *   extractor_statuses - the success/failure status of the extractors identified in blob_extractors
            */
          val results = sql"""
          WITH problem_blobs AS (
            -- Assume that blobs with more than 100 ingestion events are stuck in an infinite ingestion loop
            SELECT blob_id
            FROM ingestion_events
            WHERE ingest_id LIKE ${if(ingestIdIsPrefix) LikeConditionEscapeUtil.beginsWith(ingestId) else ingestId}
            GROUP BY 1
            HAVING COUNT(*) > 100
          ),
          blob_extractors AS (
            -- get all the extractors expected for a given blob
            SELECT ingest_id, blob_id, jsonb_array_elements_text(details -> 'extractors') as extractor from ingestion_events
            WHERE ingest_id LIKE ${if(ingestIdIsPrefix) LikeConditionEscapeUtil.beginsWith(ingestId) else ingestId}
            AND type = ${IngestionEventType.MimeTypeDetected.toString}
            AND blob_id NOT IN (SELECT blob_id FROM problem_blobs)
          ),
          extractor_statuses AS (
            -- Aggregate all the status updates for the relevant extractors for a given blob
            SELECT
              blob_extractors.blob_id,
              blob_extractors.ingest_id,
              blob_extractors.extractor,
              -- The same status update may occur multiple times if a blob is reingested, so the event time is kept alongside it.
              -- This field is destined to be converted to a string, so use epoch time (seconds) to make getting it back into
              -- a date a bit less of a faff
              ARRAY_AGG(EXTRACT(EPOCH FROM ingestion_events.event_time)) AS extractor_event_times,
              ARRAY_AGG(ingestion_events.status) AS extractor_event_statuses
            FROM blob_extractors
            LEFT JOIN ingestion_events
              ON blob_extractors.blob_id = ingestion_events.blob_id
              AND blob_extractors.ingest_id = ingestion_events.ingest_id
              -- there is no index on extractorName but we aren't expecting too many events for the same blob_id/ingest_id
              AND blob_extractors.extractor = ingestion_events.details ->> 'extractorName'
              AND ingestion_events.type = 'RunExtractor'
            -- A file may be uploaded multiple times within different ingests - use group by to merge them together
            GROUP BY 1,2,3
          )
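          -- Main query: join the per-blob event aggregates (ie) to blob_metadata and extractor_statuses,
          -- collapsing everything down to a single row per blob_id/ingest_id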
          SELECT
            ie.blob_id,
            ie.ingest_id,
            ie.ingest_start,
            ie.most_recent_event,
            ie.event_types,
            ie.event_times,
            ie.event_statuses,
            ie.errors,
            ie.workspace_name AS "workspaceName",
            ie.mime_types AS "mimeTypes",
            ie.infinite_loop AS "infiniteLoop",
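            -- blob_metadata can hold several rows (paths) for the same blob, so aggregate the distinct paths and
            -- take a single file_size value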
            ARRAY_AGG(DISTINCT blob_metadata.path) AS paths,
            (ARRAY_AGG(blob_metadata.file_size))[1] AS "fileSize",
            ARRAY_REMOVE(ARRAY_AGG(extractor_statuses.extractor), NULL) AS extractors,
            -- You can't array_agg arrays of varying cardinality so here we convert to string
            ARRAY_REMOVE(ARRAY_AGG(ARRAY_TO_STRING(extractor_statuses.extractor_event_times, ',', 'null')), NULL) AS "extractorEventTimes",
            ARRAY_REMOVE(ARRAY_AGG(ARRAY_TO_STRING(extractor_statuses.extractor_event_statuses, ',', 'null')), NULL) AS "extractorStatuses"
          FROM (
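            -- Aggregate all ingestion events per blob, excluding blobs flagged as stuck in an infinite loop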
            SELECT
              blob_id,
              ingest_id,
              MIN(EXTRACT(EPOCH FROM event_time)) AS ingest_start,
              MAX(EXTRACT(EPOCH FROM event_time)) AS most_recent_event,
              ARRAY_AGG(type) AS event_types,
              ARRAY_AGG(EXTRACT(EPOCH FROM event_time)) AS event_times,
              ARRAY_AGG(status) AS event_statuses,
              ARRAY_AGG(details -> 'errors') AS errors,
              (ARRAY_AGG(details ->> 'workspaceName') FILTER (WHERE details ->> 'workspaceName' IS NOT NULL))[1] AS workspace_name,
              (ARRAY_AGG(details ->> 'mimeTypes') FILTER (WHERE details ->> 'mimeTypes' IS NOT NULL))[1] AS mime_types,
              FALSE AS infinite_loop
            FROM ingestion_events
            WHERE ingest_id LIKE ${if(ingestIdIsPrefix) LikeConditionEscapeUtil.beginsWith(ingestId) else ingestId}
            AND blob_id NOT IN (SELECT blob_id FROM problem_blobs)
            GROUP BY 1,2
            UNION
            -- blobs in the ingestion that are failing in an infinite loop
            SELECT DISTINCT
              blob_id,
              ingest_id,
              MIN(EXTRACT(EPOCH FROM event_time)) AS ingest_start,
              MAX(EXTRACT(EPOCH FROM event_time)) AS most_recent_event,
              array[]::text[] AS event_types,
              array[]::numeric[] AS event_times,
              array[]::text[] AS event_statuses,
              array['[]'::jsonb] AS errors,
              NULL AS workspace_name,
              NULL AS mime_types,
              TRUE AS infinite_loop
            FROM ingestion_events
            WHERE ingest_id LIKE ${if(ingestIdIsPrefix) LikeConditionEscapeUtil.beginsWith(ingestId) else ingestId}
            AND blob_id IN (SELECT blob_id FROM problem_blobs)
            GROUP BY 1,2
          ) AS ie
          LEFT JOIN blob_metadata USING(ingest_id, blob_id)
          LEFT JOIN extractor_statuses
            ON extractor_statuses.blob_id = ie.blob_id
            AND extractor_statuses.ingest_id = ie.ingest_id
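          -- Group on every non-aggregated column (1-11) so the ARRAY_AGGs above collapse the joined
          -- blob_metadata and extractor_statuses rows into one row per blob_id/ingest_id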
          GROUP BY 1,2,3,4,5,6,7,8,9,10,11
          ORDER BY ingest_start DESC
     """.map(rs => {
            val eventTypes = rs.array("event_types").getArray.asInstanceOf[Array[String]]
            BlobStatus(
              EventMetadata(
                rs.string("blob_id"),
                rs.string("ingest_id")
              ),
              BlobStatus.parsePathsArray(rs.array("paths").getArray().asInstanceOf[Array[String]]),
              rs.longOpt("fileSize"),
              rs.stringOpt("workspaceName"),
              PostgresHelpers.postgresEpochToDateTime(rs.double("ingest_start")),
              PostgresHelpers.postgresEpochToDateTime(rs.double("most_recent_event")),
              IngestionEventStatus.parseEventStatus(
                rs.array("event_times").getArray.asInstanceOf[Array[java.math.BigDecimal]].map(t => PostgresHelpers.postgresEpochToDateTime(t.doubleValue)),
                eventTypes,
                rs.array("event_statuses").getArray.asInstanceOf[Array[String]]
              ),
              rs.arrayOpt("extractors").map { extractors =>
                ExtractorStatus.parseDbStatusEvents(
                  extractors.getArray().asInstanceOf[Array[String]],
                  rs.array("extractorEventTimes").getArray().asInstanceOf[Array[String]],
                  rs.array("extractorStatuses").getArray().asInstanceOf[Array[String]]
                )
              }.getOrElse(List()),
              IngestionError.parseIngestionErrors(
                rs.array("errors").getArray.asInstanceOf[Array[String]],
                eventTypes
              ),
              rs.stringOpt("mimeTypes"),
              rs.boolean("infiniteLoop")
            )
          }).list()
          results
        } match {
            case Success(results) => Right(results)
            case Failure(exception) => Left(PostgresReadFailure(exception, s"getEvents failed: ${exception.getMessage}"))
        }
    }
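
A minimal sketch of how a caller might consume the result, shown here for illustration only. The Either is pattern matched rather than thrown; `client` is assumed to be an instance of this PostgresClient, and the summary is simply printed to keep the example self-contained.

    def printEventSummary(client: PostgresClient, ingestId: String, isPrefix: Boolean): Unit =
      client.getEvents(ingestId, ingestIdIsPrefix = isPrefix) match {
        case Right(statuses) =>
          // One BlobStatus per blob_id/ingest_id pair, ordered by ingest start time (newest first)
          println(s"Fetched ${statuses.size} blob statuses for ingest $ingestId")
        case Left(failure) =>
          // PostgresReadFailure wraps the underlying exception and a descriptive message
          println(s"Failed to fetch events for ingest $ingestId: $failure")
      }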