private def readAttachmentFromMongo[T]()

in common/scala/src/main/scala/org/apache/openwhisk/core/database/mongodb/MongoDBArtifactStore.scala [483:548]


  /**
   * Streams an attachment stored in GridFS into the supplied sink.
   *
   * The attachment is addressed by the GridFS id `<doc id>/<attachment uri path>`. The GridFS file metadata is
   * fetched first, and only when that lookup succeeds are the bytes streamed into `sink`.
   *
   * @param doc the document the attachment belongs to; must be non-null and carry a non-null revision
   * @param attachmentUri uri whose path is used as the attachment name
   * @param sink sink consuming the attachment bytes; its materialized value becomes the result
   * @param transid transaction id used for the start/finished/failed log markers
   * @return a future of the sink's materialized value. The future fails with `NoDocumentException` when GridFS
   *         reports "File not found", and with a generic `Exception` (wrapping the driver error as its cause)
   *         for any other `MongoGridFSException`.
   * @throws IllegalArgumentException synchronously, if `doc` or its revision is null
   */
  private def readAttachmentFromMongo[T](doc: DocInfo, attachmentUri: Uri, sink: Sink[ByteString, Future[T]])(
    implicit transid: TransactionId): Future[T] = {

    val attachmentName = attachmentUri.path.toString

    // Validate before emitting the start marker, so that a rejected call does not leave a started
    // transaction marker without a matching finished/failed one.
    require(doc != null, "doc undefined")
    require(doc.rev.rev != null, "doc revision must be specified")

    val start = transid.started(
      this,
      LoggingMarkers.DATABASE_ATT_GET,
      s"[ATT_GET] '$dbName' finding attachment '$attachmentName' of document '$doc'")

    val downloadStream = gridFSBucket.openDownloadStream(BsonString(s"${doc.id.id}/$attachmentName"))

    // Pipes the download stream into the caller's sink and marks the transaction as finished on success.
    def readStream(): Future[T] = {
      MongoDBAsyncStreamSource(downloadStream)
        .runWith(sink)
        .map { result =>
          transid
            .finished(
              this,
              start,
              s"[ATT_GET] '$collName' completed: found attachment '$attachmentName' of document '$doc'")
          result
        }
    }

    // Looks up the file metadata; used here only to detect a missing attachment before streaming starts.
    def getGridFSFile: Future[GridFSFile] = {
      downloadStream
        .gridFSFile()
        .head()
        .transform(
          identity, {
            // getMessage may be null; guard so the check itself cannot raise and mask the driver error.
            case ex: MongoGridFSException if Option(ex.getMessage).exists(_.contains("File not found")) =>
              transid.finished(
                this,
                start,
                s"[ATT_GET] '$collName', retrieving attachment '$attachmentName' of document '$doc'; not found.")
              NoDocumentException("Not found on 'readAttachment'.")
            case ex: MongoGridFSException =>
              transid.failed(
                this,
                start,
                s"[ATT_GET] '$collName' failed to get attachment '$attachmentName' of document '$doc'; error code: '${ex.getCode}'",
                ErrorLevel)
              // Return (rather than throw) the mapped failure and keep the driver exception as its cause.
              new Exception("Unexpected mongodb server error: " + ex.getMessage, ex)
            case t => t
          })
    }

    // The metadata lookup must complete successfully before any bytes are read.
    val f = for {
      _ <- getGridFSFile
      result <- readStream()
    } yield result

    reportFailure(
      f,
      failure =>
        transid.failed(
          this,
          start,
          s"[ATT_GET] '$dbName' internal error, name: '$attachmentName', doc: '$doc', failure: '${failure.getMessage}'",
          ErrorLevel))

  }