public Task WatchNamespacesAsync()

in src/common/Kubernetes/KubernetesWatcher.cs [138:228]


        public Task WatchNamespacesAsync(
            Action<WatchEventType, V1ObjectMeta> callback,
            CancellationToken cancellationToken)
            => WatchAsync($"/api/v1/namespaces", callback, cancellationToken);

        /// <summary>
        /// <see cref="IKubernetesWatcher.WatchServicesAsync"/>
        /// </summary>
        public Task WatchServicesAsync(
            Action<WatchEventType, V1ObjectMeta> callback,
            CancellationToken cancellationToken)
            => WatchAsync($"api/v1/services", callback, cancellationToken);

        /// <summary>
        /// <see cref="IKubernetesWatcher.WatchIngressesAsync"/>
        /// </summary>
        public Task WatchIngressesAsync(
            string namespaceName,
            Action<WatchEventType, V1ObjectMeta> callback,
            CancellationToken cancellationToken)
            => WatchAsync($"/apis/networking.k8s.io/v1/namespaces/{namespaceName}/ingresses", callback, cancellationToken);

        /// <summary>
        /// <see cref="IKubernetesWatcher.WatchPodsAsync"/>
        /// </summary>
        public Task WatchPodsAsync(
            string namespaceName,
            Action<WatchEventType, V1ObjectMeta> callback,
            CancellationToken cancellationToken)
            => WatchAsync($"/api/v1/namespaces/{namespaceName}/pods", callback, cancellationToken);

        private Task WatchAsync(
            string path,
            Action<WatchEventType, V1ObjectMeta> callback,
            CancellationToken cancellationToken)
        {
            if (this._cts == null)
            {
                throw new ObjectDisposedException(nameof(KubernetesWatcher));
            }
            var instanceToken = this._cts.Token;

            var requestUri = new Uri(this._baseUri, $"{path}?watch=true");
            HttpResponseMessage response = null;

            return Task.Run(async () =>
            {
                while (!cancellationToken.IsCancellationRequested && !instanceToken.IsCancellationRequested)
                {
                    using (var myCts = new CancellationTokenSource())
                    using (instanceToken.Register(() => myCts.Cancel()))
                    using (cancellationToken.Register(() => myCts.Cancel()))
                    {
                        bool backoff = false;
                        try
                        {
                            if (response == null)
                            {
                                response = await StartAsync(requestUri, myCts.Token);
                                Interlocked.Exchange(ref this._failureBackoff, 0); // reset backoff
                            }

                            await ProcessAsync(response, callback, myCts.Token);
                        }
                        catch (HttpRequestException ex)
                        {
                            // User cluster cannot be reached
                            _log.ExceptionAsWarning(ex);
                            backoff = true;
                        }
                        catch (Exception ex)
                        {
                            // Write exception for diagnostic purposes
                            _log.Exception(ex);
                            backoff = true;
                        }

                        response = null;

                        if (backoff)
                        {
                            // Wait for a period of time before trying again
                            int waitMs = Math.Min(300 * Interlocked.Increment(ref this._failureBackoff), 10000);
                            await Task.Delay(waitMs, myCts.Token);
                        }

                        myCts.Cancel();
                    }
                }
            });
        }