in src/WebJobs.Extensions.DurableTask/ContextImplementations/DurableOrchestrationContext.cs [910:977]
/// <summary>
/// Delivers an external event to this orchestration. If a waiter is pending under
/// <paramref name="name"/>, the head waiter is completed with the deserialized payload;
/// otherwise the raw input is buffered for later use.
/// </summary>
/// <param name="name">The name of the event being raised.</param>
/// <param name="input">The serialized event payload.</param>
internal void RaiseEvent(string name, string input)
{
    lock (this.pendingExternalEvents)
    {
        if (!this.pendingExternalEvents.TryGetValue(name, out IEventTaskCompletionSource waiter))
        {
            // Nobody is waiting for this event yet: park it in an (in-memory) queue
            // keyed by event name so that it is not dropped or lost.
            if (!this.bufferedExternalEvents.TryGetValue(name, out Queue<string> parkedInputs))
            {
                parkedInputs = new Queue<string>();
                this.bufferedExternalEvents[name] = parkedInputs;
            }

            parkedInputs.Enqueue(input);

            this.Config.TraceHelper.ExternalEventSaved(
                this.HubName,
                this.Name,
                FunctionType.Orchestrator,
                this.InstanceId,
                name,
                this.IsReplaying);

            return;
        }

        // Detach the waiter from the pending collection before completing it; if it stayed
        // registered, WaitForExternalEventAsync() would keep finding an entry for this key
        // and run infinitely. Any waiter chained behind it becomes the new head.
        IEventTaskCompletionSource successor = waiter.Next;
        if (successor != null)
        {
            this.pendingExternalEvents[name] = successor;
        }
        else
        {
            this.pendingExternalEvents.Remove(name);
        }

        // The payload is materialized as the type the waiter declared it expects.
        object payload = this.messageDataConverter.Deserialize(input, waiter.EventType);

        if (payload is ResponseMessage entityResponse)
        {
            // A ResponseMessage payload is traced as an entity response.
            this.Config.TraceHelper.EntityResponseReceived(
                this.HubName,
                this.Name,
                FunctionType.Orchestrator,
                this.InstanceId,
                name,
                entityResponse.Result,
                this.IsReplaying);
        }
        else
        {
            // Anything else is traced as a regular external event, with the raw input.
            this.Config.TraceHelper.ExternalEventRaised(
                this.HubName,
                this.Name,
                this.InstanceId,
                name,
                input,
                this.IsReplaying);
        }

        waiter.TrySetResult(payload);
    }
}