in debugger/src/main/scala/org.apache.daffodil.debugger.dap/Parse.scala [73:127]
/** Builds a [[Parse]] for `schema` applied to `data`.
  *
  * The schema is compiled with the given tunables and the resulting processor is configured with the
  * debugger (debugging enabled), the external variables and limited validation. No parsing happens
  * until the stream returned by `run()` is evaluated.
  *
  * @param schema        path of the schema handed to the compiler
  * @param data          input to be parsed
  * @param debugger      debugger attached to the compiled processor
  * @param infosetFormat format of the emitted infoset; `"xml"` and `"json"` are supported
  * @param variables     external variable bindings applied to the processor
  * @param tunables      tunables passed to the compiler
  * @return an effect yielding the `Parse`. Its `run()` streams the infoset bytes and fails with a
  *         `Parse.Exception` when the parse result reports an error, or with an
  *         `IllegalArgumentException` when `infosetFormat` is not supported. Its `close()` requests
  *         interruption of a parse that has not finished yet.
  */
def apply(
    schema: Path,
    data: InputStream,
    debugger: Debugger,
    infosetFormat: String,
    variables: Map[String, String],
    tunables: Map[String, String]
): IO[Parse] =
  for {
    dp <- Compiler()
      .compile(schema, tunables)
      .map(p =>
        p.withDebugger(debugger)
          .withDebugging(true)
          .withExternalVariables(variables)
          .withValidationMode(ValidationMode.Limited)
      )
    // `done` records that the parse effect has finished (in any way);
    // `pleaseStop` is completed by `close()` to request cancellation.
    done <- Ref[IO].of(false)
    pleaseStop <- Deferred[IO, Unit]
  } yield new Parse {
    def run(): Stream[IO, Byte] =
      fs2.io
        .readOutputStream(4096) { os =>
          val stopper =
            pleaseStop.get *> IO.canceled // will cancel the concurrent parse effect

          // The outputter is created inside the effect so that an unsupported format is raised
          // as a failure of `parse` and the finalizer attached below still runs.
          val parseResult =
            IO.defer {
              val infosetOutputter = infosetFormat match {
                case "xml"  => new XMLTextInfosetOutputter(os, true)
                case "json" => new JsonInfosetOutputter(os, true)
                case other =>
                  throw new IllegalArgumentException(
                    s"unsupported infoset format '$other', expected 'xml' or 'json'"
                  )
              }
              IO.interruptibleMany(
                dp.parse(
                  new InputSourceDataInputStream(data),
                  infosetOutputter
                )
              )
            }

          // WARNING: parse doesn't close the OutputStream, so it is closed in the finalizer here
          val parse =
            parseResult
              .ensureOr(res => new Parse.Exception(res.getDiagnostics.toList))(res => !res.isError)
              .guarantee(IO(os.close()) *> done.set(true))
              .void

          stopper &> parse
        }

    def close(): IO[Unit] =
      done.get.flatMap {
        case false =>
          // parse has not finished: signal the stopper started by `run()`
          Logger[IO].debug("interrupting parse") *> pleaseStop
            .complete(())
            .void
        case true =>
          Logger[IO].debug("parse done, no interruption") *> IO.unit
      }
  }