in csv/src/main/scala/org/apache/pekko/stream/connectors/csv/impl/CsvToMapJavaStage.scala [55:131]
protected def decode(elem: ju.Collection[ByteString]): ju.List[String] =
elem.stream().map[String](decodeByteString).collect(Collectors.toList())
override def createLogic(inheritedAttributes: Attributes): GraphStageLogic =
new GraphStageLogic(shape) {
private[this] var headers = columnNames
setHandler(
in,
new InHandler {
override def onPush(): Unit = {
val elem = grab(in)
if (combineAll) {
process(elem, zipAllWithHeaders)
} else {
process(elem, zipWithHeaders)
}
}
})
private def process(elem: ju.Collection[ByteString], combine: ju.Collection[V] => ju.Map[String, V]) = {
if (headers.isPresent) {
val map = combine(transformElements(elem))
push(out, map)
} else {
headers = ju.Optional.of(decode(elem))
pull(in)
}
}
setHandler(out,
new OutHandler {
override def onPull(): Unit = pull(in)
})
private def zipWithHeaders(elem: ju.Collection[V]): ju.Map[String, V] = {
val map = new ju.HashMap[String, V]()
val hIter = headers.get.iterator()
val colIter = elem.iterator()
while (hIter.hasNext && colIter.hasNext) {
map.put(hIter.next(), colIter.next())
}
map
}
private def zipAllWithHeaders(elem: ju.Collection[V]): ju.Map[String, V] = {
val map = new ju.HashMap[String, V]()
val hIter = headers.get.iterator()
val colIter = elem.iterator()
if (headers.get.size() > elem.size()) {
while (hIter.hasNext) {
if (colIter.hasNext) {
map.put(hIter.next(), colIter.next())
} else {
map.put(hIter.next(), customFieldValuePlaceholder.orElse(fieldValuePlaceholder))
}
}
} else if (elem.size() > headers.get.size()) {
var index = 0
while (colIter.hasNext) {
if (hIter.hasNext) {
map.put(hIter.next(), colIter.next())
} else {
map.put(headerPlaceholder.orElse("MissingHeader") + index, colIter.next())
index = index + 1
}
}
} else {
while (hIter.hasNext && colIter.hasNext) {
map.put(hIter.next(), colIter.next())
}
}
map
}
}