in backend/app/services/ingestion/IngestionServices.scala [72:136]
/**
  * Ingests one file: records its metadata and progress events, copies the blob into object
  * storage unless the manifest already contains it, registers the file (and its chain of
  * parents) in the manifest, and finally submits the document to the index.
  *
  * Calls to `postgresClient` are made for their side effect only; their return values are
  * discarded and never influence the result of this method.
  *
  * @param context describes the file being ingested: the file entry, its parents and parent
  *                blobs, the ingestion it belongs to, its languages and its workspace
  * @param blobUri URI identifying the blob
  * @param path    path to the file contents; read for its size and type detection and handed
  *                to object storage when a copy is required
  * @return a [[Blob]] built from `blobUri`, the size of `path` and the detected mime type, or
  *         the first [[Failure]] produced by the manifest lookup, the blob copy, type
  *         detection, the manifest insert or the index
  */
override def ingestFile(context: FileContext, blobUri: Uri, path: Path): Either[Failure, Blob] = {
  val eventMetadata = EventMetadata(blobUri.value, context.ingestion)

  // NOTE(review): `.toInt` narrows the size - if `context.file.size` is a Long, anything above
  // Int.MaxValue will wrap. `substring` also throws when the file URI is shorter than the
  // ingestion name. Both presumably cannot happen in practice - confirm.
  val metadataRow = BlobMetadata(
    ingestId = context.ingestion,
    blobId = blobUri.value,
    fileSize = context.file.size.toInt,
    path = context.file.uri.value.substring(context.ingestion.length)
  )
  postgresClient.insertMetadata(metadataRow)
  postgresClient.insertEvent(IngestionEvent(eventMetadata, IngestionEventType.HashComplete))

  // Ask the manifest whether this blob is already known, so unneeded work can be skipped.
  // A NotFoundFailure means the query itself succeeded and the blob is simply absent.
  val existingBlob: Either[Failure, Option[Blob]] =
    manifest.getBlob(blobUri).map(Some(_)).recoverWith {
      case NotFoundFailure(_) => Right[Failure, Option[Blob]](None)
    }

  val stored = existingBlob.flatMap {
    case None =>
      // Unknown blob: copy it into object storage and log the outcome either way.
      val copied = objectStorage.create(blobUri.toStoragePath, path)
      copied.fold(
        failure =>
          postgresClient.insertEvent(
            IngestionEvent(eventMetadata, IngestionEventType.BlobCopy, EventStatus.Failure, EventDetails.errorDetails(failure.msg))
          ),
        _ => postgresClient.insertEvent(IngestionEvent(eventMetadata, IngestionEventType.BlobCopy))
      )
      // The copy result itself is what the rest of the pipeline continues from.
      copied

    case Some(_) =>
      // Already in the manifest: no copy required.
      postgresClient.insertEvent(IngestionEvent(eventMetadata, eventType = IngestionEventType.ManifestExists))
      Right(())
  }

  val parentChain: List[UriParent] = UriParent.createPairwiseChain(context.parents)
  // NOTE(review): `last` throws on an empty chain, and this line runs even when `stored` is
  // already a failure - presumably createPairwiseChain never returns an empty list; confirm.
  val manifestRoot = parentChain.last.parent

  for {
    _ <- stored
    sizeOnDisk = Files.size(path)
    detectedType <- typeDetector.detectType(path)
    mediaTypeName = detectedType.toString
    // Empty files are given no extractors.
    extractors = if (sizeOnDisk != 0) mimeTypeMapper.getExtractorsFor(mediaTypeName) else List.empty
    mimeType = MimeType(mediaTypeName)
    // One directory insertion per parent/child pair in the chain, followed by the blob itself.
    directoryInsertions = parentChain.collect {
      case pair: UriParentPair => Manifest.InsertDirectory(parentUri = pair.parent, uri = pair.child)
    }
    blobInsertion = Manifest.InsertBlob(context.file, blobUri, context.parentBlobs, mimeType, context.ingestion, context.languages.map(_.key), extractors, context.workspace)
    _ <- manifest.insert(directoryInsertions :+ blobInsertion, manifestRoot)
    ingestionData = IngestionData(
      context.file.creationTime.map(_.toMillis),
      context.file.lastModifiedTime.map(_.toMillis),
      Set(mimeType),
      Set(context.file.uri),
      context.parentBlobs,
      context.ingestion,
      context.workspace
    )
    _ = postgresClient.insertEvent(
      IngestionEvent(eventMetadata, eventType = IngestionEventType.MimeTypeDetected, details = EventDetails.ingestionDataDetails(ingestionData, extractors))
    )
    // TODO: the await can go once attempts are used everywhere.
    // `awaitEither(2.minutes)` presumably blocks for up to two minutes - confirm.
    _ <- index.ingestDocument(blobUri, context.file.size, ingestionData, context.languages).awaitEither(2.minutes)
  } yield Blob(blobUri, sizeOnDisk, Set(mimeType))
}