// backend/app/ingestion/phase2/IngestStorePolling.scala
package ingestion.phase2
import java.nio.file.{Files, Path}
import java.util.UUID
import org.apache.pekko.actor.{ActorSystem, Cancellable}
import cats.syntax.either._
import com.amazonaws.services.cloudwatch.model.MetricDatum
import extraction.Worker
import model.Uri
import model.ingestion.Key
import org.apache.xmlbeans.impl.soap.Detail
import services.ingestion.IngestionServices
import services.{FingerprintServices, IngestStorage, MetricUpdate, Metrics, MetricsService, ScratchSpace}
import utils.attempt.{Attempt, ElasticSearchQueryFailure, Failure, UnknownFailure}
import utils.{Logging, WorkerControl}
import scala.concurrent.duration._
import scala.concurrent.{ExecutionContext, Future}
import scala.util.control.NonFatal
import scala.util.{Success, Failure => SFailure}
import services.observability.{EventDetails, IngestionEvent, IngestionEventType, EventMetadata, PostgresClient, EventStatus}
/**
  * Deterministically partitions work items across a fixed-size cluster.
  *
  * @param numberOfNodes total nodes currently in the cluster
  * @param thisNode      zero-based index of the local node
  */
case class WorkSelector(numberOfNodes: Int, thisNode: Int) extends Logging {
  /** True when the given hash value shards onto this node (abs handles negative remainders). */
  def isSelected(long: Long): Boolean =
    math.abs(long % numberOfNodes) == thisNode
}
/**
  * Periodically polls the ingest object store for newly uploaded files, claims the subset
  * of keys belonging to this cluster node, downloads and fingerprints each file into
  * scratch space, hands it to [[IngestionServices]], and records the outcome both in
  * CloudWatch metrics and as Postgres observability events.
  *
  * Polling is self-rescheduling: each completed poll schedules the next one, backing off
  * to `maximumWait` when the store is empty or a batch fails, and tightening to
  * `minimumWait` while there is work to drain.
  */
class IngestStorePolling(
  actorSystem: ActorSystem,
  executionContext: ExecutionContext,
  workerControl: WorkerControl,
  ingestStorage: IngestStorage,
  scratchSpace: ScratchSpace,
  ingestionServices: IngestionServices,
  batchSize: Int,
  metricsService: MetricsService,
  postgresClient: PostgresClient) extends Logging {

  implicit val workerContext: ExecutionContext = executionContext

  // Tight re-poll while draining a backlog; full back-off when idle or failing.
  private val minimumWait = 10.seconds
  private val maximumWait = 1.minute

  // NOTE(review): written from the scheduler callback and read from stop(); assumed safe
  // because scheduling is effectively single-threaded here — confirm.
  var cancellable: Option[Cancellable] = None

  /** Begins polling after an initial delay so cluster membership can settle first. */
  def start(): Unit = {
    // wait a minute before we start anything so the cluster config settles
    schedulePoll(maximumWait)
  }

  /** Cancels any pending poll. Does not interrupt a poll that is already running. */
  def stop(): Future[Unit] = {
    Future.successful(cancellable.foreach(_.cancel()))
  }

  /** Schedules a single future invocation of [[pollIngestStore]] after `duration`. */
  def schedulePoll(duration: FiniteDuration): Unit = {
    cancellable = Some(actorSystem.scheduler.scheduleOnce(duration) { pollIngestStore() })
  }

  /**
    * Runs one poll cycle: fetch a batch of keys for this node, process each one, then
    * reschedule. Every exit path (success, failed batch, synchronous exception) ends by
    * scheduling the next poll, so the loop never silently dies.
    */
  def pollIngestStore(): Unit = {
    try {
      val pollCompleteFuture: Future[FiniteDuration] = getNextBatch.fold(
        failure => {
          logger.warn(s"Failed to poll ingestion store $failure")
          metricsService.updateMetric(Metrics.batchesFailed)
          maximumWait
        },
        batch => {
          if (batch.isEmpty) {
            maximumWait
          } else {
            val results = batch.map { key =>
              logger.info(s"Processing $key")
              val result = processKey(key)
              result match {
                case Left(failure) =>
                  metricsService.updateMetric(Metrics.itemsFailed)
                  logger.warn(s"Failed to process $key: $failure. File will be moved to dead letter bucket. To re-ingest the file, " +
                    s"either re-upload it or use the /api/ingestion/retry-dead-letter-files endpoint to re-ingest all dead letter files.")
                  // NOTE(review): the results of sendToDeadLetterBucket/delete are discarded;
                  // a failure here would leave the object behind to be re-polled — confirm
                  // that is the intended at-least-once behaviour.
                  ingestStorage.sendToDeadLetterBucket(key)
                case _ => ingestStorage.delete(key)
              }
              result
            }.collect { case Right(success) => success }
            metricsService.updateMetrics(List(
              MetricUpdate(Metrics.itemsIngested, results.size),
              MetricUpdate(Metrics.batchesIngested, 1)))
            logger.info(s"Processed ${results.size}. Checking for work again in $minimumWait")
            minimumWait
          }
        }
      )
      pollCompleteFuture.onComplete {
        case Success(pollDuration) => schedulePoll(pollDuration)
        case SFailure(t) =>
          logger.error("Exception whilst processing ingestion batch", t)
          metricsService.updateMetric(Metrics.batchesFailed)
          schedulePoll(maximumWait)
      }
    } catch {
      case NonFatal(t) =>
        logger.error("Exception whilst getting next batch from ingestion store", t)
        metricsService.updateMetric(Metrics.batchesFailed)
        schedulePoll(maximumWait)
    }
  }

  /**
    * Processes a single ingest-store key: reads its upload metadata, downloads and
    * fingerprints the payload, runs the ingestion pipeline, and records an observability
    * event for the outcome. Returns Left on any failure so the caller can dead-letter it.
    */
  def processKey(key: Key): Either[Failure, Unit] = {
    for {
      context <- ingestStorage.getMetadata(key)
      _ <- fetchData(key) { case (path, fingerprint) =>
        val ingestionMetaData = EventMetadata(fingerprint.value, context.ingestion)
        try {
          val ingestResult = ingestionServices.ingestFile(context, fingerprint, path)
          ingestResult match {
            case Left(failure) =>
              // BUG FIX: getStackTrace returns Array[StackTraceElement]; calling toString on it
              // yielded "[Ljava.lang.StackTraceElement;@..." — join the frames instead.
              val details = EventDetails.errorDetails(failure.msg, failure.cause.map(_.getStackTrace.mkString("\n")))
              postgresClient.insertEvent { failure match {
                case _: ElasticSearchQueryFailure =>
                  IngestionEvent(ingestionMetaData, eventType = IngestionEventType.InitialElasticIngest, status = EventStatus.Failure, details = details)
                case _ =>
                  IngestionEvent(ingestionMetaData, eventType = IngestionEventType.IngestFile, status = EventStatus.Failure, details = details)
              }}
            case Right(_) => postgresClient.insertEvent(IngestionEvent(ingestionMetaData, eventType = IngestionEventType.IngestFile))
          }
          ingestResult
        } catch {
          case NonFatal(t) =>
            logger.error(s"Unexpected exception", t)
            throw t
        }
      }
    } yield {
      ()
    }
  }

  /*
   * Fetches the data from the object store into scratch space, computing the fingerprint
   * in flight, then runs `f` against the local file. The scratch file is always removed —
   * on copy/fingerprint failure as well as after `f` — and the source stream is always closed.
   */
  def fetchData[T](key: Key)(f: (Path, Uri) => Either[Failure, T]): Either[Failure, T] = {
    ingestStorage.getData(key).flatMap { sourceInputStream =>
      try {
        Either.catchNonFatal {
          val path = scratchSpace.pathFor(key)
          logger.info(s"Fetching data for $key to $path")
          try {
            Files.copy(sourceInputStream, path)
            path -> Uri(FingerprintServices.createFingerprintFromFile(path.toFile))
          } catch {
            case NonFatal(t) =>
              // Don't leak a partially written scratch file when the copy or fingerprint fails.
              Files.deleteIfExists(path)
              throw t
          }
        }.leftMap(UnknownFailure.apply)
      } finally {
        sourceInputStream.close()
      }
    }.flatMap { case (path, uri) =>
      try {
        f(path, uri)
      } finally {
        // deleteIfExists so cleanup can never mask an exception from f with NoSuchFileException.
        Files.deleteIfExists(path)
      }
    }
  }

  /**
    * Lists the ingest store and returns up to `batchSize` keys whose UUIDs shard onto this
    * node, so concurrent nodes never fight over the same objects.
    */
  def getNextBatch: Attempt[Iterable[(Long, UUID)]] = {
    for {
      selector <- workSelector
      keys <- ingestStorage.list.toAttempt
    } yield {
      logger.info(s"Getting batch for node ${selector.thisNode} (of ${selector.numberOfNodes})")
      val batch = keys
        .filter { case (_, uuid) =>
          selector.isSelected(uuid.getLeastSignificantBits)
        }
        .take(batchSize)
      logger.info(s"Got a batch of size ${batch.size} for node ${selector.thisNode}")
      batch
    }
  }

  /** Builds a [[WorkSelector]] from the current cluster membership. */
  def workSelector: Attempt[WorkSelector] = {
    workerControl.getWorkerDetails.map { details =>
      logger.info(s"Cluster state ${details}")
      // NOTE(review): this relies on details.nodes.toSeq ordering identically on every node,
      // and on thisNode being present (indexOf would return -1, making this node select no
      // work) — confirm WorkerControl guarantees both.
      WorkSelector(details.nodes.size, details.nodes.toSeq.indexOf(details.thisNode))
    }
  }
}