override def createLogicAndMaterializedValue()

in app/streamcomponents/ProjectSummarySink.scala [23:70]


  /**
    * Builds the stage logic for this sink together with its materialized value.
    *
    * Every element pushed on `in` is folded into a running [[ProjectSummary]]. The size used for an
    * element is taken from the first of these that is present: the file attributes' `size`, the
    * `DPSP_SIZE` long metadata value, the `DPSP_SIZE` string metadata value (if it parses as a Long);
    * otherwise 0. Elements for which `CustomMXSMetadata.fromMxsMetadata` yields nothing are logged
    * and not counted at all.
    *
    * @param inheritedAttributes stream attributes passed in by the materializer (not used here)
    * @return the stage logic, and a Future that is completed with the accumulated summary when the
    *         stage stops, or failed with the upstream error if upstream fails
    */
  override def createLogicAndMaterializedValue(inheritedAttributes: Attributes): (GraphStageLogic, Future[ProjectSummary]) = {
    val summaryPromise = Promise[ProjectSummary]()

    val logic = new GraphStageLogic(shape) {
      private val logger = LoggerFactory.getLogger(getClass)
      private var ongoingSummary = ProjectSummary()

      setHandler(in, new AbstractInHandler {
        override def onPush(): Unit = {
          val elem = grab(in)
          val maybeSizeFromAttribs = elem.fileAttribues.map(_.size)
          val maybeSizeFromMeta = elem.attributes.flatMap(_.longValues.get("DPSP_SIZE"))

          // Parsed only when the two sources above are absent (orElse takes its argument by name).
          // A malformed value is logged and ignored rather than thrown, so one bad item cannot
          // fail the whole stage.
          def sizeFromStringMeta: Option[Long] =
            elem.attributes.flatMap(_.stringValues.get("DPSP_SIZE")).flatMap { raw =>
              val parsed = scala.util.Try(raw.trim.toLong).toOption
              if (parsed.isEmpty) logger.warn(s"Item ${elem.oid} has an unparseable DPSP_SIZE value '$raw'")
              parsed
            }

          val itemSize = maybeSizeFromAttribs
            .orElse(maybeSizeFromMeta)
            .orElse(sizeFromStringMeta)
            .getOrElse(0L)

          // An empty extension string is reported under the "unknown" bucket.
          val maybeXtn = elem.attributes
              .flatMap(_.stringValues.get("MXFS_FILEEXT"))
              .map(xtn=>if(xtn=="") "unknown" else xtn)

          val maybeMimeType = for {
            attrs <- elem.attributes
            typeString <- attrs.stringValues.get("MXFS_MIMETYPE")
            mimeType <- makeMimeType(typeString)
          } yield mimeType

          elem.attributes.flatMap(CustomMXSMetadata.fromMxsMetadata) match {
            case Some(customMeta)=>
              // Each step reads the summary produced by the previous one, so the order of these
              // reassignments must be kept sequential.
              ongoingSummary = customMeta.itemType.map(t=>ongoingSummary.addGnmType(t.toString,itemSize)).getOrElse(ongoingSummary)
              ongoingSummary = customMeta.hidden.map(h=>ongoingSummary.addHiddenFile(h, itemSize)).getOrElse(ongoingSummary)
              ongoingSummary = customMeta.projectId.map(p=>ongoingSummary.addGnmProject(p, itemSize)).getOrElse(ongoingSummary)
              ongoingSummary = maybeXtn.map(x=>ongoingSummary.addFileType(x, itemSize)).getOrElse(ongoingSummary)
              ongoingSummary = maybeMimeType.map(x=>ongoingSummary.addMediaType(x, itemSize)).getOrElse(ongoingSummary)
              ongoingSummary = ongoingSummary.addToTotal(itemSize)
            case None=>
              logger.warn(s"Item ${elem.oid} has no custom metadata attributes")
          }
          pull(in)
        }

        // Propagate an upstream error to the materialized Future instead of letting postStop
        // report a partial summary as a success.
        override def onUpstreamFailure(ex: Throwable): Unit = {
          summaryPromise.tryFailure(ex)
          super.onUpstreamFailure(ex)
        }
      })

      // A sink has no downstream to signal demand, so request the first element ourselves.
      override def preStart(): Unit = pull(in)

      // trySuccess is a no-op when the promise was already failed by onUpstreamFailure; any other
      // termination completes the Future with whatever has been accumulated so far.
      override def postStop(): Unit = summaryPromise.trySuccess(ongoingSummary)
    }

    (logic, summaryPromise.future)
  }