in Sources/ServiceDiscovery/ServiceDiscovery+AsyncAwait.swift [35:68]
/// Subscribes to `service` and exposes the updates as an asynchronous sequence of instance snapshots.
///
/// Bridges the callback-based `subscribe(to:onNext:onComplete:)` into an `AsyncThrowingStream`
/// that is wrapped in a `ServiceSnapshots`.
///
/// - Parameter service: The service whose instances should be observed.
/// - Returns: A `ServiceSnapshots` that yields every successful instance listing. The sequence finishes
///   normally when the subscription completes with `.cancellationRequested`; it throws
///   `ServiceDiscoveryError.unavailable` for `.serviceDiscoveryUnavailable`,
///   `ServiceDiscoveryError.other` for any other completion reason, and rethrows any
///   non-`LookupError` failure delivered by the subscription.
func subscribe(to service: Service) -> ServiceSnapshots<Instance> {
    let snapshots = AsyncThrowingStream<[Instance], Error> { continuation in
        // NOTE(review): the termination handler is only installed once `subscribe` has returned inside
        // this unstructured task; presumably a stream that terminates before that point never triggers
        // `cancel()` -- confirm whether that window matters.
        Task {
            let subscription = self.subscribe(
                to: service,
                onNext: { update in
                    switch update {
                    case .success(let instances):
                        continuation.yield(instances)
                    case .failure(is LookupError):
                        // LookupError is recoverable (e.g., the service is added *after* the
                        // subscription begins), so keep the stream open and wait for the next update.
                        break
                    case .failure(let error):
                        continuation.finish(throwing: error)
                    }
                },
                onComplete: { reason in
                    // Translate the completion reason into the error (if any) that ends the stream.
                    let failure: Error?
                    switch reason {
                    case .cancellationRequested:
                        failure = nil
                    case .serviceDiscoveryUnavailable:
                        failure = ServiceDiscoveryError.unavailable
                    default:
                        failure = ServiceDiscoveryError.other(reason.description)
                    }
                    continuation.finish(throwing: failure)
                }
            )

            // Tear down the underlying subscription once the stream terminates.
            continuation.onTermination = { @Sendable _ in
                subscription.cancel()
            }
        }
    }
    return ServiceSnapshots(snapshots)
}