in src/common/Kubernetes/KubernetesWatcher.cs [138:228]
public Task WatchNamespacesAsync(
Action<WatchEventType, V1ObjectMeta> callback,
CancellationToken cancellationToken)
=> WatchAsync($"/api/v1/namespaces", callback, cancellationToken);
/// <summary>
/// <see cref="IKubernetesWatcher.WatchServicesAsync"/>
/// </summary>
public Task WatchServicesAsync(
Action<WatchEventType, V1ObjectMeta> callback,
CancellationToken cancellationToken)
=> WatchAsync($"api/v1/services", callback, cancellationToken);
/// <summary>
/// <see cref="IKubernetesWatcher.WatchIngressesAsync"/>
/// </summary>
public Task WatchIngressesAsync(
string namespaceName,
Action<WatchEventType, V1ObjectMeta> callback,
CancellationToken cancellationToken)
=> WatchAsync($"/apis/networking.k8s.io/v1/namespaces/{namespaceName}/ingresses", callback, cancellationToken);
/// <summary>
/// <see cref="IKubernetesWatcher.WatchPodsAsync"/>
/// </summary>
public Task WatchPodsAsync(
string namespaceName,
Action<WatchEventType, V1ObjectMeta> callback,
CancellationToken cancellationToken)
=> WatchAsync($"/api/v1/namespaces/{namespaceName}/pods", callback, cancellationToken);
private Task WatchAsync(
string path,
Action<WatchEventType, V1ObjectMeta> callback,
CancellationToken cancellationToken)
{
if (this._cts == null)
{
throw new ObjectDisposedException(nameof(KubernetesWatcher));
}
var instanceToken = this._cts.Token;
var requestUri = new Uri(this._baseUri, $"{path}?watch=true");
HttpResponseMessage response = null;
return Task.Run(async () =>
{
while (!cancellationToken.IsCancellationRequested && !instanceToken.IsCancellationRequested)
{
using (var myCts = new CancellationTokenSource())
using (instanceToken.Register(() => myCts.Cancel()))
using (cancellationToken.Register(() => myCts.Cancel()))
{
bool backoff = false;
try
{
if (response == null)
{
response = await StartAsync(requestUri, myCts.Token);
Interlocked.Exchange(ref this._failureBackoff, 0); // reset backoff
}
await ProcessAsync(response, callback, myCts.Token);
}
catch (HttpRequestException ex)
{
// User cluster cannot be reached
_log.ExceptionAsWarning(ex);
backoff = true;
}
catch (Exception ex)
{
// Write exception for diagnostic purposes
_log.Exception(ex);
backoff = true;
}
response = null;
if (backoff)
{
// Wait for a period of time before trying again
int waitMs = Math.Min(300 * Interlocked.Increment(ref this._failureBackoff), 10000);
await Task.Delay(waitMs, myCts.Token);
}
myCts.Cancel();
}
}
});
}