diff --git a/src/DurableTask.AzureStorage/Messaging/OrchestrationSession.cs b/src/DurableTask.AzureStorage/Messaging/OrchestrationSession.cs
index ae3e355d6..f16fb4bee 100644
--- a/src/DurableTask.AzureStorage/Messaging/OrchestrationSession.cs
+++ b/src/DurableTask.AzureStorage/Messaging/OrchestrationSession.cs
@@ -19,7 +19,6 @@ namespace DurableTask.AzureStorage.Messaging
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
- using Azure;
using DurableTask.Core;
using DurableTask.Core.History;
using Newtonsoft.Json;
@@ -226,5 +225,11 @@ bool IsNonexistantInstance()
{
return this.RuntimeState.Events.Count == 0 || this.RuntimeState.ExecutionStartedEvent == null;
}
+
+ public Task EndSessionAsync()
+ {
+ // No-op
+ return Task.CompletedTask;
+ }
}
}
diff --git a/src/DurableTask.Core/IOrchestrationSession.cs b/src/DurableTask.Core/IOrchestrationSession.cs
index bb518cbb1..29a51e067 100644
--- a/src/DurableTask.Core/IOrchestrationSession.cs
+++ b/src/DurableTask.Core/IOrchestrationSession.cs
@@ -11,6 +11,7 @@
// limitations under the License.
// ----------------------------------------------------------------------------------
+#nullable enable
namespace DurableTask.Core
{
using System.Collections.Generic;
@@ -29,6 +30,12 @@ public interface IOrchestrationSession
/// or until an internal wait period has expired. In either case, null can be returned
/// and the dispatcher will shut down the session.
///
- Task> FetchNewOrchestrationMessagesAsync(TaskOrchestrationWorkItem workItem);
+ Task?> FetchNewOrchestrationMessagesAsync(TaskOrchestrationWorkItem workItem);
+
+ ///
+ /// Ends the session.
+ ///
+ /// A task that completes when the session has been ended.
+ Task EndSessionAsync();
}
}
diff --git a/src/DurableTask.Core/TaskEntityDispatcher.cs b/src/DurableTask.Core/TaskEntityDispatcher.cs
index f63048a1a..a3d163e28 100644
--- a/src/DurableTask.Core/TaskEntityDispatcher.cs
+++ b/src/DurableTask.Core/TaskEntityDispatcher.cs
@@ -193,6 +193,7 @@ async Task OnProcessWorkItemSessionAsync(TaskOrchestrationWorkItem workItem)
if (concurrencyLockAcquired)
{
this.concurrentSessionLock.Release();
+ await workItem.Session.EndSessionAsync();
}
}
}
diff --git a/src/DurableTask.Core/TaskOrchestrationDispatcher.cs b/src/DurableTask.Core/TaskOrchestrationDispatcher.cs
index c85536793..fa0ad2436 100644
--- a/src/DurableTask.Core/TaskOrchestrationDispatcher.cs
+++ b/src/DurableTask.Core/TaskOrchestrationDispatcher.cs
@@ -295,6 +295,7 @@ async Task OnProcessWorkItemSessionAsync(TaskOrchestrationWorkItem workItem)
"OnProcessWorkItemSession-Release",
$"Releasing extended session after {processCount} batch(es).");
this.concurrentSessionLock.Release();
+ await workItem.Session.EndSessionAsync();
}
}
}
@@ -552,10 +553,6 @@ protected async Task OnProcessWorkItemAsync(TaskOrchestrationWorkItem work
orchestratorMessages.AddRange(subOrchestrationRewindMessages);
workItem.OrchestrationRuntimeState = newRuntimeState;
runtimeState = newRuntimeState;
- // Setting this to true here will end an extended session if it is in progress.
- // We don't want to save the state across executions, since we essentially manually modify
- // the orchestration history here and so that stored by the extended session is incorrect.
- isRewinding = true;
break;
default:
throw TraceHelper.TraceExceptionInstance(