override def shape: FlowShape[ArchiveEntry, PathCacheEntry] = FlowShape.of()

in common/src/main/scala/com/theguardian/multimedia/archivehunter/common/cmn_helpers/PathCacheExtractor.scala [16:57]


  /**
    * The shape of this stage: a single inlet of [[ArchiveEntry]] and a single outlet of [[PathCacheEntry]],
    * built from the `in` and `out` ports declared on the enclosing stage.
    */
  override def shape: FlowShape[ArchiveEntry, PathCacheEntry] = FlowShape(in, out)

  /**
    * Builds the per-materialization logic for this stage.
    *
    * For every incoming [[ArchiveEntry]] the directory portion of `entry.path` (everything before the final
    * "/"-separated element) is handed to `PathCacheExtractor.recursiveGenerateEntries`, and the resulting
    * [[PathCacheEntry]] values are emitted downstream one per pull. Entries whose path has no directory
    * portion produce no output and the next element is requested immediately.
    *
    * When upstream finishes while entries are still buffered, completion is deferred until the buffer has
    * been drained by downstream demand.
    *
    * @param inheritedAttributes attributes passed in by the materializer; not used by this stage
    * @return the stage logic holding the output buffer and the port handlers
    */
  override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = new GraphStageLogic(shape) {
    // entries generated from an upstream element that have not been pushed downstream yet
    private var buffer:Seq[PathCacheEntry] = Seq()
    // set when upstream finished while `buffer` still held entries; the stage completes once it is drained
    private var awaitingCompletion = false

    setHandler(out, new AbstractOutHandler {
      override def onPull(): Unit = buffer.headOption match {
        case Some(next) =>
          push(out, next)
          buffer = buffer.tail
        case None =>
          //nothing left to emit: either we are done, or we need another element from upstream
          if(awaitingCompletion) completeStage() else pull(in)
      }
    })

    setHandler(in, new AbstractInHandler {
      override def onPush(): Unit = {
        val entry = grab(in)

        //the last element is the filename, which we are not interested in.
        //dropRight(1) is used instead of init because String.split discards trailing empty strings, so a path
        //such as "/" yields an empty array and init would throw UnsupportedOperationException on it.
        val parts = entry.path.split("/").dropRight(1)
        if(parts.nonEmpty) {
          buffer = buffer ++ PathCacheExtractor.recursiveGenerateEntries(parts.init, parts.last, parts.length, entry.bucket)
        }

        buffer.headOption match {
          case Some(next) =>
            push(out, next)
            buffer = buffer.tail
          case None =>
            //there is no path for this element (or nothing was generated for it), so grab the next one
            pull(in)
        }
      }

      override def onUpstreamFinish(): Unit = {
        if(buffer.isEmpty) {
          completeStage()
        } else {
          //keep the stage alive so that onPull can drain the remaining entries before completing
          awaitingCompletion = true
        }
      }
    })
  }