protected def bind()

in unix-domain-socket/src/main/scala/org/apache/pekko/stream/connectors/unixdomainsocket/impl/UnixDomainSocketImpl.scala [392:461]


  /**
   * Creates a source of incoming connections for a Unix domain socket server bound to `path`.
   *
   * The work is wrapped in `Source.lazySource`, so no channel is opened when this method
   * returns; opening, selector registration and binding happen inside the deferred factory.
   *
   * @param path      file system path of the socket to bind to
   * @param backlog   value handed to the server socket's `bind` call
   * @param halfClose forwarded unchanged to `acceptKey` for every accepted connection
   * @return a source of incoming connections whose materialized value completes with the
   *         [[ServerBinding]] once the socket is bound, or fails if binding is not possible
   */
  protected def bind(path: Path,
      backlog: Int = 128,
      halfClose: Boolean = false): Source[IncomingConnection, Future[ServerBinding]] = {

    val openAndBind: () => Source[IncomingConnection, Future[ServerBinding]] = { () =>
      // Best-effort removal of the socket file; a failure to delete is deliberately ignored.
      def deleteSocketFile(): Unit =
        try {
          Files.delete(path)
        } catch {
          case NonFatal(_) =>
        }

      // prefixAndTail(0) + Sink.head hands us the queue's element stream as a Source that is
      // run later by the stream returned from this factory.
      val (incomingConnectionQueue, incomingConnectionSource) =
        Source
          .queue[IncomingConnection](2, OverflowStrategy.backpressure)
          .prefixAndTail(0)
          .map {
            case (_, source) =>
              source
                .watchTermination() { (mat, done) =>
                  // NOTE(review): `case _` matches success and failure alike, so this
                  // presumably also runs after a failed bind - confirm that is intended.
                  done.andThen { case _ => deleteSocketFile() }
                  mat
                }
          }
          .toMat(Sink.head)(Keep.both)
          .run()

      val serverBinding = Promise[ServerBinding]()

      // Build the address before opening the channel so a failure here cannot leak a channel.
      val address = new JnrUnixSocketAddress(path.toFile)
      val channel = UnixServerSocketChannel.open()
      val registeredKey =
        try {
          channel.configureBlocking(false)
          channel.register(sel,
            SelectionKey.OP_ACCEPT,
            acceptKey(address, incomingConnectionQueue, halfClose, receiveBufferSize, sendBufferSize) _)
        } catch {
          case NonFatal(e) =>
            // No selection key exists yet: release the channel and terminate the queue
            // stream that is already running, then surface the original failure.
            try {
              channel.close()
            } catch {
              case NonFatal(_) =>
            }
            incomingConnectionQueue.fail(e)
            throw e
        }

      // Deregisters from the selector and closes the server channel.
      def releaseChannel(): Unit = {
        registeredKey.cancel()
        channel.close()
      }

      // Releases the channel and reports `cause` to both the connection stream and the binding.
      def abort(cause: Throwable): Unit = {
        releaseChannel()
        incomingConnectionQueue.fail(cause)
        serverBinding.failure(cause)
      }

      try {
        channel.socket().bind(address, backlog)
        sel.wakeup()
        serverBinding.success(
          ServerBinding(UnixSocketAddress(Paths.get(address.path))) { () =>
            releaseChannel()
            incomingConnectionQueue.complete()
            incomingConnectionQueue.watchCompletion().map(_ => ())
          })
      } catch {
        case e: IOException =>
          // Add the address to the message so the failing path is visible to the caller.
          abort(new IOException(e.getMessage + s" ($address)", e))

        case NonFatal(e) =>
          abort(e)
      }

      Source
        .futureSource(incomingConnectionSource)
        .mapMaterializedValue(_ => serverBinding.future)

    }

    Source.lazySource(openAndBind).mapMaterializedValue(_.flatMap(identity))
  }