in src/TriggersBinding/MySqlTriggerListener.cs [90:155]
/// <summary>
/// Starts the trigger listener. Opens a connection, resolves the user table's ID, primary key
/// columns and column list, runs the schema / global-state / leases-table setup steps inside a
/// single transaction, and then constructs the <c>MySqlTableChangeMonitor</c> for the table.
/// </summary>
/// <param name="cancellationToken">Token flowed to every cancellable start-up call.</param>
/// <exception cref="InvalidOperationException">
/// Thrown when the listener is already starting or has already started.
/// </exception>
/// <remarks>
/// Any exception raised during start-up is logged and rethrown after the listener state has been
/// reset to <c>ListenerNotStarted</c> and the unhandled-exception handler has been unsubscribed.
/// </remarks>
public async Task StartAsync(CancellationToken cancellationToken)
{
    // Atomically move NotStarted -> Starting; the return value is the state seen before the attempt.
    int previousState = Interlocked.CompareExchange(ref this._listenerState, ListenerStarting, ListenerNotStarted);
    switch (previousState)
    {
        case ListenerStarting: throw new InvalidOperationException("The listener is already starting.");
        case ListenerStarted: throw new InvalidOperationException("The listener has already started.");
        // Any other prior state falls through and start-up proceeds.
        default: break;
    }

    try
    {
        using (var connection = new MySqlConnection(this._connectionString))
        {
            AppDomain.CurrentDomain.UnhandledException += new UnhandledExceptionEventHandler(this.HandleException);
            await connection.OpenAsyncWithMySqlErrorHandling(cancellationToken);

            // Get the table ID if the table exists in the database.
            // The caller's token is flowed here (not CancellationToken.None) so a cancelled start-up
            // can interrupt this lookup like every other step.
            string userTableId = await GetUserTableIdAsync(connection, this._userTable, this._logger, cancellationToken);
            await VerifyTableForTriggerSupported(connection, this._userTable.FullName, this._logger, cancellationToken);

            IReadOnlyList<(string name, string type)> primaryKeyColumns = GetPrimaryKeyColumnsAsync(connection, this._userTable.AcuteQuotedFullName, this._logger, cancellationToken);
            IReadOnlyList<string> userTableColumns = this.GetUserTableColumns(connection, this._userTable, cancellationToken);
            string bracketedLeasesTableName = GetBracketedLeasesTableName(this._userDefinedLeasesTableName, this._userFunctionId, userTableId);

            long createdSchemaDurationMs = 0L, createGlobalStateTableDurationMs = 0L, insertGlobalStateTableRowDurationMs = 0L, createLeasesTableDurationMs = 0L;

            // All setup steps share one transaction so they are committed together.
            using (MySqlTransaction transaction = connection.BeginTransaction(System.Data.IsolationLevel.RepeatableRead))
            {
                createdSchemaDurationMs = await this.CreateSchemaAsync(connection, transaction, cancellationToken);
                createGlobalStateTableDurationMs = await this.CreateGlobalStateTableAsync(connection, transaction, cancellationToken);
                insertGlobalStateTableRowDurationMs = await this.InsertGlobalStateTableRowAsync(connection, transaction, userTableId, cancellationToken);
                createLeasesTableDurationMs = await this.CreateLeasesTableAsync(connection, transaction, bracketedLeasesTableName, primaryKeyColumns, cancellationToken);
                transaction.Commit();
            }

            this._changeMonitor = new MySqlTableChangeMonitor<T>(
                this._connectionString,
                userTableId,
                this._userTable,
                this._userFunctionId,
                bracketedLeasesTableName,
                primaryKeyColumns,
                userTableColumns,
                this._executor,
                this._mysqlOptions,
                this._logger,
                this._configuration);

            this._listenerState = ListenerStarted;
            this._logger.LogDebug($"Started MySQL trigger listener for the table ID: {userTableId} and function ID: {this._userFunctionId}");
            // Report every measured setup duration, including the leases table creation.
            this._logger.LogInformation($"CreatedSchemaDurationMs {createdSchemaDurationMs}. CreateGlobalStateTableDurationMs: {createGlobalStateTableDurationMs}. " +
                $"InsertGlobalStateTableRowDurationMs: {insertGlobalStateTableRowDurationMs}. CreateLeasesTableDurationMs: {createLeasesTableDurationMs}");
        }
    }
    catch (Exception ex)
    {
        // Remove the handler registered above so failed (and retried) starts do not accumulate
        // subscriptions; removing a handler that was never added is a no-op.
        AppDomain.CurrentDomain.UnhandledException -= new UnhandledExceptionEventHandler(this.HandleException);

        // Allow a later StartAsync call to try again.
        this._listenerState = ListenerNotStarted;
        this._logger.LogError($"Failed to start MySQL trigger listener for the specified table, function ID: '{this._userFunctionId}'. Exception: {ex}");
        throw;
    }
}