func subscribe()

in Sources/ServiceDiscovery/ServiceDiscovery+AsyncAwait.swift [35:68]


    func subscribe(to service: Service) -> ServiceSnapshots<Instance> {
        ServiceSnapshots(AsyncThrowingStream<[Instance], Error> { continuation in
            Task {
                let cancellationToken = self.subscribe(
                    to: service,
                    onNext: { result in
                        switch result {
                        case .success(let instances):
                            continuation.yield(instances)
                        case .failure(let error):
                            // LookupError is recoverable (e.g., service is added *after* subscription begins), so don't give up yet
                            guard error is LookupError else {
                                return continuation.finish(throwing: error)
                            }
                        }
                    },
                    onComplete: { reason in
                        switch reason {
                        case .cancellationRequested:
                            continuation.finish()
                        case .serviceDiscoveryUnavailable:
                            continuation.finish(throwing: ServiceDiscoveryError.unavailable)
                        default:
                            continuation.finish(throwing: ServiceDiscoveryError.other(reason.description))
                        }
                    }
                )

                continuation.onTermination = { @Sendable (_) -> Void in
                    cancellationToken.cancel()
                }
            }
        })
    }