public fun OrchestrationServer()

in hot-reload-orchestration/src/main/kotlin/org/jetbrains/compose/reload/orchestration/OrchestrationServer.kt [70:173]


/**
 * Creates a new [OrchestrationServer] and launches its background task.
 *
 * The returned server goes through two explicit phases:
 * - [OrchestrationServer.bind] binds a [ServerSocket] to `127.0.0.1` on a system-chosen port (port `0`)
 *   and completes [OrchestrationServer.port] with the actual local port.
 * - [OrchestrationServer.start] performs the bind step and then lets the accept loop run, so that
 *   incoming connections are turned into clients.
 *
 * In addition, [OrchestrationServer.connectClient] makes the server actively connect to a client
 * listening on a given local port. Such an outgoing connection starts with a three-integer handshake:
 * the protocol magic number, the current orchestration version and the port of this server.
 *
 * When the server task finishes, the `bind` and `port` futures are completed exceptionally with the
 * failure of the task, or with a [StoppedException] if the task carries no exception.
 *
 * @return the server handle; it also acts as the [Task] running the server.
 */
public fun OrchestrationServer(): OrchestrationServer {
    /* Completed by `bind()`: allows the listener subtask to bind the server socket */
    val bind = Future<Unit>()

    /* Completed by the listener subtask with the local port of the bound server socket */
    val port = Future<Int>()

    /* Completed by `start()`: allows the listener subtask to enter the accept loop */
    val start = Future<Unit>()
    val messages = Bus<OrchestrationPackage>()
    val states = OrchestrationServerStates()

    /**
     * Represents an actor who attempts to connect to clients listening at a given port.
     */
    val connectActor = Actor</* Port */ Int, Boolean>()

    val task = launchTask<Unit>("OrchestrationServer") {
        /* Hand the outcome of the task to everybody awaiting 'bind' or 'port' */
        invokeOnFinish { bind.completeExceptionally(it.exceptionOrNull() ?: StoppedException()) }
        invokeOnFinish { port.completeExceptionally(it.exceptionOrNull() ?: StoppedException()) }

        val serverThread = WorkerThread("Orchestration Server")
        launchOnFinish { serverThread.shutdown().await() }

        subtask("Listen for incoming connections", serverThread.dispatcherImmediate) {
            val serverSocket = ServerSocket()
            /* Closing the server socket is what interrupts a blocking 'accept()' call on stop */
            invokeOnStop { serverSocket.close() }

            bind.await()
            serverSocket.bind(InetSocketAddress("127.0.0.1", 0))
            port.complete(serverSocket.localPort)

            start.await()
            while (isActive()) {
                val clientSocket = serverSocket.accept()
                clientSocket.setOrchestrationDefaults()

                val client = launchClient(OrchestrationIO(clientSocket), messages, states)
                invokeOnFinish { client.stop() }
            }
        }

        subtask("Connect to clients") {
            val serverPort = port.awaitOrThrow()

            connectActor.process { clientPort ->
                logger.debug("Connecting to client on port '$clientPort'")
                val reader = OrchestrationIO.newReaderThread()
                val writer = OrchestrationIO.newWriterThread()
                try {
                    withThread(writer) {
                        val socket = Socket("127.0.0.1", clientPort)
                        try {
                            socket.setOrchestrationDefaults()
                            val io = OrchestrationIO(socket, writer = writer, reader = reader)

                            /* Handshake: magic number, protocol version, port of this server */
                            io.writeInt(ORCHESTRATION_PROTOCOL_MAGIC_NUMBER)
                            io.writeInt(OrchestrationVersion.current.intValue)
                            io.writeInt(serverPort)

                            val client = launchClient(io, messages, states)
                            invokeOnFinish { client.stop() }
                            true
                        } catch (t: Throwable) {
                            /*
                            The socket is already open at this point: Close it before propagating the failure,
                            otherwise every failed connection attempt would leak one socket.
                            */
                            runCatching { socket.close() }.onFailure { closeFailure -> t.addSuppressed(closeFailure) }
                            throw t
                        }
                    }
                } catch (t: Throwable) {
                    /* A failed attempt is reported as 'false' to the caller; the server keeps running */
                    logger.error("Failed to connect to client on port '$clientPort'", t)
                    reader.close()
                    writer.close()
                    false
                }
            }
        }
    }

    return object : OrchestrationServer, Task<Unit> by task {
        override val messages = messages.withType<OrchestrationMessage>()
        override val port: Future<Int> = port
        override val states = states

        /**
         * Asks the server to connect to a client listening on [listenerPort] (on `127.0.0.1`).
         * @return `true` if the connection and handshake succeeded, `false` otherwise.
         */
        override suspend fun connectClient(listenerPort: Int): Boolean {
            return connectActor(listenerPort)
        }

        override suspend fun send(message: OrchestrationMessage) {
            messages.send(message)
        }

        override suspend fun <T : OrchestrationState?> update(
            key: OrchestrationStateKey<T>, update: (T) -> T
        ): Update<T> {
            return states.update(key, update)
        }

        /* Server will always be able to update the state, as owner */
        override suspend fun <T : OrchestrationState?> tryUpdate(
            key: OrchestrationStateKey<T>, update: (T) -> T
        ): Update<T> = update(key, update)

        /**
         * Requests the server socket to be bound and waits until the [port] future is completed.
         */
        override suspend fun bind() {
            bind.complete(Unit)
            port.await()
        }

        /**
         * Binds the server (see [bind]) and then allows it to accept incoming connections.
         */
        override suspend fun start() {
            bind()
            start.complete(Unit)
        }
    }
}