in src/dotnet/AspireWorker/AspireHost/AspireHostResourceWatcher.cs [16:176]
internal sealed class AspireHostResourceWatcher(
DashboardService.DashboardServiceClient client,
Metadata headers,
IRdConnectionWrapper connectionWrapper,
AspireHostModel hostModel,
ResiliencePipelineProvider<string> resiliencePipelineProvider,
ILogger logger,
Lifetime lifetime)
{
private readonly ResiliencePipeline _pipeline =
resiliencePipelineProvider.GetPipeline(nameof(AspireHostResourceWatcher));
internal async Task WatchResources()
{
try
{
logger.StartResourceWatching();
await Task.Delay(TimeSpan.FromSeconds(5), lifetime);
var retryCout = 1;
await _pipeline.ExecuteAsync(async token => await SendWatchResourcesRequest(retryCout++, token), lifetime);
logger.StopResourceWatching(lifetime.IsAlive);
}
catch (OperationCanceledException)
{
logger.ResourceWatchingWasCancelled();
}
}
private async Task SendWatchResourcesRequest(int retryCount, CancellationToken ct)
{
try
{
var request = new WatchResourcesRequest { IsReconnect = retryCount > 1 };
var response = client.WatchResources(request, headers: headers, cancellationToken: ct);
await foreach (var update in response.ResponseStream.ReadAllAsync(ct))
{
switch (update.KindCase)
{
case WatchResourcesUpdate.KindOneofCase.InitialData:
await HandleInitialData(update.InitialData, ct);
break;
case WatchResourcesUpdate.KindOneofCase.Changes:
await HandleChanges(update.Changes, ct);
break;
case WatchResourcesUpdate.KindOneofCase.None:
break;
default:
throw new ArgumentOutOfRangeException(update.KindCase.ToString());
}
}
}
catch (OperationCanceledException) when (ct.IsCancellationRequested)
{
logger.ResourceWatchingRequestWasCancelled();
}
}
private async Task HandleInitialData(InitialResourceData initialResourceData, CancellationToken ct)
{
logger.HandleInitialResourceData();
await connectionWrapper.ClearResources(hostModel);
foreach (var resource in initialResourceData.Resources)
{
ct.ThrowIfCancellationRequested();
var resourceModel = resource.ToModel();
var resourceWrapper = new ResourceWrapper();
resourceWrapper.ExecuteCommand.SetAsync(async (lt, request) => await ExecuteCommand(request, lt));
resourceWrapper.Model.SetValue(resourceModel);
await connectionWrapper.AddResource(hostModel, resourceModel.Name, resourceWrapper);
}
}
private async Task HandleChanges(WatchResourcesChanges watchResourcesChanges, CancellationToken ct)
{
logger.HandleResourceChanges();
foreach (var change in watchResourcesChanges.Value)
{
ct.ThrowIfCancellationRequested();
if (change is null) continue;
switch (change.KindCase)
{
case WatchResourcesChange.KindOneofCase.Upsert:
await UpsertResource(change);
break;
case WatchResourcesChange.KindOneofCase.Delete:
await DeleteResource(change);
break;
case WatchResourcesChange.KindOneofCase.None:
break;
default:
throw new ArgumentOutOfRangeException(change.KindCase.ToString());
}
}
}
private async Task UpsertResource(WatchResourcesChange change)
{
var resourceModel = change.Upsert.ToModel();
await connectionWrapper.UpsertResource(hostModel, resourceModel, rm =>
{
var resourceWrapper = new ResourceWrapper();
resourceWrapper.ExecuteCommand.SetAsync(async (lt, request) => await ExecuteCommand(request, lt));
resourceWrapper.Model.SetValue(rm);
return resourceWrapper;
});
}
private async Task DeleteResource(WatchResourcesChange change)
{
await connectionWrapper.RemoveResource(hostModel, change.Delete.ResourceName);
}
private async Task<ResourceCommandResponse> ExecuteCommand(ResourceCommandRequest command, Lifetime lt)
{
var request = MapRequest(command);
var response = await client.ExecuteResourceCommandAsync(request, headers: headers,
cancellationToken: lt.ToCancellationToken());
return MapResponse(response);
}
private static global::Aspire.DashboardService.Proto.V1.ResourceCommandRequest MapRequest(
ResourceCommandRequest request) =>
new()
{
CommandName = request.CommandName,
ResourceName = request.ResourceName,
ResourceType = request.ResourceType
};
private static ResourceCommandResponse MapResponse(
global::Aspire.DashboardService.Proto.V1.ResourceCommandResponse response) =>
new(
MapResponseKind(response.Kind),
response.HasErrorMessage ? response.ErrorMessage : null
);
private static ResourceCommandResponseKind MapResponseKind(
global::Aspire.DashboardService.Proto.V1.ResourceCommandResponseKind kind) => kind switch
{
global::Aspire.DashboardService.Proto.V1.ResourceCommandResponseKind.Undefined =>
ResourceCommandResponseKind.Undefined,
global::Aspire.DashboardService.Proto.V1.ResourceCommandResponseKind.Succeeded =>
ResourceCommandResponseKind.Succeeded,
global::Aspire.DashboardService.Proto.V1.ResourceCommandResponseKind.Failed =>
ResourceCommandResponseKind.Failed,
global::Aspire.DashboardService.Proto.V1.ResourceCommandResponseKind.Cancelled =>
ResourceCommandResponseKind.Canceled,
_ => throw new ArgumentOutOfRangeException(nameof(kind), kind, null)
};
}