in common/src/main/scala/com/theguardian/multimedia/archivehunter/common/cmn_helpers/PathCacheExtractor.scala [16:57]
/** The stage's shape: one inlet accepting `ArchiveEntry` and one outlet emitting `PathCacheEntry`. */
override def shape: FlowShape[ArchiveEntry, PathCacheEntry] = FlowShape(in, out)
/**
  * Builds the processing logic for this stage.
  *
  * For every incoming `ArchiveEntry` the path is split on "/", the final component (the filename) is discarded and
  * the remaining directory components are handed to `PathCacheExtractor.recursiveGenerateEntries`. The entries it
  * returns are buffered and emitted downstream one per pull. Inputs that yield nothing to emit (no directory
  * component, or an empty result from the generator) are skipped by immediately requesting the next input.
  *
  * @param inheritedAttributes stream attributes supplied by the materializer; not used by this stage
  * @return the `GraphStageLogic` holding the in/out handlers
  */
override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = new GraphStageLogic(shape) {
  //entries generated from the most recent input that have not yet been pushed downstream
  private var buffer:Seq[PathCacheEntry] = Seq()
  //set when upstream finishes while `buffer` still holds entries, so that completion waits until it has drained
  private var awaitingCompletion = false

  setHandler(out, new AbstractOutHandler {
    override def onPull(): Unit = buffer.headOption match {
      case Some(next) =>
        push(out, next)
        buffer = buffer.tail
      case None =>
        //nothing buffered: either we are done (upstream already finished) or we need more input
        if(awaitingCompletion) completeStage() else pull(in)
    }
  })

  setHandler(in, new AbstractInHandler {
    override def onPush(): Unit = {
      val entry = grab(in)
      val parts = entry.path.split("/").init //the last element is the filename, which we are not interested in.
      if(parts.isEmpty) {
        pull(in) //if there is no path for this element then grab the next one
      } else {
        //`in` is only ever pulled while the buffer is empty, so the buffer holds just the new entries after this
        buffer = buffer ++ PathCacheExtractor.recursiveGenerateEntries(parts.init, parts.last, parts.length, entry.bucket)
        buffer.headOption match {
          case Some(next) =>
            push(out, next)
            buffer = buffer.tail
          case None =>
            //nothing was generated for this element; calling .head here would throw and fail the stream,
            //so treat it like an element with no path and ask for the next one instead
            pull(in)
        }
      }
    }

    override def onUpstreamFinish(): Unit = {
      if(buffer.isEmpty) {
        completeStage()
      } else {
        //still have entries to deliver; onPull completes the stage once they have all been pushed
        awaitingCompletion = true
      }
    }
  })
}