in src/TriggersBinding/MySqlTableChangeMonitor.cs [326:378]
private async Task ProcessTableChangesAsync(CancellationToken token)
{
if (this._rowsToProcess.Count > 0)
{
IReadOnlyList<MySqlChange<T>> changes = null;
try
{
changes = this.ProcessChanges();
}
catch (Exception e)
{
// Either there's a bug or we're in a bad state so not much we can do here. We'll try clearing
// our state and retry getting the changes from the top again in case something broke while
// fetching the changes.
// It doesn't make sense to retry processing the changes immediately since this isn't a connection-based issue.
// We could probably send up the changes we were able to process and just skip the ones we couldn't, but given
// that this is not a case we expect would happen during normal execution we'll err on the side of caution for
// now and just retry getting the whole set of changes.
this._logger.LogError($"Failed to compose trigger parameter value for the specified table due to exception: {e.GetType()}. Exception message: {e.Message}");
await this.ClearRowsAsync(token);
}
if (changes != null)
{
var input = new TriggeredFunctionData() { TriggerValue = changes };
// var stopwatch = Stopwatch.StartNew();
FunctionResult result = await this._executor.TryExecuteAsync(input, this._cancellationTokenSourceExecutor.Token);
// long durationMs = stopwatch.ElapsedMilliseconds;
if (result.Succeeded)
{
this._logger.LogInformation("Function Trigger executed successfully.");
await this._rowsLock.WaitAsync(token);
this._rowsToRelease = this._rowsToProcess;
this._rowsToProcess = new List<IReadOnlyDictionary<string, object>>();
this._rowsLock.Release();
}
else
{
this._logger.LogError($"Exception encountered while executing the Function Trigger. Exception: {result.Exception}");
}
this._state = State.Cleanup;
}
}
else
{
// This ideally should never happen, but as a safety measure ensure that if we tried to process changes but there weren't
// any we still ensure everything is reset to a clean state
await this.ClearRowsAsync(token);
}
}