in src/TriggerBinding/SqlTableChangeMonitor.cs [391:454]
private async Task ProcessTableChangesAsync(CancellationToken token)
{
if (this._rowsToProcess.Count > 0)
{
IReadOnlyList<SqlChange<T>> changes = null;
try
{
changes = await this.ProcessChanges(token);
}
catch (Exception e)
{
// Either there's a bug or we're in a bad state so not much we can do here. We'll try clearing
// our state and retry getting the changes from the top again in case something broke while
// fetching the changes.
// It doesn't make sense to retry processing the changes immediately since this isn't a connection-based issue.
// We could probably send up the changes we were able to process and just skip the ones we couldn't, but given
// that this is not a case we expect would happen during normal execution we'll err on the side of caution for
// now and just retry getting the whole set of changes.
this._logger.LogError($"Failed to compose trigger parameter value for table: '{this._userTable.FullName} due to exception: {e.GetType()}. Exception message: {e.Message}");
TelemetryInstance.TrackException(TelemetryErrorName.ProcessChanges, e, this._telemetryProps);
await this.ClearRowsAsync(token);
}
if (changes != null)
{
var input = new TriggeredFunctionData() { TriggerValue = changes };
var stopwatch = Stopwatch.StartNew();
FunctionResult result = await this._executor.TryExecuteAsync(input, this._cancellationTokenSourceExecutor.Token);
long durationMs = stopwatch.ElapsedMilliseconds;
var measures = new Dictionary<TelemetryMeasureName, double>
{
[TelemetryMeasureName.DurationMs] = durationMs,
[TelemetryMeasureName.BatchCount] = this._rowsToProcess.Count,
};
// In the future might make sense to retry executing the function, but for now we just let
// another worker try.
if (result.Succeeded)
{
// We've successfully fully processed these so set them to be released in the cleanup phase
await this._rowsToProcessLock.WaitAsync(token);
this._rowsToRelease = this._rowsToProcess;
this._rowsToProcess = new List<IReadOnlyDictionary<string, object>>();
this._rowsToProcessLock.Release();
}
TelemetryInstance.TrackEvent(
TelemetryEventName.TriggerFunction,
new Dictionary<TelemetryPropertyName, string>(this._telemetryProps) {
{ TelemetryPropertyName.Succeeded, result.Succeeded.ToString() },
},
measures);
this._state = State.Cleanup;
}
}
else
{
// This ideally should never happen, but as a safety measure ensure that if we tried to process changes but there weren't
// any we still ensure everything is reset to a clean state
await this.ClearRowsAsync(token);
}
}