async Task<bool> TrySetSessionStateAsync()

in src/DurableTask.ServiceBus/ServiceBusOrchestrationService.cs [1597:1667]


        /// <summary>
        /// Writes the new orchestration runtime state to the message session, or clears the
        /// session state when there is nothing left to persist.
        /// </summary>
        /// <param name="workItem">Work item being processed; its instance id is used for tracing.</param>
        /// <param name="newOrchestrationRuntimeState">
        /// State to persist. When it is null, has no <c>ExecutionStartedEvent</c>, or is not in the
        /// <c>Running</c> status, the session state is set to null instead.
        /// </param>
        /// <param name="runtimeState">
        /// Current runtime state; its <c>CompressedSize</c> drives the size warning and its
        /// orchestration instance id is the target of the forced terminate message.
        /// </param>
        /// <param name="session">Message session whose state is updated.</param>
        /// <returns>
        /// <c>true</c> when the session state was set or cleared; <c>false</c> when an
        /// <c>OrchestrationException</c> was caught while persisting the state, in which case a
        /// forced terminate message has been sent to the orchestrator queue.
        /// </returns>
        async Task<bool> TrySetSessionStateAsync(
            TaskOrchestrationWorkItem workItem,
            OrchestrationRuntimeState newOrchestrationRuntimeState,
            OrchestrationRuntimeState runtimeState,
            IMessageSession session)
        {
            // Emit an early trace while the state is above the warning size but still below the overflow threshold.
            if (runtimeState.CompressedSize > SessionStreamWarningSizeInBytes && runtimeState.CompressedSize < this.Settings.SessionSettings.SessionOverflowThresholdInBytes)
            {
                TraceHelper.TraceSession(
                    TraceEventType.Error,
                    "ServiceBusOrchestrationService-SessionStateThresholdApproaching",
                    workItem.InstanceId,
                    $"Size of session state ({runtimeState.CompressedSize}B) is nearing session size limit of {this.Settings.SessionSettings.SessionOverflowThresholdInBytes}B");
            }

            // Nothing to persist for an orchestration that is absent, never started, or no longer running.
            if (newOrchestrationRuntimeState == null ||
                newOrchestrationRuntimeState.ExecutionStartedEvent == null ||
                newOrchestrationRuntimeState.OrchestrationStatus != OrchestrationStatus.Running)
            {
                await session.SetStateAsync(null);
                return true;
            }

            try
            {
                // Both streams are owned by this method, so dispose them once the bytes have been handed to the session.
                using (Stream rawStream = await
                    RuntimeStateStreamConverter.OrchestrationRuntimeStateToRawStream(
                        newOrchestrationRuntimeState,
                        runtimeState,
                        JsonDataConverter.Default,
                        this.Settings.TaskOrchestrationDispatcherSettings.CompressOrchestrationState,
                        this.Settings.SessionSettings,
                        this.BlobStore,
                        session.SessionId))
                using (var ms = new MemoryStream())
                {
                    await rawStream.CopyToAsync(ms);
                    await session.SetStateAsync(ms.ToArray());
                }

                this.ServiceStats.OrchestrationDispatcherStats.SessionSets.Increment();
                return true;
            }
            catch (OrchestrationException exception)
            {
                // basic idea is to simply enqueue a terminate message just like how we do it from TaskHubClient
                // it is possible to have other messages in front of the queue and those will get processed before
                // the terminate message gets processed. but that is ok since in the worst case scenario we will
                // simply land in this catch block again and end up queuing up another terminate message.
                //
                // the interesting scenario is when the second time we *don't* land in this catch block because e.g.
                // the new messages that we processed caused a new generation to be created. in that case
                // it is still ok because the worst case scenario is that we will terminate a newly created generation
                // which shouldn't have been created at all in the first place

                string reason = $"Session state size of {runtimeState.CompressedSize} exceeded the termination threshold of {this.Settings.SessionSettings.SessionMaxSizeInBytes} bytes. More info: {exception.StackTrace}";
                TraceHelper.TraceSession(TraceEventType.Critical, "ServiceBusOrchestrationService-SessionSizeExceeded", workItem.InstanceId, reason);

                Message forcedTerminateMessage = await CreateForcedTerminateMessageAsync(runtimeState.OrchestrationInstance.InstanceId, reason);
                await this.orchestratorQueueClient.SendAsync(forcedTerminateMessage);

                this.ServiceStats.OrchestrationDispatcherStats.MessagesSent.Increment();
                this.ServiceStats.OrchestrationDispatcherStats.MessageBatchesSent.Increment();

                return false;
            }
        }