in common/scala/src/main/scala/org/apache/openwhisk/core/database/mongodb/MongoDBArtifactStore.scala [483:548]
/**
 * Reads an attachment from GridFS and streams its bytes into the given sink.
 *
 * The attachment is looked up under the GridFS file id `<doc.id.id>/<attachmentUri.path>`.
 * The GridFS file metadata is fetched first so that a missing attachment is detected
 * before any bytes are streamed.
 *
 * @param doc           the document owning the attachment; must be non-null and carry a revision
 * @param attachmentUri the attachment URI; its path is used as the attachment name
 * @param sink          the sink consuming the attachment bytes
 * @param transid       the transaction id used for the log markers
 * @tparam T            the type of the value materialized by the sink
 * @return a future of the value materialized by the sink; it fails with a `NoDocumentException`
 *         when GridFS reports "File not found" and with a generic `Exception` (carrying the
 *         original error as its cause) for any other `MongoGridFSException`
 * @throws IllegalArgumentException synchronously, if `doc` or its revision is null
 */
private def readAttachmentFromMongo[T](doc: DocInfo, attachmentUri: Uri, sink: Sink[ByteString, Future[T]])(
  implicit transid: TransactionId): Future[T] = {
  val attachmentName = attachmentUri.path.toString
  val start = transid.started(
    this,
    LoggingMarkers.DATABASE_ATT_GET,
    s"[ATT_GET] '$dbName' finding attachment '$attachmentName' of document '$doc'")

  require(doc != null, "doc undefined")
  require(doc.rev.rev != null, "doc revision must be specified")

  val downloadStream = gridFSBucket.openDownloadStream(BsonString(s"${doc.id.id}/$attachmentName"))

  // Pipes the download stream into the sink and closes the log marker once the sink completes.
  def readStream(): Future[T] =
    MongoDBAsyncStreamSource(downloadStream)
      .runWith(sink)
      .map { result =>
        transid.finished(
          this,
          start,
          s"[ATT_GET] '$collName' completed: found attachment '$attachmentName' of document '$doc'")
        result
      }

  // Fetches the GridFS file metadata and translates GridFS failures into the store's exceptions.
  def getGridFSFile =
    downloadStream
      .gridFSFile()
      .head()
      .transform(
        identity, {
          // getMessage may be null; guard it so the check itself cannot raise an NPE.
          case ex: MongoGridFSException if Option(ex.getMessage).exists(_.contains("File not found")) =>
            transid.finished(
              this,
              start,
              s"[ATT_GET] '$collName', retrieving attachment '$attachmentName' of document '$doc'; not found.")
            NoDocumentException("Not found on 'readAttachment'.")
          case ex: MongoGridFSException =>
            transid.failed(
              this,
              start,
              s"[ATT_GET] '$collName' failed to get attachment '$attachmentName' of document '$doc'; error code: '${ex.getCode}'",
              ErrorLevel)
            // Return (rather than throw) the replacement failure and keep the original as cause.
            new Exception("Unexpected mongodb server error: " + ex.getMessage, ex)
          case t => t
        })

  // The metadata itself is not needed; the lookup only has to succeed before streaming starts.
  val f = getGridFSFile.flatMap(_ => readStream())

  reportFailure(
    f,
    failure =>
      transid.failed(
        this,
        start,
        s"[ATT_GET] '$dbName' internal error, name: '$attachmentName', doc: '$doc', failure: '${failure.getMessage}'",
        ErrorLevel))
}