private def sendReceiveStructures()

in unix-domain-socket/src/main/scala/org/apache/pekko/stream/connectors/unixdomainsocket/impl/UnixDomainSocketImpl.scala [293:357]


  private def sendReceiveStructures(sel: Selector, receiveBufferSize: Int, sendBufferSize: Int, halfClose: Boolean)(
      implicit mat: Materializer,
      ec: ExecutionContext): (SendReceiveContext, Flow[ByteString, ByteString, NotUsed]) = {

    val (receiveQueue, receiveSource) =
      Source
        .queue[ByteString](2, OverflowStrategy.backpressure)
        .prefixAndTail(0)
        .map(_._2)
        .toMat(Sink.head)(Keep.both)
        .run()
    val sendReceiveContext =
      new SendReceiveContext(
        SendAvailable(ByteBuffer.allocate(sendBufferSize)),
        ReceiveAvailable(receiveQueue, ByteBuffer.allocate(receiveBufferSize)),
        halfClose = halfClose,
        isOutputShutdown = false,
        isInputShutdown = false) // FIXME: No need for the costly allocation of direct buffers yet given https://github.com/jnr/jnr-unixsocket/pull/49

    val sendSink = Sink.fromGraph(
      Flow[ByteString]
        .mapConcat { bytes =>
          if (bytes.size <= sendBufferSize) {
            Vector(bytes)
          } else {
            @annotation.tailrec
            def splitToBufferSize(bytes: ByteString, acc: Vector[ByteString]): Vector[ByteString] =
              if (bytes.nonEmpty) {
                val (left, right) = bytes.splitAt(sendBufferSize)
                splitToBufferSize(right, acc :+ left)
              } else {
                acc
              }
            splitToBufferSize(bytes, Vector.empty)
          }
        }
        .mapAsync(1) { bytes =>
          // Note - it is an error to get here and not have an AvailableSendContext
          val sent = Promise[Done]()
          val sendBuffer = sendReceiveContext.send.buffer
          sendBuffer.clear()
          val copied = bytes.copyToBuffer(sendBuffer)
          sendBuffer.flip()
          require(copied == bytes.size) // It is an error to exceed our buffer size given the above mapConcat
          sendReceiveContext.send = SendRequested(sendBuffer, sent)
          sel.wakeup()
          sent.future.map(_ => bytes)
        }
        .watchTermination() {
          case (_, done) =>
            done.onComplete { _ =>
              sendReceiveContext.send = if (halfClose) {
                ShutdownRequested
              } else {
                receiveQueue.complete()
                CloseRequested
              }
              sel.wakeup()
            }
            Keep.left
        }
        .to(Sink.ignore))

    (sendReceiveContext, Flow.fromSinkAndSource(sendSink, Source.futureSource(receiveSource)))
  }