in debugger/src/main/scala/org.apache.daffodil.debugger.dap/Parse.scala [74:162]
/** Compiles `schema` and builds a [[Parse]] whose `run()` streams the infoset bytes produced by
  * parsing `data`.
  *
  * The compiled processor is configured with the given debugger, debugging enabled, the supplied
  * external variables and limited validation.
  *
  * @param schema
  *   path of the schema handed to the compiler
  * @param data
  *   the input to parse; its `available()` value is sampled when the stream starts and used as the
  *   total input size for the left-over-data check
  * @param dataFilePath
  *   path of the data file; only used here to build the left-over-data message
  * @param debugger
  *   debugger attached to the compiled processor
  * @param infosetFormat
  *   `"xml"` or `"json"`; any other value makes `run()` fail with an `IllegalArgumentException`
  * @param rootName
  *   optional root element name passed to the compiler
  * @param rootNamespace
  *   optional root element namespace passed to the compiler
  * @param variables
  *   external variables set on the processor
  * @param tunables
  *   tunables passed to the compiler
  * @param dapEvents
  *   channel that receives parse-error and data-left-over events
  * @return
  *   an effect yielding the `Parse`; it carries whatever failure the compile step produces
  */
def apply(
    schema: Path,
    data: InputStream,
    dataFilePath: Path,
    debugger: Debugger,
    infosetFormat: String,
    rootName: Option[String],
    rootNamespace: Option[String],
    variables: Map[String, String],
    tunables: Map[String, String],
    dapEvents: Channel[IO, Events.DebugEvent]
): IO[Parse] =
  for {
    dp <- Compiler()
      .compile(schema, rootName, rootNamespace, tunables)
      .map(p =>
        p.withDebugger(debugger)
          .withDebugging(true)
          .withExternalVariables(variables)
          .withValidationMode(ValidationMode.Limited)
      )
    // `done` records that the parse effect has finished (successfully or not);
    // `pleaseStop` is completed by `close()` to request cancellation of a running parse.
    done <- Ref[IO].of(false)
    pleaseStop <- Deferred[IO, Unit]
  } yield new Parse {

    /** Runs the parse, emitting the bytes the infoset outputter writes to the output stream. */
    def run(): Stream[IO, Byte] =
      fs2.io
        .readOutputStream(4096) { os =>
          val stopper =
            pleaseStop.get *> IO.canceled // will cancel the concurrent parse effect

          val infosetOutputter = infosetFormat match {
            case "xml"  => new XMLTextInfosetOutputter(os, true)
            case "json" => new JsonInfosetOutputter(os, true)
            // Fail with a descriptive error rather than a bare scala.MatchError.
            case other =>
              throw new IllegalArgumentException(
                s"unsupported infoset format: '$other' (expected 'xml' or 'json')"
              )
          }

          // NOTE(review): InputStream.available() is only an estimate for general streams; this
          // presumably relies on `data` being fully in memory — confirm against the caller.
          val dataSize = data.available()

          val parse =
            IO.interruptibleMany(
              // WARNING: parse doesn't close the OutputStream, so closed below
              dp.parse(
                new InputSourceDataInputStream(data),
                infosetOutputter
              )
            ).flatTap { res =>
              // Report diagnostics to the client before the effect is failed below.
              if (res.isError) {
                dapEvents
                  .send(
                    Parse.Event.Error(
                      res.getDiagnostics.toList
                        .map(d => d.toString)
                        .mkString("\n")
                    )
                  )
                  .void
              } else IO.unit
            }.ensureOr { res =>
              new Parse.Exception(res.getDiagnostics.toList)
            }(res => !res.isError)
              .flatMap { parseResult =>
                val loc = parseResult.location()
                // Computed at whole-byte granularity from the 1-based byte position.
                val leftOverBits = (dataSize - (loc.bytePos1b - 1)) * 8
                if (leftOverBits > 0) {
                  val message =
                    DataLeftOverUtils.getMessage(dataFilePath, loc.bitPos1b, loc.bytePos1b, leftOverBits)
                  Logger[IO].error(message) *> dapEvents.send(
                    DataLeftOverEvent(loc.bitPos1b, loc.bytePos1b, leftOverBits, message)
                  ) *> IO.unit
                } else {
                  IO.unit
                }
              }
              .void
              // Runs on success, failure and cancellation: close the stream and mark completion.
              .guarantee(IO(os.close()) *> done.set(true))

          // Run both in parallel, keeping the parse's result.
          stopper &> parse
        }

    /** Requests interruption of the parse if it has not finished yet; otherwise only logs. */
    def close(): IO[Unit] =
      done.get.flatMap {
        case false =>
          Logger[IO].debug("interrupting parse") *> pleaseStop
            .complete(())
            .void
        case true =>
          Logger[IO].debug("parse done, no interruption") *> IO.unit
      }
  }