def this()

in xml/src/main/scala/org/apache/pekko/stream/connectors/xml/impl/StreamingXmlWriter.scala [32:121]


  def this(charset: Charset) = this(charset, XMLOutputFactory.newInstance())

  val in: Inlet[ParseEvent] = Inlet("XMLWriter.in")
  val out: Outlet[ByteString] = Outlet("XMLWriter.out")
  override val shape: FlowShape[ParseEvent, ByteString] = FlowShape(in, out)

  override def createLogic(inheritedAttributes: Attributes): GraphStageLogic =
    new GraphStageLogic(shape) with InHandler with OutHandler {
      val byteStringBuilder = new ByteStringBuilder()

      val output = xmlOutputFactory.createXMLStreamWriter(byteStringBuilder.asOutputStream, charset.name())

      setHandlers(in, out, this)

      def writeAttributes(attributes: List[Attribute]): Unit =
        attributes.foreach { att =>
          att match {
            case Attribute(name, value, Some(prefix), Some(namespace)) =>
              output.writeAttribute(prefix, namespace, name, value)
            case Attribute(name, value, None, Some(namespace)) =>
              output.writeAttribute(namespace, name, value)
            case Attribute(name, value, Some(_), None) =>
              output.writeAttribute(name, value)
            case Attribute(name, value, None, None) =>
              output.writeAttribute(name, value)
          }

        }

      override def onPush(): Unit = {
        val ev: ParseEvent = grab(in)
        ev match {
          case StartDocument =>
            output.writeStartDocument()

          case EndDocument =>
            output.writeEndDocument()

          case StartElement(localName, attributes, optPrefix, Some(namespace), namespaceCtx) =>
            val prefix = optPrefix.getOrElse("")
            output.setPrefix(prefix, namespace)
            output.writeStartElement(prefix, localName, namespace)
            namespaceCtx.foreach(ns => output.writeNamespace(ns.prefix.getOrElse(""), ns.uri))
            writeAttributes(attributes)

          case StartElement(localName, attributes, Some(_), None, namespaceCtx) => // Shouldn't happened
            output.writeStartElement(localName)
            namespaceCtx.foreach(ns => output.writeNamespace(ns.prefix.getOrElse(""), ns.uri))
            writeAttributes(attributes)

          case StartElement(localName, attributes, None, None, namespaceCtx) =>
            output.writeStartElement(localName)
            namespaceCtx.foreach(ns => output.writeNamespace(ns.prefix.getOrElse(""), ns.uri))
            writeAttributes(attributes)

          case EndElement(_) =>
            output.writeEndElement()

          case Characters(text) =>
            output.writeCharacters(text)
          case ProcessingInstruction(Some(target), Some(data)) =>
            output.writeProcessingInstruction(target, data)

          case ProcessingInstruction(Some(target), None) =>
            output.writeProcessingInstruction(target)

          case ProcessingInstruction(None, Some(data)) =>
            output.writeProcessingInstruction(None.orNull, data)
          case ProcessingInstruction(None, None) =>
          case Comment(text) =>
            output.writeComment(text)

          case CData(text) =>
            output.writeCData(text)
        }
        push(out, byteStringBuilder.result().compact)
        byteStringBuilder.clear()
      }

      override def onPull(): Unit = pull(in)

      override def onUpstreamFinish(): Unit = {
        output.flush()
        val finalData = byteStringBuilder.result().compact
        if (finalData.length != 0) {
          emit(out, finalData)
        }
        super.onUpstreamFinish()
      }
    }