in src/Microsoft.Kusto.ServiceLayer/LanguageServices/BindingQueue.cs [263:450]
/// <summary>
/// Worker loop of the binding queue. Blocks until an item is queued or queue
/// processing is cancelled, then drains every pending item by handing each one
/// to <see cref="DispatchQueueItem"/>. Returns only once cancellation is requested.
/// </summary>
private void ProcessQueue()
{
    CancellationToken token = this._processQueueCancelToken.Token;
    WaitHandle[] waitHandles = new WaitHandle[2]
    {
        this._itemQueuedEvent,
        token.WaitHandle
    };

    while (true)
    {
        // wait for an item to be queued or for a cancellation request
        WaitHandle.WaitAny(waitHandles);
        if (token.IsCancellationRequested)
        {
            break;
        }

        try
        {
            // dispatch all pending queue items
            while (this.HasPendingQueueItems)
            {
                QueueItem queueItem = GetNextQueueItem();
                if (queueItem == null)
                {
                    continue;
                }

                try
                {
                    DispatchQueueItem(queueItem);
                }
                catch (Exception ex)
                {
                    // A failure while resolving the binding context or scheduling the work
                    // must not terminate the worker loop. The item is still signalled as
                    // processed, like every other completion path, to avoid deadlocks.
                    Logger.Write(TraceEventType.Error, "Exception dispatching binding queue item: " + ex.ToString());
                    queueItem.ItemProcessed.Set();
                }

                // If a queue processing cancellation was requested then stop draining.
                // This only leaves the inner loop; the outer loop exits on its own
                // cancellation check after the next wait.
                if (token.IsCancellationRequested)
                {
                    break;
                }
            }
        }
        finally
        {
            lock (this._bindingQueueLock)
            {
                // verify the binding queue is still empty
                if (this._bindingQueue.Count == 0)
                {
                    // reset the item queued event since we've processed all the pending items
                    this._itemQueuedEvent.Reset();
                }
            }
        }
    }
}

/// <summary>
/// Schedules a single queue item on the task chain of its binding context.
/// </summary>
/// <param name="queueItem">The dequeued item to run; must not be null.</param>
/// <remarks>
/// The scheduled continuation only starts the work: it launches one task that runs
/// the bind operation and a second task that enforces the binding timeout, releases
/// the binding lock and signals <c>ItemProcessed</c>. When no binding context is
/// available the item is signalled as processed immediately, without a result.
/// </remarks>
private void DispatchQueueItem(QueueItem queueItem)
{
    IBindingContext bindingContext = GetOrCreateBindingContext(queueItem.Key);
    if (bindingContext == null)
    {
        queueItem.ItemProcessed.Set();
        return;
    }

    var bindingContextTask = this.BindingContextTasks[bindingContext];

    // Run in the binding context task in case this task has to wait for a previous binding operation
    this.BindingContextTasks[bindingContext] = bindingContextTask.ContinueWith((task) =>
    {
        bool lockTaken = false;
        try
        {
            // prefer the queue item binding timeout, otherwise use the context default timeout
            int bindTimeout = queueItem.BindingTimeout ?? bindingContext.BindingTimeout;

            // Handle the case a previous binding operation is still running. When the item
            // specifies no lock timeout, WaitOne(0) only tests the lock and returns at once.
            if (!bindingContext.BindingLock.WaitOne(queueItem.WaitForLockTimeout ?? 0))
            {
                try
                {
                    Logger.Write(TraceEventType.Warning, "Binding queue operation timed out waiting for previous operation to finish");
                    queueItem.Result = queueItem.TimeoutOperation != null
                        ? queueItem.TimeoutOperation(bindingContext)
                        : null;
                }
                catch (Exception ex)
                {
                    Logger.Write(TraceEventType.Error, "Exception running binding queue lock timeout handler: " + ex.ToString());
                }
                finally
                {
                    queueItem.ItemProcessed.Set();
                }

                return;
            }

            // Reset marks the lock as held; the matching Set happens when the operation finishes.
            bindingContext.BindingLock.Reset();
            lockTaken = true;

            // execute the binding operation
            object result = null;
            CancellationTokenSource cancelToken = new CancellationTokenSource();

            // run the operation in a separate thread
            var bindTask = Task.Run(() =>
            {
                try
                {
                    result = queueItem.BindOperation(
                        bindingContext,
                        cancelToken.Token);
                }
                catch (Exception ex)
                {
                    Logger.Write(TraceEventType.Error, "Unexpected exception on the binding queue: " + ex.ToString());
                    if (queueItem.ErrorHandler != null)
                    {
                        try
                        {
                            result = queueItem.ErrorHandler(ex);
                        }
                        catch (Exception ex2)
                        {
                            Logger.Write(TraceEventType.Error, "Unexpected exception in binding queue error handler: " + ex2.ToString());
                        }
                    }

                    if (IsExceptionOfType(ex, typeof(SqlException)) || IsExceptionOfType(ex, typeof(SocketException)))
                    {
                        // Copy the handler to a local so it cannot become null between
                        // the check and the call if a subscriber is removed concurrently.
                        var unhandledExceptionHandler = this.OnUnhandledException;
                        if (unhandledExceptionHandler != null)
                        {
                            unhandledExceptionHandler(queueItem.Key, ex);
                        }

                        RemoveBindingContext(queueItem.Key);
                    }
                }
            });

            // Watch the bind task from a second thread so this continuation can return immediately.
            Task.Run(() =>
            {
                try
                {
                    // check if the binding task completed within the binding timeout
                    if (bindTask.Wait(bindTimeout))
                    {
                        queueItem.Result = result;
                    }
                    else
                    {
                        cancelToken.Cancel();

                        // if the task didn't complete then call the timeout callback
                        if (queueItem.TimeoutOperation != null)
                        {
                            queueItem.Result = queueItem.TimeoutOperation(bindingContext);
                        }

                        bindTask.ContinueWithOnFaulted(t => Logger.Write(TraceEventType.Error, "Binding queue threw exception " + t.Exception.ToString()));

                        // Give the task a chance to complete before moving on to the next operation
                        bindTask.Wait();
                    }
                }
                catch (Exception ex)
                {
                    Logger.Write(TraceEventType.Error, "Binding queue task completion threw exception " + ex.ToString());
                }
                finally
                {
                    // release the binding lock and set item processed to avoid deadlocks
                    if (lockTaken)
                    {
                        bindingContext.BindingLock.Set();
                    }

                    queueItem.ItemProcessed.Set();
                }
            });
        }
        catch (Exception ex)
        {
            // catch and log any exceptions raised in the binding calls
            Logger.Write(TraceEventType.Error, "Binding queue threw exception " + ex.ToString());

            // release the binding lock and set item processed to avoid deadlocks
            if (lockTaken)
            {
                bindingContext.BindingLock.Set();
            }

            queueItem.ItemProcessed.Set();
        }
    }, TaskContinuationOptions.None);
}