protected def decode()

in csv/src/main/scala/org/apache/pekko/stream/connectors/csv/impl/CsvToMapJavaStage.scala [55:131]


  /**
   * Converts every `ByteString` of the given collection to a `String` by applying
   * `decodeByteString` and gathers the results in a list.
   *
   * @param elem the raw cells to decode
   * @return the decoded cells, in the encounter order of `elem`'s stream
   */
  protected def decode(elem: ju.Collection[ByteString]): ju.List[String] = {
    val decodedCells = elem.stream().map[String](decodeByteString)
    decodedCells.collect(Collectors.toList())
  }

  /**
   * Builds the stage logic that pairs each incoming row with the header names and
   * pushes the result downstream as a map.
   *
   * The headers start out as `columnNames`. When that is empty, the first row received
   * is decoded and kept as the headers instead of being emitted, and another row is
   * pulled from upstream.
   */
  override def createLogic(inheritedAttributes: Attributes): GraphStageLogic =
    new GraphStageLogic(shape) {
      // Header names; replaced by the decoded first row when none were supplied up front.
      private[this] var headers = columnNames

      setHandler(
        in,
        new InHandler {
          override def onPush(): Unit = {
            val row = grab(in)
            // `combineAll` selects between keeping unmatched headers/values and dropping them.
            if (combineAll) process(row, zipAllWithHeaders)
            else process(row, zipWithHeaders)
          }
        })

      setHandler(out,
        new OutHandler {
          // Demand from downstream is forwarded straight to upstream.
          override def onPull(): Unit = pull(in)
        })

      /**
       * Stores the row as headers when none are known yet; otherwise transforms the row,
       * merges it with the headers via `combine` and pushes the resulting map.
       */
      private def process(elem: ju.Collection[ByteString], combine: ju.Collection[V] => ju.Map[String, V]) = {
        if (!headers.isPresent) {
          // A header row produces no output, so request the next element right away.
          headers = ju.Optional.of(decode(elem))
          pull(in)
        } else {
          push(out, combine(transformElements(elem)))
        }
      }

      /** Pairs headers and values positionally, stopping as soon as either side runs out. */
      private def zipWithHeaders(elem: ju.Collection[V]): ju.Map[String, V] = {
        val result = new ju.HashMap[String, V]()
        val names = headers.get.iterator()
        val values = elem.iterator()
        while (names.hasNext && values.hasNext) {
          val name = names.next()
          result.put(name, values.next())
        }
        result
      }

      /**
       * Pairs headers and values positionally without dropping anything: a header that has
       * no value is mapped to the field placeholder, and a value that has no header is
       * stored under the header placeholder followed by a running index starting at 0.
       */
      private def zipAllWithHeaders(elem: ju.Collection[V]): ju.Map[String, V] = {
        val result = new ju.HashMap[String, V]()
        val names = headers.get.iterator()
        val values = elem.iterator()
        // Number of values seen so far that had no header; used as the generated key suffix.
        var unnamed = 0
        while (names.hasNext || values.hasNext) {
          if (!values.hasNext) {
            // More headers than values: fill the gap with the placeholder value.
            result.put(names.next(), customFieldValuePlaceholder.orElse(fieldValuePlaceholder))
          } else if (names.hasNext) {
            result.put(names.next(), values.next())
          } else {
            // More values than headers: synthesize a key for the surplus value.
            result.put(headerPlaceholder.orElse("MissingHeader") + unnamed, values.next())
            unnamed += 1
          }
        }
        result
      }

    }