in src/TriggerBinding/SqlTriggerListener.cs [103:188]
public async Task StartAsync(CancellationToken cancellationToken)
{
int previousState = Interlocked.CompareExchange(ref this._listenerState, ListenerStarting, ListenerNotStarted);
switch (previousState)
{
case ListenerStarting: throw new InvalidOperationException("The listener is already starting.");
case ListenerStarted: throw new InvalidOperationException("The listener has already started.");
default: break;
}
this.InitializeTelemetryProps();
try
{
using (var connection = new SqlConnection(this._connectionString))
{
await connection.OpenAsyncWithSqlErrorHandling(cancellationToken);
ServerProperties serverProperties = await GetServerTelemetryProperties(connection, this._logger, cancellationToken);
this._telemetryProps.AddConnectionProps(connection, serverProperties);
await VerifyDatabaseSupported(connection, this._logger, cancellationToken);
int userTableId = await GetUserTableIdAsync(connection, this._userTable, this._logger, cancellationToken);
IReadOnlyList<(string name, string type)> primaryKeyColumns = GetPrimaryKeyColumnsAsync(connection, userTableId, this._logger, this._userTable.FullName, cancellationToken);
IReadOnlyList<string> userTableColumns = this.GetUserTableColumns(connection, userTableId, cancellationToken);
string bracketedLeasesTableName = GetBracketedLeasesTableName(this._userDefinedLeasesTableName, this._userFunctionId, userTableId);
this._telemetryProps[TelemetryPropertyName.LeasesTableName] = bracketedLeasesTableName;
var transactionSw = Stopwatch.StartNew();
long createdSchemaDurationMs = 0L, createGlobalStateTableDurationMs = 0L, insertGlobalStateTableRowDurationMs = 0L, createLeasesTableDurationMs = 0L;
using (SqlTransaction transaction = connection.BeginTransaction(System.Data.IsolationLevel.RepeatableRead))
{
createdSchemaDurationMs = await this.CreateSchemaAsync(connection, transaction, cancellationToken);
createGlobalStateTableDurationMs = await this.CreateGlobalStateTableAsync(connection, transaction, cancellationToken);
insertGlobalStateTableRowDurationMs = await this.InsertGlobalStateTableRowAsync(connection, transaction, userTableId, cancellationToken);
createLeasesTableDurationMs = await this.CreateLeasesTableAsync(connection, transaction, bracketedLeasesTableName, primaryKeyColumns, cancellationToken);
transaction.Commit();
}
this._changeMonitor = new SqlTableChangeMonitor<T>(
this._connectionString,
userTableId,
this._userTable,
this._userFunctionId,
bracketedLeasesTableName,
userTableColumns,
primaryKeyColumns,
this._executor,
this._sqlOptions,
this._logger,
this._configuration,
this._telemetryProps);
this._listenerState = ListenerStarted;
this._logger.LogDebug($"Started SQL trigger listener for table: '{this._userTable.FullName}' (object ID: {userTableId}), function ID: {this._userFunctionId}, leases table: {bracketedLeasesTableName}");
var measures = new Dictionary<TelemetryMeasureName, double>
{
[TelemetryMeasureName.CreatedSchemaDurationMs] = createdSchemaDurationMs,
[TelemetryMeasureName.CreateGlobalStateTableDurationMs] = createGlobalStateTableDurationMs,
[TelemetryMeasureName.InsertGlobalStateTableRowDurationMs] = insertGlobalStateTableRowDurationMs,
[TelemetryMeasureName.CreateLeasesTableDurationMs] = createLeasesTableDurationMs,
[TelemetryMeasureName.TransactionDurationMs] = transactionSw.ElapsedMilliseconds,
[TelemetryMeasureName.MaxChangesPerWorker] = this._maxChangesPerWorker
};
TelemetryInstance.TrackEvent(
TelemetryEventName.StartListener,
new Dictionary<TelemetryPropertyName, string>(this._telemetryProps) {
{ TelemetryPropertyName.HasConfiguredMaxChangesPerWorker, this._hasConfiguredMaxChangesPerWorker.ToString() }
},
measures);
}
}
catch (Exception ex)
{
this._listenerState = ListenerNotStarted;
this._logger.LogError($"Failed to start SQL trigger listener for table: '{this._userTable.FullName}', function ID: '{this._userFunctionId}'. Exception: {ex}");
TelemetryInstance.TrackException(TelemetryErrorName.StartListener, ex, this._telemetryProps);
throw;
}
}