in unix-domain-socket/src/main/scala/org/apache/pekko/stream/connectors/unixdomainsocket/impl/UnixDomainSocketImpl.scala [293:357]
private def sendReceiveStructures(sel: Selector, receiveBufferSize: Int, sendBufferSize: Int, halfClose: Boolean)(
implicit mat: Materializer,
ec: ExecutionContext): (SendReceiveContext, Flow[ByteString, ByteString, NotUsed]) = {
val (receiveQueue, receiveSource) =
Source
.queue[ByteString](2, OverflowStrategy.backpressure)
.prefixAndTail(0)
.map(_._2)
.toMat(Sink.head)(Keep.both)
.run()
val sendReceiveContext =
new SendReceiveContext(
SendAvailable(ByteBuffer.allocate(sendBufferSize)),
ReceiveAvailable(receiveQueue, ByteBuffer.allocate(receiveBufferSize)),
halfClose = halfClose,
isOutputShutdown = false,
isInputShutdown = false) // FIXME: No need for the costly allocation of direct buffers yet given https://github.com/jnr/jnr-unixsocket/pull/49
val sendSink = Sink.fromGraph(
Flow[ByteString]
.mapConcat { bytes =>
if (bytes.size <= sendBufferSize) {
Vector(bytes)
} else {
@annotation.tailrec
def splitToBufferSize(bytes: ByteString, acc: Vector[ByteString]): Vector[ByteString] =
if (bytes.nonEmpty) {
val (left, right) = bytes.splitAt(sendBufferSize)
splitToBufferSize(right, acc :+ left)
} else {
acc
}
splitToBufferSize(bytes, Vector.empty)
}
}
.mapAsync(1) { bytes =>
// Note - it is an error to get here and not have an AvailableSendContext
val sent = Promise[Done]()
val sendBuffer = sendReceiveContext.send.buffer
sendBuffer.clear()
val copied = bytes.copyToBuffer(sendBuffer)
sendBuffer.flip()
require(copied == bytes.size) // It is an error to exceed our buffer size given the above mapConcat
sendReceiveContext.send = SendRequested(sendBuffer, sent)
sel.wakeup()
sent.future.map(_ => bytes)
}
.watchTermination() {
case (_, done) =>
done.onComplete { _ =>
sendReceiveContext.send = if (halfClose) {
ShutdownRequested
} else {
receiveQueue.complete()
CloseRequested
}
sel.wakeup()
}
Keep.left
}
.to(Sink.ignore))
(sendReceiveContext, Flow.fromSinkAndSource(sendSink, Source.futureSource(receiveSource)))
}