diff --git a/src/DurableTask.AzureStorage/Messaging/OrchestrationSession.cs b/src/DurableTask.AzureStorage/Messaging/OrchestrationSession.cs index ae3e355d6..f16fb4bee 100644 --- a/src/DurableTask.AzureStorage/Messaging/OrchestrationSession.cs +++ b/src/DurableTask.AzureStorage/Messaging/OrchestrationSession.cs @@ -19,7 +19,6 @@ namespace DurableTask.AzureStorage.Messaging using System.Linq; using System.Threading; using System.Threading.Tasks; - using Azure; using DurableTask.Core; using DurableTask.Core.History; using Newtonsoft.Json; @@ -226,5 +225,11 @@ bool IsNonexistantInstance() { return this.RuntimeState.Events.Count == 0 || this.RuntimeState.ExecutionStartedEvent == null; } + + public Task EndSessionAsync() + { + // No-op + return Task.CompletedTask; + } } } diff --git a/src/DurableTask.Core/IOrchestrationSession.cs b/src/DurableTask.Core/IOrchestrationSession.cs index bb518cbb1..29a51e067 100644 --- a/src/DurableTask.Core/IOrchestrationSession.cs +++ b/src/DurableTask.Core/IOrchestrationSession.cs @@ -11,6 +11,7 @@ // limitations under the License. // ---------------------------------------------------------------------------------- +#nullable enable namespace DurableTask.Core { using System.Collections.Generic; @@ -29,6 +30,12 @@ public interface IOrchestrationSession /// or until an internal wait period has expired. In either case, null can be returned /// and the dispatcher will shut down the session. /// - Task> FetchNewOrchestrationMessagesAsync(TaskOrchestrationWorkItem workItem); + Task?> FetchNewOrchestrationMessagesAsync(TaskOrchestrationWorkItem workItem); + + /// + /// Ends the session. + /// + /// A task that completes when the session has been ended. + Task EndSessionAsync(); } } diff --git a/src/DurableTask.Core/TaskEntityDispatcher.cs b/src/DurableTask.Core/TaskEntityDispatcher.cs index f63048a1a..a3d163e28 100644 --- a/src/DurableTask.Core/TaskEntityDispatcher.cs +++ b/src/DurableTask.Core/TaskEntityDispatcher.cs @@ -193,6 +193,7 @@ async Task OnProcessWorkItemSessionAsync(TaskOrchestrationWorkItem workItem) if (concurrencyLockAcquired) { this.concurrentSessionLock.Release(); + await workItem.Session.EndSessionAsync(); } } } diff --git a/src/DurableTask.Core/TaskOrchestrationDispatcher.cs b/src/DurableTask.Core/TaskOrchestrationDispatcher.cs index c85536793..fa0ad2436 100644 --- a/src/DurableTask.Core/TaskOrchestrationDispatcher.cs +++ b/src/DurableTask.Core/TaskOrchestrationDispatcher.cs @@ -295,6 +295,7 @@ async Task OnProcessWorkItemSessionAsync(TaskOrchestrationWorkItem workItem) "OnProcessWorkItemSession-Release", $"Releasing extended session after {processCount} batch(es)."); this.concurrentSessionLock.Release(); + await workItem.Session.EndSessionAsync(); } } } @@ -552,10 +553,6 @@ protected async Task OnProcessWorkItemAsync(TaskOrchestrationWorkItem work orchestratorMessages.AddRange(subOrchestrationRewindMessages); workItem.OrchestrationRuntimeState = newRuntimeState; runtimeState = newRuntimeState; - // Setting this to true here will end an extended session if it is in progress. - // We don't want to save the state across executions, since we essentially manually modify - // the orchestration history here and so that stored by the extended session is incorrect. - isRewinding = true; break; default: throw TraceHelper.TraceExceptionInstance(