public async Task ProcessAsync()

in converter/dicom-cast/src/Microsoft.Health.DicomCast.Core/Features/Worker/ChangeFeedProcessor.cs [56:136]


        /// <summary>
        /// Processes the DICOM change feed in batches: retrieves entries starting from the current sync state,
        /// sends each entry through the FHIR transaction pipeline, and persists an updated sync state after
        /// every batch. The method returns as soon as a retrieval yields no entries.
        /// </summary>
        /// <param name="pollIntervalDuringCatchup">Delay awaited after each processed batch, before the next retrieval.</param>
        /// <param name="cancellationToken">Token forwarded to every awaited service call and to the delay.</param>
        /// <returns>A task that completes when a retrieval returns an empty batch.</returns>
        public async Task ProcessAsync(TimeSpan pollIntervalDuringCatchup, CancellationToken cancellationToken)
        {
            SyncState state = await _syncStateService.GetSyncStateAsync(cancellationToken);

            while (true)
            {
                // Retrieve the change feed, passing the last synced sequence as the starting point.
                IReadOnlyList<ChangeFeedEntry> changeFeedEntries = await _changeFeedRetrieveService.RetrieveChangeFeedAsync(
                    state.SyncedSequence,
                    cancellationToken);

                if (changeFeedEntries.Count == 0)
                {
                    _logger.LogInformation("No new DICOM events to process.");

                    return;
                }

                // The last entry's sequence becomes the new sync point.
                // NOTE(review): this assumes the batch is ordered by ascending sequence - confirm against the retrieve service.
                long maxSequence = changeFeedEntries[^1].Sequence;

                // Process each change feed entry as a FHIR transaction.
                foreach (ChangeFeedEntry changeFeedEntry in changeFeedEntries)
                {
                    try
                    {
                        if (changeFeedEntry.Action == ChangeFeedAction.Create && changeFeedEntry.State == ChangeFeedState.Deleted)
                        {
                            // A create whose current state is already deleted is not sent to the pipeline.
                            _logger.LogInformation("Skip DICOM event with SequenceId {SequenceId} due to deletion before processing creation.", changeFeedEntry.Sequence);
                        }
                        else
                        {
                            await _fhirTransactionPipeline.ProcessAsync(changeFeedEntry, cancellationToken);
                            _logger.LogInformation("Successfully processed DICOM event with SequenceID: {SequenceId}", changeFeedEntry.Sequence);
                        }
                    }
                    catch (Exception ex) when (ex is FhirNonRetryableException || ex is DicomTagException || ex is TimeoutRejectedException)
                    {
                        // Only these exception types are recorded and skipped; anything else is not caught here
                        // and propagates to the caller, leaving the sync state for this batch unsaved.
                        ErrorType errorType = ex switch
                        {
                            DicomTagException _ => ErrorType.DicomError,
                            TimeoutRejectedException _ => ErrorType.TransientFailure,
                            _ => ErrorType.FhirError,
                        };

                        // The exception itself is persisted through the exception store; the log entry below
                        // carries only the entry's sequence and instance identifiers.
                        await _exceptionStore.WriteExceptionAsync(
                            changeFeedEntry,
                            ex,
                            errorType,
                            cancellationToken);

                        _logger.LogError(
                            "Failed to process DICOM event with SequenceID: {SequenceId}, StudyUid: {StudyInstanceUid}, SeriesUid: {SeriesInstanceUid}, instanceUid: {SopInstanceUid}  and will not be retried further. Continuing to next event.",
                            changeFeedEntry.Sequence,
                            changeFeedEntry.StudyInstanceUid,
                            changeFeedEntry.SeriesInstanceUid,
                            changeFeedEntry.SopInstanceUid);
                    }
                }

                // Persist progress once the whole batch has been handled.
                var newSyncState = new SyncState(maxSequence, Clock.UtcNow);

                await _syncStateService.UpdateSyncStateAsync(newSyncState, cancellationToken);

                _logger.LogInformation("Processed DICOM events sequenced {SequenceId}-{MaxSequence}.", state.SyncedSequence + 1, maxSequence);

                state = newSyncState;

                // Wait before asking for the next batch.
                await Task.Delay(pollIntervalDuringCatchup, cancellationToken);
            }
        }