def apply()

in debugger/src/main/scala/org.apache.daffodil.debugger.dap/Parse.scala [74:162]


  def apply(
      schema: Path,
      data: InputStream,
      dataFilePath: Path,
      debugger: Debugger,
      infosetFormat: String,
      rootName: Option[String],
      rootNamespace: Option[String],
      variables: Map[String, String],
      tunables: Map[String, String],
      dapEvents: Channel[IO, Events.DebugEvent]
  ): IO[Parse] =
    for {
      dp <- Compiler()
        .compile(schema, rootName, rootNamespace, tunables)
        .map(p =>
          p.withDebugger(debugger)
            .withDebugging(true)
            .withExternalVariables(variables)
            .withValidationMode(ValidationMode.Limited)
        )
      done <- Ref[IO].of(false)
      pleaseStop <- Deferred[IO, Unit]
    } yield new Parse {
      def run(): Stream[IO, Byte] =
        fs2.io
          .readOutputStream(4096) { os =>
            val stopper =
              pleaseStop.get *> IO.canceled // will cancel the concurrent parse effect

            val infosetOutputter = infosetFormat match {
              case "xml"  => new XMLTextInfosetOutputter(os, true)
              case "json" => new JsonInfosetOutputter(os, true)
            }

            val dataSize = data.available()

            val parse =
              IO.interruptibleMany(
                dp.parse(
                  new InputSourceDataInputStream(data),
                  infosetOutputter
                )
                // WARNING: parse doesn't close the OutputStream, so closed below
              ).flatTap { res =>
                if (res.isError) {
                  dapEvents
                    .send(
                      Parse.Event.Error(
                        res.getDiagnostics.toList
                          .map(d => d.toString)
                          .mkString("\n")
                      )
                    )
                    .void
                } else IO.unit
              }.ensureOr { res =>
                new Parse.Exception(res.getDiagnostics.toList)
              }(res => !res.isError)
                .flatMap { parseResult =>
                  val loc = parseResult.location()
                  val leftOverBits = (dataSize - (loc.bytePos1b - 1)) * 8

                  if (leftOverBits > 0) {
                    val message = DataLeftOverUtils.getMessage(dataFilePath, loc.bitPos1b, loc.bytePos1b, leftOverBits)

                    Logger[IO].error(message) *> dapEvents.send(
                      DataLeftOverEvent(loc.bitPos1b, loc.bytePos1b, leftOverBits, message)
                    ) *> IO.unit
                  } else {
                    IO.unit
                  }
                }
                .void
                .guarantee(IO(os.close) *> done.set(true))

            stopper &> parse
          }

      def close(): IO[Unit] =
        done.get.flatMap {
          case false =>
            Logger[IO].debug("interrupting parse") *> pleaseStop
              .complete(())
              .void
          case true =>
            Logger[IO].debug("parse done, no interruption") *> IO.unit
        }
    }