in converter/dicom-cast/src/Microsoft.Health.DicomCast.Core/Features/Worker/ChangeFeedProcessor.cs [56:136]
public async Task ProcessAsync(TimeSpan pollIntervalDuringCatchup, CancellationToken cancellationToken)
{
SyncState state = await _syncStateService.GetSyncStateAsync(cancellationToken);
while (true)
{
// Retrieve the change feed for any changes.
IReadOnlyList<ChangeFeedEntry> changeFeedEntries = await _changeFeedRetrieveService.RetrieveChangeFeedAsync(
state.SyncedSequence,
cancellationToken);
if (!changeFeedEntries.Any())
{
_logger.LogInformation("No new DICOM events to process.");
return;
}
long maxSequence = changeFeedEntries[^1].Sequence;
// Process each change feed as a FHIR transaction.
foreach (ChangeFeedEntry changeFeedEntry in changeFeedEntries)
{
try
{
if (!(changeFeedEntry.Action == ChangeFeedAction.Create && changeFeedEntry.State == ChangeFeedState.Deleted))
{
await _fhirTransactionPipeline.ProcessAsync(changeFeedEntry, cancellationToken);
_logger.LogInformation("Successfully processed DICOM event with SequenceID: {SequenceId}", changeFeedEntry.Sequence);
}
else
{
_logger.LogInformation("Skip DICOM event with SequenceId {SequenceId} due to deletion before processing creation.", changeFeedEntry.Sequence);
}
}
catch (Exception ex)
{
if (ex is FhirNonRetryableException || ex is DicomTagException || ex is TimeoutRejectedException)
{
string studyInstanceUid = changeFeedEntry.StudyInstanceUid;
string seriesInstanceUid = changeFeedEntry.SeriesInstanceUid;
string sopInstanceUid = changeFeedEntry.SopInstanceUid;
long changeFeedSequence = changeFeedEntry.Sequence;
ErrorType errorType = ErrorType.FhirError;
if (ex is DicomTagException)
{
errorType = ErrorType.DicomError;
}
else if (ex is TimeoutRejectedException)
{
errorType = ErrorType.TransientFailure;
}
await _exceptionStore.WriteExceptionAsync(
changeFeedEntry,
ex,
errorType,
cancellationToken);
_logger.LogError("Failed to process DICOM event with SequenceID: {SequenceId}, StudyUid: {StudyInstanceUid}, SeriesUid: {SeriesInstanceUid}, instanceUid: {SopInstanceUid} and will not be retried further. Continuing to next event.", changeFeedEntry.Sequence, studyInstanceUid, seriesInstanceUid, sopInstanceUid);
}
else
{
throw;
}
}
}
var newSyncState = new SyncState(maxSequence, Clock.UtcNow);
await _syncStateService.UpdateSyncStateAsync(newSyncState, cancellationToken);
_logger.LogInformation("Processed DICOM events sequenced {SequenceId}-{MaxSequence}.", state.SyncedSequence + 1, maxSequence);
state = newSyncState;
await Task.Delay(pollIntervalDuringCatchup, cancellationToken);
}
}