def apply()

in debugger/src/main/scala/org.apache.daffodil.debugger.dap/Parse.scala [73:127]


  /** Compiles `schema` and builds a [[Parse]] that runs the resulting processor over `data`.
    *
    * The compiled processor is configured with `debugger`, debugging enabled, the given external
    * `variables`, and `ValidationMode.Limited`.
    *
    * @param schema        path of the schema handed to the compiler
    * @param data          input to parse; wrapped in an `InputSourceDataInputStream` when run
    * @param debugger      debugger attached to the compiled processor
    * @param infosetFormat infoset serialization to emit; must be `"xml"` or `"json"`
    * @param variables     external variables applied to the compiled processor
    * @param tunables      tunables passed to the schema compilation
    * @return an effect producing a [[Parse]] whose `run()` streams the serialized infoset bytes and
    *         whose `close()` requests interruption of a parse that has not finished yet. Running
    *         the stream fails with an `IllegalArgumentException` when `infosetFormat` is not
    *         supported, and with a `Parse.Exception` carrying the diagnostics when the parse
    *         result is an error.
    */
  def apply(
      schema: Path,
      data: InputStream,
      debugger: Debugger,
      infosetFormat: String,
      variables: Map[String, String],
      tunables: Map[String, String]
  ): IO[Parse] =
    for {
      dp <- Compiler()
        .compile(schema, tunables)
        .map(p =>
          p.withDebugger(debugger)
            .withDebugging(true)
            .withExternalVariables(variables)
            .withValidationMode(ValidationMode.Limited)
        )
      // `done` records that the parse effect has finished (successfully or not);
      // `pleaseStop` is completed by `close()` to request interruption.
      done <- Ref[IO].of(false)
      pleaseStop <- Deferred[IO, Unit]
    } yield new Parse {
      def run(): Stream[IO, Byte] =
        fs2.io
          .readOutputStream(4096) { os =>
            val stopper =
              pleaseStop.get *> IO.canceled // will cancel the concurrent parse effect

            val infosetOutputter = infosetFormat match {
              case "xml"  => new XMLTextInfosetOutputter(os, true)
              case "json" => new JsonInfosetOutputter(os, true)
              case other =>
                // Fail with a descriptive error rather than an opaque scala.MatchError.
                throw new IllegalArgumentException(
                  s"unsupported infoset format '$other'; expected 'xml' or 'json'"
                )
            }

            val parse =
              IO.interruptibleMany(
                dp.parse(
                  new InputSourceDataInputStream(data),
                  infosetOutputter
                )
                // WARNING: parse doesn't close the OutputStream, so closed below
              ).ensureOr(res => new Parse.Exception(res.getDiagnostics.toList))(res => !res.isError)
                // Runs on success, failure and cancellation alike: close the stream, then mark done.
                .guarantee(IO(os.close()) *> done.set(true))
                .void

            stopper &> parse
          }

      def close(): IO[Unit] =
        done.get.flatMap {
          case false =>
            // Parse has not finished: complete `pleaseStop` so the stopper cancels it.
            Logger[IO].debug("interrupting parse") *> pleaseStop
              .complete(())
              .void
          case true =>
            Logger[IO].debug("parse done, no interruption") *> IO.unit
        }
    }