in src/TriggerBinding/SqlTableChangeMonitor.cs [106:171]
public SqlTableChangeMonitor(
    string connectionString,
    int userTableId,
    SqlObject userTable,
    string userFunctionId,
    string bracketedLeasesTableName,
    IReadOnlyList<string> userTableColumns,
    IReadOnlyList<(string name, string type)> primaryKeyColumns,
    ITriggeredFunctionExecutor executor,
    SqlOptions sqlOptions,
    ILogger logger,
    IConfiguration configuration,
    IDictionary<TelemetryPropertyName, string> telemetryProps)
{
    // Validates the arguments, resolves the effective max batch size and polling interval
    // (app configuration overrides the values in sqlOptions), reports a monitor-start telemetry
    // event, precomputes the per-row match conditions, and finally queues the change-consumption
    // and lease-renewal loops on a background task.
    //
    // Throws ArgumentNullException when a required argument is null (or empty, for the string
    // arguments and the user table's full name), and InvalidOperationException when the resolved
    // max batch size or polling interval is not a positive integer.
    // telemetryProps is optional; a null value is treated as an empty set of properties.
    this._connectionString = !string.IsNullOrEmpty(connectionString) ? connectionString : throw new ArgumentNullException(nameof(connectionString));
    this._userTable = !string.IsNullOrEmpty(userTable?.FullName) ? userTable : throw new ArgumentNullException(nameof(userTable));
    this._userFunctionId = !string.IsNullOrEmpty(userFunctionId) ? userFunctionId : throw new ArgumentNullException(nameof(userFunctionId));
    this._bracketedLeasesTableName = !string.IsNullOrEmpty(bracketedLeasesTableName) ? bracketedLeasesTableName : throw new ArgumentNullException(nameof(bracketedLeasesTableName));
    this._userTableColumns = userTableColumns ?? throw new ArgumentNullException(nameof(userTableColumns));
    this._primaryKeyColumns = primaryKeyColumns ?? throw new ArgumentNullException(nameof(primaryKeyColumns));
    this._sqlOptions = sqlOptions ?? throw new ArgumentNullException(nameof(sqlOptions));
    this._executor = executor ?? throw new ArgumentNullException(nameof(executor));
    this._logger = logger ?? throw new ArgumentNullException(nameof(logger));
    // The configuration is only read here and not stored, but it is dereferenced below, so it
    // gets the same explicit null guard as the other required arguments.
    if (configuration == null)
    {
        throw new ArgumentNullException(nameof(configuration));
    }

    this._userTableId = userTableId;
    this._telemetryProps = telemetryProps ?? new Dictionary<TelemetryPropertyName, string>();

    // TODO: when we move to reading them exclusively from the host options, remove reading from settings.(https://github.com/Azure/azure-functions-sql-extension/issues/961)
    // Check if there's config settings to override the default max batch size/polling interval values
    // The max-batch-size key takes precedence; the batch-size key is consulted only when the former is unset.
    int? configuredMaxBatchSize = configuration.GetValue<int?>(ConfigKey_SqlTrigger_MaxBatchSize) ?? configuration.GetValue<int?>(ConfigKey_SqlTrigger_BatchSize);
    int? configuredPollingInterval = configuration.GetValue<int?>(ConfigKey_SqlTrigger_PollingInterval);

    this._maxBatchSize = configuredMaxBatchSize ?? this._sqlOptions.MaxBatchSize;
    if (this._maxBatchSize <= 0)
    {
        throw new InvalidOperationException($"Invalid value for configuration setting '{ConfigKey_SqlTrigger_MaxBatchSize}'. Ensure that the value is a positive integer.");
    }

    this._pollingIntervalInMs = configuredPollingInterval ?? this._sqlOptions.PollingIntervalMs;
    if (this._pollingIntervalInMs <= 0)
    {
        throw new InvalidOperationException($"Invalid value for configuration setting '{ConfigKey_SqlTrigger_PollingInterval}'. Ensure that the value is a positive integer.");
    }

    // Copy from the null-coalesced field rather than the raw parameter: the Dictionary copy
    // constructor throws ArgumentNullException for a null source, which would make the
    // optional telemetryProps argument effectively mandatory.
    TelemetryInstance.TrackEvent(
        TelemetryEventName.TriggerMonitorStart,
        new Dictionary<TelemetryPropertyName, string>(this._telemetryProps)
        {
            { TelemetryPropertyName.HasConfiguredMaxBatchSize, (configuredMaxBatchSize != null).ToString() },
            { TelemetryPropertyName.HasConfiguredPollingInterval, (configuredPollingInterval != null).ToString() },
        },
        new Dictionary<TelemetryMeasureName, double>()
        {
            { TelemetryMeasureName.MaxBatchSize, this._maxBatchSize },
            { TelemetryMeasureName.PollingIntervalMs, this._pollingIntervalInMs }
        }
    );

    // Prep search-conditions that will be used besides WHERE clause to match table rows.
    // One condition string is built per row slot (0 .. max batch size - 1); each compares every
    // primary key column against a parameter named @{rowIndex}_{colIndex}, joined with AND.
    this._rowMatchConditions = Enumerable.Range(0, this._maxBatchSize)
        .Select(rowIndex => string.Join(" AND ", this._primaryKeyColumns.Select((col, colIndex) => $"{col.name.AsBracketQuotedString()} = @{rowIndex}_{colIndex}")))
        .ToList();

#pragma warning disable CS4014 // Queue the below tasks and exit. Do not wait for their completion.
    // Both loops are started from a background task and deliberately not awaited, so the
    // constructor returns without waiting for either of them.
    _ = Task.Run(() =>
    {
        this.RunChangeConsumptionLoopAsync();
        this.RunLeaseRenewalLoopAsync();
    });
#pragma warning restore CS4014
}