private def nioEventLoop()

in unix-domain-socket/src/main/scala/org/apache/pekko/stream/connectors/unixdomainsocket/impl/UnixDomainSocketImpl.scala [82:244]


  private def nioEventLoop(sel: Selector, log: LoggingAdapter)(implicit ec: ExecutionContext): Unit =
    while (sel.isOpen) {
      val nrOfKeysSelected = sel.select()
      if (sel.isOpen) {
        val keySelectable = nrOfKeysSelected > 0
        val keys = if (keySelectable) sel.selectedKeys().iterator() else sel.keys().iterator()
        while (keys.hasNext) {
          val key = keys.next()

          if (key != null) { // Observed as sometimes being null via sel.keys().iterator()
            if (log.isDebugEnabled) {
              val interestInfo = if (keySelectable) {
                val interestSet = key.asInstanceOf[SelectionKey].interestOps()

                val isInterestedInAccept = (interestSet & SelectionKey.OP_ACCEPT) != 0
                val isInterestedInConnect = (interestSet & SelectionKey.OP_CONNECT) != 0
                val isInterestedInRead = (interestSet & SelectionKey.OP_READ) != 0
                val isInterestedInWrite = (interestSet & SelectionKey.OP_WRITE) != 0

                f"(accept=$isInterestedInAccept%5s connect=$isInterestedInConnect%5s read=$isInterestedInRead%5s write=$isInterestedInWrite%5s)"
              } else {
                ""
              }

              log.debug(
                f"""ch=${key.channel().hashCode()}%10d
                   | at=${Option(key.attachment()).fold(0)(_.hashCode())}%10d
                   | selectable=$keySelectable%5s
                   | acceptable=${key.isAcceptable}%5s
                   | connectable=${key.isConnectable}%5s
                   | readable=${key.isReadable}%5s
                   | writable=${key.isWritable}%5s
                   | $interestInfo""".stripMargin.replaceAll("\n", ""))
            }

            if (keySelectable && (key.isAcceptable || key.isConnectable)) {
              val newConnectionOp = key.attachment().asInstanceOf[(Selector, SelectionKey) => Unit]
              newConnectionOp(sel, key)
            }
            key.attachment match {
              case null =>
              case sendReceiveContext: SendReceiveContext =>
                sendReceiveContext.send match {
                  case SendRequested(buffer, sent) if keySelectable && key.isWritable =>
                    val channel = key.channel().asInstanceOf[UnixSocketChannel]

                    val written =
                      try {
                        channel.write(buffer)
                      } catch {
                        case e: IOException =>
                          key.cancel()
                          try {
                            key.channel.close()
                          } catch { case _: IOException => }
                          sent.failure(e)
                          -1
                      }

                    val remaining = buffer.remaining

                    log.debug("written: {} remaining: {}", written, remaining)

                    if (written >= 0 && remaining == 0) {
                      sendReceiveContext.send = SendAvailable(buffer)
                      key.interestOps(key.interestOps() & ~SelectionKey.OP_WRITE)
                      sent.success(Done)
                    }
                  case _: SendRequested =>
                    key.interestOps(key.interestOps() | SelectionKey.OP_WRITE)
                  case _: SendAvailable =>
                  case ShutdownRequested if key.isValid && !sendReceiveContext.isOutputShutdown =>
                    try {
                      if (sendReceiveContext.isInputShutdown) {
                        log.debug("Write-side is shutting down")
                        key.cancel()
                        key.channel.close()
                      } else {
                        log.debug("Write-side is shutting down further output")
                        sendReceiveContext.isOutputShutdown = true
                        key.interestOps(key.interestOps() & ~SelectionKey.OP_WRITE)
                        key.channel.asInstanceOf[UnixSocketChannel].shutdownOutput()
                      }
                    } catch {
                      // socket could have been closed in the meantime, so shutdownOutput will throw this
                      case _: IOException =>
                    }
                  case ShutdownRequested =>
                  case CloseRequested =>
                    log.debug("Write-side is shutting down unconditionally")
                    key.cancel()
                    try {
                      key.channel.close()
                    } catch { case _: IOException => }
                }
                sendReceiveContext.receive match {
                  case ReceiveAvailable(queue, buffer) if keySelectable && key.isReadable =>
                    buffer.clear()

                    val channel = key.channel.asInstanceOf[UnixSocketChannel]

                    val read =
                      try {
                        channel.read(buffer)
                      } catch {
                        // socket could have been closed in the meantime, so read will throw this
                        case _: IOException => -1
                      }

                    log.debug("read: {}", read)

                    if (read >= 0) {
                      buffer.flip()
                      val pendingResult = queue.offer(ByteString(buffer))
                      pendingResult.onComplete(_ => sel.wakeup())
                      sendReceiveContext.receive = PendingReceiveAck(queue, buffer, pendingResult)
                      key.interestOps(key.interestOps() & ~SelectionKey.OP_READ)
                    } else {
                      queue.complete()
                      try {
                        if (!sendReceiveContext.halfClose || sendReceiveContext.isOutputShutdown) {
                          queue.watchCompletion().onComplete { _ =>
                            log.debug("Read-side is shutting down")
                            key.cancel()
                            try {
                              key.channel().close()
                            } catch { case _: IOException => }
                          }
                        } else {
                          log.debug("Read-side is shutting down further input")
                          sendReceiveContext.isInputShutdown = true
                          channel.shutdownInput()
                        }
                      } catch {
                        // socket could have been closed in the meantime, so shutdownInput will throw this
                        case _: IOException =>
                      }
                    }
                  case _: ReceiveAvailable =>
                  case PendingReceiveAck(receiveQueue, receiveBuffer, pendingResult) if pendingResult.isCompleted =>
                    pendingResult.value.get match {
                      case Success(QueueOfferResult.Enqueued) =>
                        key.interestOps(key.interestOps() | SelectionKey.OP_READ)
                        sendReceiveContext.receive = ReceiveAvailable(receiveQueue, receiveBuffer)
                      case e =>
                        log.debug("Read-side is shutting down due to {}", e)
                        receiveQueue.complete()
                        key.cancel()
                        try {
                          key.channel.close()
                        } catch { case _: IOException => }
                    }
                  case _: PendingReceiveAck =>
                }
              case _: ((Selector, SelectionKey) => Unit) @unchecked =>
              case other =>
                log.warning("unexpected receive: [{}]", other)
            }
          }
          if (keySelectable) keys.remove()
        }
      }
    }