src/WebJobs.Extensions.DurableTask/ProtobufUtils.cs (517 lines of code) (raw):

// Copyright (c) .NET Foundation. All rights reserved. // Licensed under the MIT License. See LICENSE in the project root for license information. #nullable enable using System; using System.Buffers; using System.Collections.Generic; using System.Diagnostics.CodeAnalysis; using System.IO; using System.Linq; using DurableTask.Core; using DurableTask.Core.Command; using DurableTask.Core.Entities; using DurableTask.Core.Entities.OperationFormat; using DurableTask.Core.History; using DurableTask.Core.Query; using Google.Protobuf; using Google.Protobuf.WellKnownTypes; using P = Microsoft.DurableTask.Protobuf; namespace Microsoft.Azure.WebJobs.Extensions.DurableTask { internal static class ProtobufUtils { public static P.HistoryEvent ToHistoryEventProto(HistoryEvent e) { var payload = new P.HistoryEvent() { EventId = e.EventId, Timestamp = Timestamp.FromDateTime(e.Timestamp), }; switch (e.EventType) { case EventType.ContinueAsNew: var continueAsNew = (ContinueAsNewEvent)e; payload.ContinueAsNew = new P.ContinueAsNewEvent { Input = continueAsNew.Result, }; break; case EventType.EventRaised: var eventRaised = (EventRaisedEvent)e; payload.EventRaised = new P.EventRaisedEvent { Name = eventRaised.Name, Input = eventRaised.Input, }; break; case EventType.EventSent: var eventSent = (EventSentEvent)e; payload.EventSent = new P.EventSentEvent { Name = eventSent.Name, Input = eventSent.Input, InstanceId = eventSent.InstanceId, }; break; case EventType.ExecutionCompleted: var completedEvent = (ExecutionCompletedEvent)e; payload.ExecutionCompleted = new P.ExecutionCompletedEvent { OrchestrationStatus = P.OrchestrationStatus.Completed, Result = completedEvent.Result, }; break; case EventType.ExecutionFailed: var failedEvent = (ExecutionCompletedEvent)e; payload.ExecutionCompleted = new P.ExecutionCompletedEvent { OrchestrationStatus = P.OrchestrationStatus.Failed, Result = failedEvent.Result, }; break; case EventType.ExecutionStarted: // Start of a new orchestration instance var startedEvent = (ExecutionStartedEvent)e; payload.ExecutionStarted = new P.ExecutionStartedEvent { Name = startedEvent.Name, Version = startedEvent.Version, Input = startedEvent.Input, OrchestrationInstance = new P.OrchestrationInstance { InstanceId = startedEvent.OrchestrationInstance.InstanceId, ExecutionId = startedEvent.OrchestrationInstance.ExecutionId, }, ParentInstance = startedEvent.ParentInstance == null ? null : new P.ParentInstanceInfo { Name = startedEvent.ParentInstance.Name, Version = startedEvent.ParentInstance.Version, TaskScheduledId = startedEvent.ParentInstance.TaskScheduleId, OrchestrationInstance = new P.OrchestrationInstance { InstanceId = startedEvent.ParentInstance.OrchestrationInstance.InstanceId, ExecutionId = startedEvent.ParentInstance.OrchestrationInstance.ExecutionId, }, }, ScheduledStartTimestamp = startedEvent.ScheduledStartTime == null ? null : Timestamp.FromDateTime(startedEvent.ScheduledStartTime.Value), ParentTraceContext = startedEvent.ParentTraceContext == null ? null : new P.TraceContext { TraceParent = startedEvent.ParentTraceContext.TraceParent, TraceState = startedEvent.ParentTraceContext.TraceState, }, }; break; case EventType.ExecutionTerminated: var terminatedEvent = (ExecutionTerminatedEvent)e; payload.ExecutionTerminated = new P.ExecutionTerminatedEvent { Input = terminatedEvent.Input, }; break; case EventType.TaskScheduled: var taskScheduledEvent = (TaskScheduledEvent)e; payload.TaskScheduled = new P.TaskScheduledEvent { Name = taskScheduledEvent.Name, Version = taskScheduledEvent.Version, Input = taskScheduledEvent.Input, }; break; case EventType.TaskCompleted: var taskCompletedEvent = (TaskCompletedEvent)e; payload.TaskCompleted = new P.TaskCompletedEvent { Result = taskCompletedEvent.Result, TaskScheduledId = taskCompletedEvent.TaskScheduledId, }; break; case EventType.TaskFailed: var taskFailedEvent = (TaskFailedEvent)e; payload.TaskFailed = new P.TaskFailedEvent { FailureDetails = GetFailureDetails(taskFailedEvent.FailureDetails), TaskScheduledId = taskFailedEvent.TaskScheduledId, }; break; case EventType.SubOrchestrationInstanceCreated: var subOrchestrationCreated = (SubOrchestrationInstanceCreatedEvent)e; payload.SubOrchestrationInstanceCreated = new P.SubOrchestrationInstanceCreatedEvent { Input = subOrchestrationCreated.Input, InstanceId = subOrchestrationCreated.InstanceId, Name = subOrchestrationCreated.Name, Version = subOrchestrationCreated.Version, }; break; case EventType.SubOrchestrationInstanceCompleted: var subOrchestrationCompleted = (SubOrchestrationInstanceCompletedEvent)e; payload.SubOrchestrationInstanceCompleted = new P.SubOrchestrationInstanceCompletedEvent { Result = subOrchestrationCompleted.Result, TaskScheduledId = subOrchestrationCompleted.TaskScheduledId, }; break; case EventType.SubOrchestrationInstanceFailed: var subOrchestrationFailed = (SubOrchestrationInstanceFailedEvent)e; payload.SubOrchestrationInstanceFailed = new P.SubOrchestrationInstanceFailedEvent { FailureDetails = GetFailureDetails(subOrchestrationFailed.FailureDetails), TaskScheduledId = subOrchestrationFailed.TaskScheduledId, }; break; case EventType.TimerCreated: var timerCreatedEvent = (TimerCreatedEvent)e; payload.TimerCreated = new P.TimerCreatedEvent { FireAt = Timestamp.FromDateTime(timerCreatedEvent.FireAt), }; break; case EventType.TimerFired: var timerFiredEvent = (TimerFiredEvent)e; payload.TimerFired = new P.TimerFiredEvent { FireAt = Timestamp.FromDateTime(timerFiredEvent.FireAt), TimerId = timerFiredEvent.TimerId, }; break; case EventType.OrchestratorStarted: // This event has no data payload.OrchestratorStarted = new P.OrchestratorStartedEvent(); break; case EventType.OrchestratorCompleted: // This event has no data payload.OrchestratorCompleted = new P.OrchestratorCompletedEvent(); break; case EventType.GenericEvent: var genericEvent = (GenericEvent)e; payload.GenericEvent = new P.GenericEvent { Data = genericEvent.Data, }; break; case EventType.HistoryState: var historyStateEvent = (HistoryStateEvent)e; payload.HistoryState = new P.HistoryStateEvent { OrchestrationState = new P.OrchestrationState { InstanceId = historyStateEvent.State.OrchestrationInstance.InstanceId, Name = historyStateEvent.State.Name, Version = historyStateEvent.State.Version, Input = historyStateEvent.State.Input, Output = historyStateEvent.State.Output, ScheduledStartTimestamp = historyStateEvent.State.ScheduledStartTime == null ? null : Timestamp.FromDateTime(historyStateEvent.State.ScheduledStartTime.Value), CreatedTimestamp = Timestamp.FromDateTime(historyStateEvent.State.CreatedTime), LastUpdatedTimestamp = Timestamp.FromDateTime(historyStateEvent.State.LastUpdatedTime), OrchestrationStatus = (P.OrchestrationStatus)historyStateEvent.State.OrchestrationStatus, CustomStatus = historyStateEvent.State.Status, }, }; break; case EventType.ExecutionSuspended: var suspendedEvent = (ExecutionSuspendedEvent)e; payload.ExecutionSuspended = new P.ExecutionSuspendedEvent { Input = suspendedEvent.Reason, }; break; case EventType.ExecutionResumed: var resumedEvent = (ExecutionResumedEvent)e; payload.ExecutionResumed = new P.ExecutionResumedEvent { Input = resumedEvent.Reason, }; break; default: throw new NotSupportedException($"Found unsupported history event '{e.EventType}'."); } return payload; } public static OrchestratorAction ToOrchestratorAction(P.OrchestratorAction a) { switch (a.OrchestratorActionTypeCase) { case P.OrchestratorAction.OrchestratorActionTypeOneofCase.ScheduleTask: return new ScheduleTaskOrchestratorAction { Id = a.Id, Input = a.ScheduleTask.Input, Name = a.ScheduleTask.Name, Version = a.ScheduleTask.Version, }; case P.OrchestratorAction.OrchestratorActionTypeOneofCase.CreateSubOrchestration: return new CreateSubOrchestrationAction { Id = a.Id, Input = a.CreateSubOrchestration.Input, Name = a.CreateSubOrchestration.Name, InstanceId = a.CreateSubOrchestration.InstanceId, Tags = null, // TODO Version = a.CreateSubOrchestration.Version, }; case P.OrchestratorAction.OrchestratorActionTypeOneofCase.CreateTimer: return new CreateTimerOrchestratorAction { Id = a.Id, FireAt = a.CreateTimer.FireAt.ToDateTime(), }; case P.OrchestratorAction.OrchestratorActionTypeOneofCase.SendEvent: return new SendEventOrchestratorAction { Id = a.Id, Instance = new OrchestrationInstance { InstanceId = a.SendEvent.Instance.InstanceId, ExecutionId = a.SendEvent.Instance.ExecutionId, }, EventName = a.SendEvent.Name, EventData = a.SendEvent.Data, }; case P.OrchestratorAction.OrchestratorActionTypeOneofCase.CompleteOrchestration: P.CompleteOrchestrationAction? completedAction = a.CompleteOrchestration; var action = new OrchestrationCompleteOrchestratorAction { Id = a.Id, OrchestrationStatus = (OrchestrationStatus)completedAction.OrchestrationStatus, Result = completedAction.Result, Details = completedAction.Details, FailureDetails = GetFailureDetails(completedAction.FailureDetails), NewVersion = completedAction.NewVersion, }; if (completedAction.CarryoverEvents?.Count > 0) { foreach (P.HistoryEvent e in completedAction.CarryoverEvents) { // Only raised events are supported for carryover if (e.EventRaised is P.EventRaisedEvent eventRaised) { action.CarryoverEvents.Add(new EventRaisedEvent(e.EventId, eventRaised.Input) { Name = eventRaised.Name, }); } } } return action; default: throw new NotSupportedException($"Received unsupported action type '{a.OrchestratorActionTypeCase}'."); } } [return: NotNullIfNotNull("parameters")] public static P.OrchestratorEntityParameters? ToProtobuf(this TaskOrchestrationEntityParameters? parameters) { if (parameters == null) { return null; } return new P.OrchestratorEntityParameters { EntityMessageReorderWindow = Duration.FromTimeSpan(parameters.EntityMessageReorderWindow), }; } public static string Base64Encode(IMessage message) { // Create a serialized payload using lower-level protobuf APIs. We do this to avoid allocating // byte[] arrays for every request, which would otherwise put a heavy burden on the GC. Unfortunately // the protobuf API version we're using doesn't currently have memory-efficient serialization APIs. int messageSize = message.CalculateSize(); byte[] rentedBuffer = ArrayPool<byte>.Shared.Rent(messageSize); try { using var intermediateBufferStream = new MemoryStream(rentedBuffer, 0, messageSize); using var protobufOutputStream = new CodedOutputStream(intermediateBufferStream); protobufOutputStream.WriteRawMessage(message); protobufOutputStream.Flush(); return Convert.ToBase64String(rentedBuffer, 0, messageSize); } finally { ArrayPool<byte>.Shared.Return(rentedBuffer); } } internal static FailureDetails? GetFailureDetails(P.TaskFailureDetails? failureDetails) { if (failureDetails == null) { return null; } return new FailureDetails( failureDetails.ErrorType, failureDetails.ErrorMessage, failureDetails.StackTrace, GetFailureDetails(failureDetails.InnerFailure), failureDetails.IsNonRetriable); } internal static P.TaskFailureDetails? GetFailureDetails(FailureDetails? failureDetails) { if (failureDetails == null) { return null; } return new P.TaskFailureDetails { ErrorType = failureDetails.ErrorType, ErrorMessage = failureDetails.ErrorMessage, StackTrace = failureDetails.StackTrace, InnerFailure = GetFailureDetails(failureDetails.InnerFailure), IsNonRetriable = failureDetails.IsNonRetriable, }; } internal static OrchestrationQuery ToOrchestrationQuery(P.QueryInstancesRequest request) { var query = new OrchestrationQuery() { RuntimeStatus = request.Query.RuntimeStatus?.Select(status => (OrchestrationStatus)status).ToList(), CreatedTimeFrom = request.Query.CreatedTimeFrom?.ToDateTime(), CreatedTimeTo = request.Query.CreatedTimeTo?.ToDateTime(), TaskHubNames = request.Query.TaskHubNames, PageSize = request.Query.MaxInstanceCount, ContinuationToken = request.Query.ContinuationToken, InstanceIdPrefix = request.Query.InstanceIdPrefix, FetchInputsAndOutputs = request.Query.FetchInputsAndOutputs, ExcludeEntities = true, }; // Empty lists are not allowed by the underlying code that takes in an OrchestrationQuery. However, // some clients use empty lists instead of nulls. Need to overwrite empty lists with null values. if (query.TaskHubNames?.Count == 0) { query.TaskHubNames = null; } if (query.RuntimeStatus?.Count == 0) { query.RuntimeStatus = null; } return query; } internal static P.QueryInstancesResponse CreateQueryInstancesResponse(OrchestrationQueryResult result, P.QueryInstancesRequest request) { var response = new P.QueryInstancesResponse { ContinuationToken = result.ContinuationToken }; foreach (OrchestrationState state in result.OrchestrationState) { var orchestrationState = new P.OrchestrationState { InstanceId = state.OrchestrationInstance.InstanceId, Name = state.Name, Version = state.Version, Input = state.Input, Output = state.Output, ScheduledStartTimestamp = state.ScheduledStartTime == null ? null : Timestamp.FromDateTime(state.ScheduledStartTime.Value), CreatedTimestamp = Timestamp.FromDateTime(state.CreatedTime), LastUpdatedTimestamp = Timestamp.FromDateTime(state.LastUpdatedTime), OrchestrationStatus = (P.OrchestrationStatus)state.OrchestrationStatus, CustomStatus = state.Status, }; response.OrchestrationState.Add(orchestrationState); } return response; } internal static PurgeInstanceFilter ToPurgeInstanceFilter(P.PurgeInstancesRequest request) { // Empty lists are not allowed by the underlying code that takes in a PurgeInstanceFilter. However, some // clients (like Java) may use empty lists by default instead of nulls. // Long story short: we must make sure to only copy over the list if it's non-empty. IEnumerable<OrchestrationStatus>? statusFilter = null; if (request.PurgeInstanceFilter.RuntimeStatus != null && request.PurgeInstanceFilter.RuntimeStatus.Count > 0) { statusFilter = request.PurgeInstanceFilter.RuntimeStatus?.Select(status => (OrchestrationStatus)status).ToList(); } // This ternary condition is necessary because the protobuf spec __insists__ that CreatedTimeFrom may never be null, // but nonetheless if you pass null in function code, the value will be null here return new PurgeInstanceFilter( request.PurgeInstanceFilter.CreatedTimeFrom == null ? DateTime.MinValue : request.PurgeInstanceFilter.CreatedTimeFrom.ToDateTime(), request.PurgeInstanceFilter.CreatedTimeTo?.ToDateTime(), statusFilter); } internal static P.PurgeInstancesResponse CreatePurgeInstancesResponse(PurgeResult result) { return new P.PurgeInstancesResponse { DeletedInstanceCount = result.DeletedInstanceCount, }; } /// <summary> /// Converts a <see cref="EntityBatchRequest" /> to <see cref="P.EntityBatchRequest" />. /// </summary> /// <param name="entityBatchRequest">The operation request to convert.</param> /// <returns>The converted operation request.</returns> [return: NotNullIfNotNull("entityBatchRequest")] internal static P.EntityBatchRequest? ToEntityBatchRequest(this EntityBatchRequest? entityBatchRequest) { if (entityBatchRequest == null) { return null; } var batchRequest = new P.EntityBatchRequest() { InstanceId = entityBatchRequest.InstanceId, EntityState = entityBatchRequest.EntityState, }; foreach (var operation in entityBatchRequest.Operations ?? Enumerable.Empty<OperationRequest>()) { batchRequest.Operations.Add(operation.ToOperationRequest()); } return batchRequest; } /// <summary> /// Converts a <see cref="OperationRequest" /> to <see cref="P.OperationRequest" />. /// </summary> /// <param name="operationRequest">The operation request to convert.</param> /// <returns>The converted operation request.</returns> [return: NotNullIfNotNull("operationRequest")] internal static P.OperationRequest? ToOperationRequest(this OperationRequest? operationRequest) { if (operationRequest == null) { return null; } return new P.OperationRequest() { Operation = operationRequest.Operation, Input = operationRequest.Input, RequestId = operationRequest.Id.ToString(), }; } /// <summary> /// Converts a <see cref="P.EntityBatchResult" /> to a <see cref="OperationBatchResult" />. /// </summary> /// <param name="entityBatchResult">The operation result to convert.</param> /// <returns>The converted operation result.</returns> [return: NotNullIfNotNull("entityBatchResult")] internal static EntityBatchResult? ToEntityBatchResult(this P.EntityBatchResult? entityBatchResult) { if (entityBatchResult == null) { return null; } return new EntityBatchResult() { Actions = entityBatchResult.Actions.Select(operationAction => operationAction!.ToOperationAction()).ToList(), EntityState = entityBatchResult.EntityState, Results = entityBatchResult.Results.Select(operationResult => operationResult!.ToOperationResult()).ToList(), FailureDetails = GetFailureDetails(entityBatchResult.FailureDetails), }; } /// <summary> /// Converts a <see cref="P.OperationAction" /> to a <see cref="OperationAction" />. /// </summary> /// <param name="operationAction">The operation action to convert.</param> /// <returns>The converted operation action.</returns> [return: NotNullIfNotNull("operationAction")] internal static OperationAction? ToOperationAction(this P.OperationAction? operationAction) { if (operationAction == null) { return null; } switch (operationAction.OperationActionTypeCase) { case P.OperationAction.OperationActionTypeOneofCase.SendSignal: return new SendSignalOperationAction() { Name = operationAction.SendSignal.Name, Input = operationAction.SendSignal.Input, InstanceId = operationAction.SendSignal.InstanceId, ScheduledTime = operationAction.SendSignal.ScheduledTime?.ToDateTime(), }; case P.OperationAction.OperationActionTypeOneofCase.StartNewOrchestration: return new StartNewOrchestrationOperationAction() { Name = operationAction.StartNewOrchestration.Name, Input = operationAction.StartNewOrchestration.Input, InstanceId = operationAction.StartNewOrchestration.InstanceId, Version = operationAction.StartNewOrchestration.Version, }; default: throw new NotSupportedException($"Deserialization of {operationAction.OperationActionTypeCase} is not supported."); } } /// <summary> /// Converts a <see cref="P.OperationResult" /> to a <see cref="OperationResult" />. /// </summary> /// <param name="operationResult">The operation result to convert.</param> /// <returns>The converted operation result.</returns> [return: NotNullIfNotNull("operationResult")] internal static OperationResult? ToOperationResult(this P.OperationResult? operationResult) { if (operationResult == null) { return null; } switch (operationResult.ResultTypeCase) { case P.OperationResult.ResultTypeOneofCase.Success: return new OperationResult() { Result = operationResult.Success.Result, }; case P.OperationResult.ResultTypeOneofCase.Failure: return new OperationResult() { FailureDetails = GetFailureDetails(operationResult.Failure.FailureDetails), }; default: throw new NotSupportedException($"Deserialization of {operationResult.ResultTypeCase} is not supported."); } } } }