override def createLogic()

in file/src/main/scala/org/apache/pekko/stream/connectors/file/impl/archive/TarReaderStage.scala [41:289]


  override def createLogic(inheritedAttributes: Attributes): GraphStageLogic =
    new TimerGraphStageLogic(shape) with StageLogging {
      import TarReaderStage._

      private final class FileOutSubSource
          extends SubSourceOutlet[ByteString]("fileOut")
          with TarReaderStage.SourceWithTimeout

      setHandlers(flowIn, flowOut, new CollectHeader(ByteString.empty))

      def readHeader(buffer: ByteString): Unit = {
        if (buffer.length >= TarArchiveEntry.headerLength) {
          readFile(buffer)
        } else {
          tryPullIfNeeded()
          setHandlers(flowIn, flowOut, new CollectHeader(buffer))
        }
      }

      def readFile(headerBuffer: ByteString): Unit = {
        def pushSource(metadata: TarArchiveMetadata, buffer: ByteString): Unit = {
          if (buffer.length >= metadata.size) {
            val (emit, remain) = buffer.splitAt(metadata.size.toInt)
            log.debug("emitting completed source for [{}]", metadata)
            push(flowOut, metadata -> Source.single(emit))
            readTrailer(metadata, remain, subSource = None)
          } else setHandlers(flowIn, flowOut, new CollectFile(metadata, buffer))
        }

        if (headerBuffer.head == 0) {
          log.debug("empty filename, detected EOF padding, completing")
          complete(flowOut)
          setHandlers(flowIn, flowOut, new FlushEndOfFilePadding())
        } else {
          val metadata = TarArchiveEntry.parse(headerBuffer)
          val buffer = headerBuffer.drop(TarArchiveEntry.headerLength)
          if (isAvailable(flowOut)) {
            pushSource(metadata, buffer)
          } else {
            setHandlers(flowIn, flowOut, new PushSourceOnPull(metadata, buffer))
          }
        }

        final class PushSourceOnPull(metadata: TarArchiveMetadata, buffer: ByteString)
            extends OutHandler
            with InHandler {
          override def onPull(): Unit = {
            setHandler(flowOut, IgnoreDownstreamPull)
            pushSource(metadata, buffer)
          }

          // fail on upstream push
          override def onPush(): Unit = failStage(new TarReaderException("upstream pushed unexpectedly"))
          override def onUpstreamFinish(): Unit = setKeepGoing(true)
        }
      }

      def readTrailer(metadata: TarArchiveMetadata,
          buffer: ByteString,
          subSource: Option[SubSourceOutlet[ByteString]]): Unit = {
        val trailerLength = TarArchiveEntry.trailerLength(metadata)
        if (buffer.length >= trailerLength) {
          subSource.foreach(_.complete())
          if (isClosed(flowIn)) completeStage()
          readHeader(buffer.drop(trailerLength))
        } else setHandlers(flowIn, flowOut, new ReadPastTrailer(metadata, buffer, subSource))
      }

      override protected def onTimer(timerKey: Any): Unit = {
        timerKey match {
          case SubscriptionTimeout(subSource) =>
            import StreamSubscriptionTimeoutTerminationMode._

            val timeoutSettings = attributes
              .get[ActorAttributes.StreamSubscriptionTimeout]
              .getOrElse(StreamSubscriptionTimeout(FiniteDuration(1, TimeUnit.MILLISECONDS), NoopTermination))
            val timeout = timeoutSettings.timeout

            timeoutSettings.mode match {
              case CancelTermination =>
                subSource.timeout(timeout)
                failStage(
                  new TarReaderException(
                    s"The tar content source was not subscribed to within $timeout, it must be subscribed to to progress tar file reading."))
              case WarnTermination =>
                log.warning(
                  "The tar content source was not subscribed to within {}, it must be subscribed to to progress tar file reading.",
                  timeout.toCoarsest)
              case NoopTermination =>
            }
          case other =>
            log.warning("unexpected timer [{}]", other)
        }
      }

      private def tryPullIfNeeded(): Unit = {
        if (!hasBeenPulled(flowIn))
          tryPull(flowIn)
      }

      /**
       * Don't react on downstream pulls until we have something to push.
       */
      private trait IgnoreDownstreamPull extends OutHandler {
        final override def onPull(): Unit = ()
      }
      private object IgnoreDownstreamPull extends IgnoreDownstreamPull

      /**
       * Pull upstream on a downstream pull and ignore subsequent pulls.
       */
      private trait ExpectDownstreamPull extends OutHandler {
        final override def onPull(): Unit = {
          tryPullIfNeeded()
          setHandler(flowOut, IgnoreDownstreamPull)
        }
      }
      private object ExpectDownstreamPull extends ExpectDownstreamPull

      /**
       * Handler until the header of 512 bytes is completely received.
       */
      private final class CollectHeader(var buffer: ByteString) extends InHandler with ExpectDownstreamPull {

        override def onPush(): Unit = {
          buffer ++= grab(flowIn)
          if (buffer.length >= TarArchiveEntry.headerLength) {
            readFile(buffer)
          } else {
            tryPullIfNeeded()
          }
        }

        override def onUpstreamFinish(): Unit = {
          if (buffer.isEmpty) completeStage()
          else
            failStage(
              new TarReaderException(
                s"incomplete tar header: received ${buffer.length} bytes, expected ${TarArchiveEntry.headerLength} bytes"))
        }
      }

      /**
       * Handler during file content reading.
       */
      private final class CollectFile(metadata: TarArchiveMetadata, var buffer: ByteString)
          extends InHandler
          with IgnoreDownstreamPull {
        private var emitted: Long = 0

        private val subSource: FileOutSubSource = {
          val sub = new FileOutSubSource()
          val timeoutSignal = SubscriptionTimeout(sub)
          sub.setHandler(new OutHandler {
            override def onPull(): Unit = {
              cancelTimer(timeoutSignal)
              if (buffer.nonEmpty) {
                subPush(buffer)
                buffer = ByteString.empty
                if (isClosed(flowIn)) onUpstreamFinish()
              } else {
                tryPullIfNeeded()
              }
            }
          })
          val timeout = attributes.get[ActorAttributes.StreamSubscriptionTimeout].get.timeout
          scheduleOnce(timeoutSignal, timeout)
          sub
        }

        log.debug("emitting source for [{}]", metadata)
        push(flowOut, metadata -> Source.fromGraph(subSource.source))
        setHandler(flowOut, IgnoreDownstreamPull)

        private def subPush(bs: ByteString): Unit = {
          val remaining = metadata.size - emitted
          if (remaining <= bs.length) {
            val (emit, remain) = bs.splitAt(remaining.toInt)
            subSource.push(emit)
            readTrailer(metadata, remain, Some(subSource))
          } else {
            subSource.push(bs)
            emitted += bs.length
          }
        }

        override def onPush(): Unit = {
          subPush(grab(flowIn))
        }

        override def onUpstreamFinish(): Unit = {
          if (buffer.isEmpty) {
            failStage(
              new TarReaderException(
                s"incomplete tar file contents for [${metadata.filePath}] expected ${metadata.size} bytes, received $emitted bytes"))
          } else setKeepGoing(true)
        }

      }

      /**
       * Handler to read past the padding trailer.
       */
      private final class ReadPastTrailer(metadata: TarArchiveMetadata,
          var buffer: ByteString,
          subSource: Option[SubSourceOutlet[ByteString]])
          extends InHandler
          with ExpectDownstreamPull {
        private val trailerLength = TarArchiveEntry.trailerLength(metadata)

        override def onPush(): Unit = {
          // TODO the buffer content doesn't need to be kept
          buffer ++= grab(flowIn)
          if (buffer.length >= trailerLength) {
            subSource.foreach { src =>
              src.complete()
              setHandler(flowOut, ExpectDownstreamPull)
              if (isAvailable(flowOut))
                tryPullIfNeeded()
            }
            readHeader(buffer.drop(trailerLength))
          } else {
            tryPullIfNeeded()
          }
        }

        override def onUpstreamFinish(): Unit = {
          if (buffer.length == trailerLength) completeStage()
          else
            failStage(
              new TarReaderException(
                s"incomplete tar file trailer for [${metadata.filePath}] expected ${trailerLength} bytes, received ${buffer.length} bytes"))
        }
      }

      /**
       * "At the end of the archive file there are two 512-byte blocks filled with binary zeros as an end-of-file marker."
       */
      private final class FlushEndOfFilePadding() extends InHandler with IgnoreDownstreamPull {

        override def onPush(): Unit = {
          grab(flowIn)
          tryPullIfNeeded()
        }

        tryPullIfNeeded()
      }

    }