diff --git a/lib/api/Logger.cpp b/lib/api/Logger.cpp
index aec4b9e52..75d3bd292 100644
--- a/lib/api/Logger.cpp
+++ b/lib/api/Logger.cpp
@@ -127,7 +127,8 @@ namespace MAT_NS_BEGIN
Logger::~Logger() noexcept
{
- LOG_TRACE("%p: Destroyed", this);
+ // Intentionally empty — logging here triggers a static-destruction-order
+ // crash on iOS simulator (recursive_mutex used after teardown).
}
ISemanticContext* Logger::GetSemanticContext() const
diff --git a/lib/http/HttpResponseDecoder.cpp b/lib/http/HttpResponseDecoder.cpp
index 11e9d4096..941931c1e 100644
--- a/lib/http/HttpResponseDecoder.cpp
+++ b/lib/http/HttpResponseDecoder.cpp
@@ -67,13 +67,11 @@ namespace MAT_NS_BEGIN {
break;
case HttpResult_Aborted:
- ctx->httpResponse = nullptr;
outcome = Abort;
break;
case HttpResult_LocalFailure:
case HttpResult_NetworkFailure:
- ctx->httpResponse = nullptr;
outcome = RetryNetwork;
break;
}
@@ -129,7 +127,6 @@ namespace MAT_NS_BEGIN {
evt.param1 = 0; // response.GetStatusCode();
DispatchEvent(evt);
}
- ctx->httpResponse = nullptr;
// eventsRejected(ctx); // FIXME: [MG] - investigate why ctx gets corrupt after eventsRejected
requestAborted(ctx);
break;
@@ -253,4 +250,3 @@ namespace MAT_NS_BEGIN {
}
} MAT_NS_END
-
diff --git a/lib/include/public/ITaskDispatcher.hpp b/lib/include/public/ITaskDispatcher.hpp
index 070f054bc..8c0585696 100644
--- a/lib/include/public/ITaskDispatcher.hpp
+++ b/lib/include/public/ITaskDispatcher.hpp
@@ -114,6 +114,23 @@ namespace MAT_NS_BEGIN
/// Task to be executed on a worker thread
virtual void Queue(Task* task) = 0;
+ ///
+ /// Queue an asynchronous task and report whether the dispatcher accepted
+ /// it. Returns false if the task could not be queued (for example because
+ /// the dispatcher is shutting down) and was therefore destroyed by the
+ /// dispatcher; true otherwise. Callers that retain the task pointer for
+ /// later cancellation should treat a false result as "not scheduled" and
+ /// drop the pointer. The default delegates to Queue() and assumes success,
+ /// so existing dispatcher implementations keep their current behavior.
+ ///
+ /// Task to be executed on a worker thread
+ /// True if the task was queued, false if it was dropped
+ virtual bool QueueWithResult(Task* task)
+ {
+ Queue(task);
+ return true;
+ }
+
///
/// Cancel a previously queued tasks
///
diff --git a/lib/pal/TaskDispatcher.hpp b/lib/pal/TaskDispatcher.hpp
index ec6f2f690..3dfa7bffe 100644
--- a/lib/pal/TaskDispatcher.hpp
+++ b/lib/pal/TaskDispatcher.hpp
@@ -122,7 +122,14 @@ namespace PAL_NS_BEGIN {
{
auto bound = std::bind(std::mem_fn(func), obj, std::forward(args)...);
auto task = new detail::TaskCall(bound, getMonotonicTimeMs() + (int64_t)delayMs);
- taskDispatcher->Queue(task);
+ if (!taskDispatcher->QueueWithResult(task))
+ {
+ // The dispatcher could not queue the task (for example during
+ // shutdown) and has already destroyed it. Return a no-op handle so the
+ // caller never holds a pointer to a freed task and Cancel() is a safe
+ // no-op.
+ return DeferredCallbackHandle();
+ }
return DeferredCallbackHandle(task, taskDispatcher);
}
diff --git a/lib/pal/WorkerThread.cpp b/lib/pal/WorkerThread.cpp
index 2bdbf6c67..f7435dc56 100644
--- a/lib/pal/WorkerThread.cpp
+++ b/lib/pal/WorkerThread.cpp
@@ -6,6 +6,8 @@
#include "pal/WorkerThread.hpp"
#include "pal/PAL.hpp"
+#include
+
#if defined(MATSDK_PAL_CPP11) || defined(MATSDK_PAL_WIN32)
/* Maximum scheduler interval for SDK is 1 hour required for clamping in case of monotonic clock drift */
@@ -35,7 +37,7 @@ namespace PAL_NS_BEGIN {
std::list m_timerQueue;
Event m_event;
MAT::Task* m_itemInProgress;
- int count = 0;
+ bool m_shuttingDown = false;
public:
@@ -53,32 +55,70 @@ namespace PAL_NS_BEGIN {
void Join() final
{
- auto item = new WorkerThreadShutdownItem();
- Queue(item);
std::thread::id this_id = std::this_thread::get_id();
+ bool joined = false;
+ {
+ LOCKGUARD(m_lock);
+ if (!m_shuttingDown) {
+ m_shuttingDown = true;
+ m_queue.push_back(new WorkerThreadShutdownItem());
+ m_event.post();
+ }
+ }
try {
- if (m_hThread.joinable() && (m_hThread.get_id() != this_id))
+ if (!m_hThread.joinable()) {
+ return;
+ }
+ if (m_hThread.get_id() != this_id) {
m_hThread.join();
- else
+ joined = true;
+ } else {
m_hThread.detach();
+ }
+ }
+ catch (const std::system_error& e) {
+ LOG_ERROR("Thread join/detach failed: [%d] %s", e.code().value(), e.what());
+ }
+ catch (const std::exception& e) {
+ LOG_ERROR("Thread join/detach failed: %s", e.what());
}
- catch (...) {};
- // TODO: [MG] - investigate if we ever drop work items on shutdown.
- if (!m_queue.empty())
- {
- LOG_WARN("m_queue is not empty!");
+ // Log pending work in both paths so operators can see if
+ // shutdown is dropping tasks.
+ LOCKGUARD(m_lock);
+ if (!m_queue.empty()) {
+ LOG_WARN("Shutdown with %zu queued task(s) pending", m_queue.size());
}
- if (!m_timerQueue.empty())
- {
- LOG_WARN("m_timerQueue is not empty!");
+ if (!m_timerQueue.empty()) {
+ LOG_WARN("Shutdown with %zu timer(s) pending", m_timerQueue.size());
+ }
+
+ // Clean up any tasks remaining in the queues after shutdown.
+ // Only safe after join() — the thread has fully exited.
+ // After detach(), the thread still needs the shutdown item
+ // and may still be accessing the queues.
+ if (joined) {
+ for (auto task : m_queue) { delete task; }
+ m_queue.clear();
+ for (auto task : m_timerQueue) { delete task; }
+ m_timerQueue.clear();
}
}
void Queue(MAT::Task* item) final
{
- LOG_INFO("queue item=%p", &item);
+ QueueWithResult(item);
+ }
+
+ bool QueueWithResult(MAT::Task* item) override
+ {
+ LOG_INFO("queue item=%p", static_cast(item));
LOCKGUARD(m_lock);
+ if (m_shuttingDown) {
+ LOG_WARN("Dropping queued task %p during shutdown", static_cast(item));
+ delete item;
+ return false;
+ }
if (item->Type == MAT::Task::TimedCall) {
auto it = m_timerQueue.begin();
while (it != m_timerQueue.end() && (*it)->TargetTime < item->TargetTime) {
@@ -89,8 +129,8 @@ namespace PAL_NS_BEGIN {
else {
m_queue.push_back(item);
}
- count++;
m_event.post();
+ return true;
}
// Cancel a task or wait for task completion for up to waitTime ms:
@@ -261,4 +301,3 @@ namespace PAL_NS_BEGIN {
} PAL_NS_END
#endif
-
diff --git a/lib/tpm/TransmissionPolicyManager.cpp b/lib/tpm/TransmissionPolicyManager.cpp
index 83b82cf2a..426b4ff82 100644
--- a/lib/tpm/TransmissionPolicyManager.cpp
+++ b/lib/tpm/TransmissionPolicyManager.cpp
@@ -111,26 +111,36 @@ namespace MAT_NS_BEGIN {
LOG_TRACE("Collector URL is not set, no upload.");
return;
}
- LOCKGUARD(m_scheduledUploadMutex);
- if (delay.count() < 0 || m_timerdelay.count() < 0)
- {
- LOG_TRACE("Negative delay(%d) or m_timerdelay(%d), no upload", delay.count(), m_timerdelay.count());
- return;
- }
- if (m_scheduledUploadAborted)
- {
- LOG_TRACE("Scheduled upload aborted, no upload.");
- return;
- }
- if (uploadCount() >= static_cast(m_config[CFG_INT_MAX_PENDING_REQ]) )
+ auto shouldSkipScheduling = [&delay, this]() -> bool
{
- LOG_TRACE("Maximum number of HTTP requests reached");
- return;
- }
+ if (delay.count() < 0 || m_timerdelay.count() < 0)
+ {
+ LOG_TRACE("Negative delay(%lld) or m_timerdelay(%lld), no upload",
+ delay.count(), m_timerdelay.count());
+ return true;
+ }
+ if (m_scheduledUploadAborted)
+ {
+ LOG_TRACE("Scheduled upload aborted, no upload.");
+ return true;
+ }
+ if (uploadCount() >= static_cast(m_config[CFG_INT_MAX_PENDING_REQ]))
+ {
+ LOG_TRACE("Maximum number of HTTP requests reached");
+ return true;
+ }
+ if (m_isPaused)
+ {
+ LOG_TRACE("Paused, not uploading anything until resumed");
+ return true;
+ }
+
+ return false;
+ };
- if (m_isPaused)
+ LOCKGUARD(m_scheduledUploadMutex);
+ if (shouldSkipScheduling())
{
- LOG_TRACE("Paused, not uploading anything until resumed");
return;
}
@@ -151,10 +161,9 @@ namespace MAT_NS_BEGIN {
if (delta <= static_cast(delay.count()))
{
// Don't need to cancel and reschedule if it's about to happen now anyways.
- // m_isUploadScheduled check does not have to be strictly atomic because
// the completion of upload will schedule more uploads as-needed, we only
// want to avoid the unnecessary wasteful rescheduling.
- LOG_TRACE("WAIT upload %d ms for lat=%d", delta, m_runningLatency);
+ LOG_TRACE("WAIT upload %llu ms for lat=%d", static_cast(delta), m_runningLatency);
return;
}
}
@@ -162,18 +171,35 @@ namespace MAT_NS_BEGIN {
// Cancel upload if already scheduled.
if (force || delay.count() == 0)
{
- if (!cancelUploadTask())
+ if (!cancelUploadTaskNoWaitLocked())
{
LOG_TRACE("Upload either hasn't been scheduled or already done.");
+ // Cancel can return false when the previous upload task is
+ // currently executing on the worker. If uploadAsync hasn't
+ // yet entered its own LOCKGUARD (m_isUploadScheduled is
+ // still set under the mutex we hold), propagate the
+ // requested latency so the running task picks it up when
+ // it acquires m_scheduledUploadMutex. Otherwise the
+ // running task has already cleared the flag and the
+ // schedule below will queue a fresh task.
+ if (m_isUploadScheduled)
+ {
+ m_runningLatency = latency;
+ }
+ }
+ if (shouldSkipScheduling())
+ {
+ return;
}
}
// Schedule new upload
- if (!m_isUploadScheduled.exchange(true))
+ if (!m_isUploadScheduled)
{
+ m_isUploadScheduled = true;
m_scheduledUploadTime = PAL::getMonotonicTimeMs() + delay.count();
m_runningLatency = latency;
- LOG_TRACE("SCHED upload %d ms for lat=%d", delay.count(), m_runningLatency);
+ LOG_TRACE("SCHED upload %lld ms for lat=%d", delay.count(), m_runningLatency);
m_scheduledUpload = PAL::scheduleTask(&m_taskDispatcher, static_cast(delay.count()), this, &TransmissionPolicyManager::uploadAsync, latency);
}
}
@@ -184,16 +210,15 @@ namespace MAT_NS_BEGIN {
if (guard.isPaused()) {
return;
}
- m_runningLatency = latency;
- m_scheduledUploadTime = std::numeric_limits::max();
-
+ EventLatency requestedLatency = latency;
{
LOCKGUARD(m_scheduledUploadMutex);
+ requestedLatency = m_runningLatency;
+ m_scheduledUploadTime = std::numeric_limits::max();
m_isUploadScheduled = false; // Allow to schedule another uploadAsync
if ((m_isPaused) || (m_scheduledUploadAborted))
{
- LOG_TRACE("Paused or upload aborted: cancel pending upload task.");
- cancelUploadTask(); // If there is a pending upload task, kill it
+ LOG_TRACE("Paused or upload aborted: skip upload.");
return;
}
}
@@ -210,14 +235,14 @@ namespace MAT_NS_BEGIN {
unsigned delayMs = 1000;
LOG_INFO("Bandwidth controller proposed bandwidth %u bytes/sec but minimum accepted is %u, will retry %u ms later",
proposedBandwidthBps, minimumBandwidthBps, delayMs);
- scheduleUpload(delayMs, latency); // reschedule uploadAsync to run again 1000 ms later
+ scheduleUpload(std::chrono::milliseconds{delayMs}, requestedLatency); // reschedule uploadAsync to run again 1000 ms later
return;
}
}
#endif
auto ctx = m_system.createEventsUploadContext();
- ctx->requestedMinLatency = m_runningLatency;
+ ctx->requestedMinLatency = requestedLatency;
addUpload(ctx);
initiateUpload(ctx);
}
@@ -238,7 +263,7 @@ namespace MAT_NS_BEGIN {
// Rescheduling upload
if (nextUpload.count() >= 0)
{
- LOG_TRACE("Scheduling upload in %d ms", nextUpload.count());
+ LOG_TRACE("Scheduling upload in %lld ms", nextUpload.count());
EventLatency proposed = calculateNewPriority();
scheduleUpload(nextUpload, proposed); // reschedule uploadAsync again
}
@@ -284,9 +309,9 @@ namespace MAT_NS_BEGIN {
LOCKGUARD(m_scheduledUploadMutex);
// Prevent execution of all upload tasks
m_scheduledUploadAborted = true;
- // Make sure we wait for completion of the upload scheduling task that may be running
- cancelUploadTask();
}
+ // Make sure we wait for completion of the upload scheduling task that may be running
+ cancelUploadTask();
// Make sure we wait for all active upload callbacks to finish
while (uploadCount() > 0)
@@ -342,7 +367,12 @@ namespace MAT_NS_BEGIN {
}
// Schedule async upload if not scheduled yet
- if (!m_isUploadScheduled || TransmitProfiles::isTimerUpdateRequired())
+ bool isUploadScheduled = false;
+ {
+ LOCKGUARD(m_scheduledUploadMutex);
+ isUploadScheduled = m_isUploadScheduled;
+ }
+ if (!isUploadScheduled || TransmitProfiles::isTimerUpdateRequired())
{
if (updateTimersIfNecessary())
{
@@ -374,7 +404,13 @@ namespace MAT_NS_BEGIN {
return EventLatency_RealTime;
}
- if (m_runningLatency == EventLatency_RealTime)
+ EventLatency runningLatency = EventLatency_RealTime;
+ {
+ LOCKGUARD(m_scheduledUploadMutex);
+ runningLatency = m_runningLatency;
+ }
+
+ if (runningLatency == EventLatency_RealTime)
{
return EventLatency_Normal;
}
@@ -453,16 +489,39 @@ namespace MAT_NS_BEGIN {
return (m_scheduledUploadAborted) ? DefaultTaskCancelTime : std::chrono::milliseconds {};
}
+ bool TransmissionPolicyManager::cancelUploadTaskNoWaitLocked()
+ {
+ bool result = m_scheduledUpload.Cancel(std::chrono::milliseconds {}.count());
+
+ if (result)
+ {
+ m_isUploadScheduled = false;
+ m_scheduledUploadTime = std::numeric_limits::max();
+ }
+ return result;
+ }
+
bool TransmissionPolicyManager::cancelUploadTask()
{
- bool result = m_scheduledUpload.Cancel(getCancelWaitTime().count());
+ auto waitTime = std::chrono::milliseconds{};
+ {
+ LOCKGUARD(m_scheduledUploadMutex);
+ waitTime = getCancelWaitTime();
+ if (waitTime.count() == 0)
+ {
+ return cancelUploadTaskNoWaitLocked();
+ }
+ }
+ bool result = m_scheduledUpload.Cancel(waitTime.count());
// TODO: There is a potential for upload tasks to not be canceled, especially if they aren't waited for.
// We either need a stronger guarantee here (could impact SDK performance), or a mechanism to
// ensure those tasks are canceled when the log manager is destroyed. Issue 388
if (result)
{
- m_isUploadScheduled.exchange(false);
+ LOCKGUARD(m_scheduledUploadMutex);
+ m_isUploadScheduled = false;
+ m_scheduledUploadTime = std::numeric_limits::max();
}
return result;
}
@@ -476,6 +535,7 @@ namespace MAT_NS_BEGIN {
bool TransmissionPolicyManager::isUploadInProgress() const noexcept
{
// unfinished uploads that haven't processed callbacks or pending upload task
+ LOCKGUARD(m_scheduledUploadMutex);
return (uploadCount() > 0) || m_isUploadScheduled;
}
diff --git a/lib/tpm/TransmissionPolicyManager.hpp b/lib/tpm/TransmissionPolicyManager.hpp
index e1a91ad10..d6c97beb0 100644
--- a/lib/tpm/TransmissionPolicyManager.hpp
+++ b/lib/tpm/TransmissionPolicyManager.hpp
@@ -90,9 +90,9 @@ constexpr const char* const DefaultBackoffConfig = "E,3000,300000,2,1";
DeviceStateHandler m_deviceStateHandler;
std::atomic m_isPaused { true };
- std::atomic m_isUploadScheduled { false };
+ bool m_isUploadScheduled { false };
uint64_t m_scheduledUploadTime { std::numeric_limits::max() };
- std::mutex m_scheduledUploadMutex;
+ mutable std::mutex m_scheduledUploadMutex;
PAL::DeferredCallbackHandle m_scheduledUpload;
bool m_scheduledUploadAborted { false };
@@ -119,6 +119,12 @@ constexpr const char* const DefaultBackoffConfig = "E,3000,300000,2,1";
std::chrono::milliseconds getCancelWaitTime() const noexcept;
+ ///
+ /// Cancels a pending upload task without waiting for a running task to finish.
+ /// The caller must already hold m_scheduledUploadMutex.
+ ///
+ bool cancelUploadTaskNoWaitLocked();
+
///
/// Cancels pending upload task.
///
@@ -160,4 +166,3 @@ constexpr const char* const DefaultBackoffConfig = "E,3000,300000,2,1";
} MAT_NS_END
#endif // TRANSMISSIONPOLICYMANAGER_HPP
-
diff --git a/tests/unittests/HttpResponseDecoderTests.cpp b/tests/unittests/HttpResponseDecoderTests.cpp
index 314cdb513..7d11ae4b8 100644
--- a/tests/unittests/HttpResponseDecoderTests.cpp
+++ b/tests/unittests/HttpResponseDecoderTests.cpp
@@ -88,20 +88,29 @@ TEST_F(HttpResponseDecoderTests, UnderstandsTemporaryServerFailures)
TEST_F(HttpResponseDecoderTests, UnderstandsTemporaryNetworkFailures)
{
auto ctx = createContextWith(HttpResult_LocalFailure, -1, "");
- EXPECT_CALL(*this, resultTemporaryNetworkFailure(ctx))
- .WillOnce(Return());
+ EXPECT_CALL(*this, resultTemporaryNetworkFailure(ctx)).WillOnce(Invoke([](EventsUploadContextPtr const& routedCtx) {
+ ASSERT_THAT(routedCtx->httpResponse, NotNull());
+ EXPECT_THAT(routedCtx->httpResponse->GetResult(), HttpResult_LocalFailure);
+ EXPECT_THAT(routedCtx->httpResponse->GetStatusCode(), static_cast(-1));
+ }));
decoder.decode(ctx);
ctx = createContextWith(HttpResult_NetworkFailure, -1, "");
- EXPECT_CALL(*this, resultTemporaryNetworkFailure(ctx))
- .WillOnce(Return());
+ EXPECT_CALL(*this, resultTemporaryNetworkFailure(ctx)).WillOnce(Invoke([](EventsUploadContextPtr const& routedCtx) {
+ ASSERT_THAT(routedCtx->httpResponse, NotNull());
+ EXPECT_THAT(routedCtx->httpResponse->GetResult(), HttpResult_NetworkFailure);
+ EXPECT_THAT(routedCtx->httpResponse->GetStatusCode(), static_cast(-1));
+ }));
decoder.decode(ctx);
}
TEST_F(HttpResponseDecoderTests, SkipsAbortedRequests)
{
auto ctx = createContextWith(HttpResult_Aborted, -1, "");
- EXPECT_CALL(*this, resultRequestAborted(ctx))
- .WillOnce(Return());
+ EXPECT_CALL(*this, resultRequestAborted(ctx)).WillOnce(Invoke([](EventsUploadContextPtr const& routedCtx) {
+ ASSERT_THAT(routedCtx->httpResponse, NotNull());
+ EXPECT_THAT(routedCtx->httpResponse->GetResult(), HttpResult_Aborted);
+ EXPECT_THAT(routedCtx->httpResponse->GetStatusCode(), static_cast(-1));
+ }));
decoder.decode(ctx);
}
diff --git a/tests/unittests/TaskDispatcherCAPITests.cpp b/tests/unittests/TaskDispatcherCAPITests.cpp
index 0867ad046..5c1acdecd 100644
--- a/tests/unittests/TaskDispatcherCAPITests.cpp
+++ b/tests/unittests/TaskDispatcherCAPITests.cpp
@@ -227,3 +227,46 @@ TEST(TaskDispatcherCAPITests, Join)
EXPECT_EQ(wasJoined, true);
}
+namespace
+{
+ // Dispatcher that always drops (and deletes) the task, modeling the
+ // shutdown-drop path where QueueWithResult() reports failure.
+ class DroppingTaskDispatcher : public ITaskDispatcher
+ {
+ public:
+ bool cancelCalled = false;
+ void Join() override {}
+ void Queue(MAT::Task* task) override { delete task; }
+ bool QueueWithResult(MAT::Task* task) override
+ {
+ delete task;
+ return false;
+ }
+ bool Cancel(MAT::Task* /*task*/, uint64_t /*waitTime*/ = 0) override
+ {
+ cancelCalled = true;
+ return false;
+ }
+ };
+
+ struct NoopCallbackTarget
+ {
+ void Callback(int, int) {}
+ };
+}
+
+// When the dispatcher drops the task (for example during shutdown), scheduleTask
+// must return a no-op handle rather than one pointing at the freed task, so the
+// caller never holds a dangling pointer and Cancel() is a safe no-op.
+TEST(TaskDispatcherCAPITests, ScheduleTaskReturnsNoOpHandleWhenTaskDropped)
+{
+ DroppingTaskDispatcher dispatcher;
+ NoopCallbackTarget target;
+
+ auto handle = scheduleTask(&dispatcher, 100 /*delayMs*/, &target, &NoopCallbackTarget::Callback, 1, 2);
+
+ EXPECT_EQ(handle.m_task, nullptr);
+ EXPECT_TRUE(handle.Cancel());
+ EXPECT_FALSE(dispatcher.cancelCalled);
+}
+
diff --git a/tests/unittests/TransmissionPolicyManagerTests.cpp b/tests/unittests/TransmissionPolicyManagerTests.cpp
index 6cbdb99f5..2f4a75ec1 100644
--- a/tests/unittests/TransmissionPolicyManagerTests.cpp
+++ b/tests/unittests/TransmissionPolicyManagerTests.cpp
@@ -11,14 +11,24 @@
#include "tpm/TransmissionPolicyManager.hpp"
#include "TransmitProfiles.hpp"
+#include
+#include
+#include
+#include
+
using namespace testing;
using namespace MAT;
class TransmissionPolicyManager4Test : public TransmissionPolicyManager {
public:
+ TransmissionPolicyManager4Test(ITelemetrySystem& system, ITaskDispatcher& taskDispatcher, IBandwidthController* bandwidthController)
+ : TransmissionPolicyManager(system, taskDispatcher, bandwidthController)
+ {
+ }
+
TransmissionPolicyManager4Test(ITelemetrySystem& system, IBandwidthController* bandwidthController)
- : TransmissionPolicyManager(system, *PAL::getDefaultTaskDispatcher(), bandwidthController)
+ : TransmissionPolicyManager4Test(system, *PAL::getDefaultTaskDispatcher(), bandwidthController)
{
}
@@ -69,6 +79,141 @@ class TransmissionPolicyManager4Test : public TransmissionPolicyManager {
}
};
+class BlockingCancelTaskDispatcher : public ITaskDispatcher
+{
+public:
+ ~BlockingCancelTaskDispatcher() override
+ {
+ Join();
+ }
+
+ void Join() override
+ {
+ std::lock_guard lock(m_tasksMutex);
+ for (auto* task : m_tasks)
+ {
+ delete task;
+ }
+ m_tasks.clear();
+ }
+
+ void Queue(Task* task) override
+ {
+ std::lock_guard lock(m_tasksMutex);
+ m_tasks.push_back(task);
+ }
+
+ bool Cancel(Task* task, uint64_t waitTime = 0) override
+ {
+ UNREFERENCED_PARAMETER(waitTime);
+
+ {
+ std::lock_guard lock(m_tasksMutex);
+ auto it = std::find(m_tasks.begin(), m_tasks.end(), task);
+ if (it == m_tasks.end())
+ {
+ return false;
+ }
+ delete *it;
+ m_tasks.erase(it);
+ }
+
+ {
+ std::lock_guard lock(m_cancelMutex);
+ m_cancelEntered = true;
+ }
+ m_cancelEnteredCv.notify_all();
+
+ std::unique_lock lock(m_cancelMutex);
+ m_cancelReleasedCv.wait(lock, [this]() { return m_cancelReleased; });
+ return true;
+ }
+
+ bool WaitForCancel(const std::chrono::milliseconds timeout)
+ {
+ std::unique_lock lock(m_cancelMutex);
+ return m_cancelEnteredCv.wait_for(lock, timeout, [this]() { return m_cancelEntered; });
+ }
+
+ void ReleaseCancel()
+ {
+ {
+ std::lock_guard lock(m_cancelMutex);
+ m_cancelReleased = true;
+ }
+ m_cancelReleasedCv.notify_all();
+ }
+
+private:
+ std::mutex m_tasksMutex;
+ std::vector m_tasks;
+
+ std::mutex m_cancelMutex;
+ std::condition_variable m_cancelEnteredCv;
+ std::condition_variable m_cancelReleasedCv;
+ bool m_cancelEntered = false;
+ bool m_cancelReleased = false;
+};
+
+class RunningTaskDispatcher : public ITaskDispatcher
+{
+public:
+ ~RunningTaskDispatcher() override
+ {
+ std::lock_guard lock(m_tasksMutex);
+ for (auto* task : m_tasks)
+ {
+ delete task;
+ }
+ m_tasks.clear();
+ }
+
+ void Join() override
+ {
+ std::lock_guard lock(m_tasksMutex);
+ for (auto* task : m_tasks)
+ {
+ delete task;
+ }
+ m_tasks.clear();
+ }
+
+ void Queue(Task* task) override
+ {
+ std::lock_guard lock(m_tasksMutex);
+ m_tasks.push_back(task);
+ }
+
+ bool Cancel(Task* task, uint64_t waitTime = 0) override
+ {
+ UNREFERENCED_PARAMETER(task);
+ UNREFERENCED_PARAMETER(waitTime);
+ // Simulate a task that is currently executing on the worker:
+ // cancellation can never proceed without waiting for the run
+ // to complete, so a no-wait cancel must return false.
+ std::lock_guard lock(m_tasksMutex);
+ m_cancelCount++;
+ return false;
+ }
+
+ size_t QueuedCount() const
+ {
+ std::lock_guard lock(m_tasksMutex);
+ return m_tasks.size();
+ }
+
+ size_t CancelCount() const
+ {
+ std::lock_guard lock(m_tasksMutex);
+ return m_cancelCount;
+ }
+
+private:
+ mutable std::mutex m_tasksMutex;
+ std::vector m_tasks;
+ size_t m_cancelCount = 0;
+};
+
class TransmissionPolicyManagerTests : public StrictMock {
protected:
StrictMock runtimeConfigMock;
@@ -608,6 +753,73 @@ TEST_F(TransmissionPolicyManagerTests, cancelUploadTask_ScheduledUpload_IsUpload
ASSERT_FALSE(tpm.m_isUploadScheduled);
}
+TEST_F(TransmissionPolicyManagerTests, ForceScheduleRetainsImmediateUploadWhenCancelBlocks)
+{
+ BlockingCancelTaskDispatcher dispatcher;
+ TransmissionPolicyManager4Test blockingTpm(testing::getSystem(), dispatcher, &bandwidthControllerMock);
+ blockingTpm.paused(false);
+
+ blockingTpm.scheduleUploadParent(std::chrono::milliseconds{ 1000 }, EventLatency_Normal, false);
+ auto delayedUploadTime = blockingTpm.m_scheduledUploadTime;
+
+ auto forceSchedule = std::async(std::launch::async, [&blockingTpm]() {
+ blockingTpm.scheduleUploadParent(std::chrono::milliseconds{}, EventLatency_RealTime, true);
+ });
+
+ if (!dispatcher.WaitForCancel(std::chrono::milliseconds{ 250 }))
+ {
+ dispatcher.ReleaseCancel();
+ forceSchedule.get();
+ FAIL() << "Timed out waiting for cancel to block";
+ }
+
+ auto delayedSchedule = std::async(std::launch::async, [&blockingTpm]() {
+ blockingTpm.scheduleUploadParent(std::chrono::milliseconds{ 1000 }, EventLatency_Normal, false);
+ });
+
+ EXPECT_EQ(delayedSchedule.wait_for(std::chrono::milliseconds{ 100 }), std::future_status::timeout);
+
+ dispatcher.ReleaseCancel();
+
+ forceSchedule.get();
+ delayedSchedule.get();
+
+ ASSERT_TRUE(blockingTpm.m_isUploadScheduled);
+ EXPECT_LT(blockingTpm.m_scheduledUploadTime, delayedUploadTime);
+}
+
+TEST_F(TransmissionPolicyManagerTests, ForceScheduleAppliesLatencyWhenRunningCancelFails)
+{
+ RunningTaskDispatcher dispatcher;
+ TransmissionPolicyManager4Test runningTpm(testing::getSystem(), dispatcher, &bandwidthControllerMock);
+ runningTpm.paused(false);
+
+ // Queue an initial upload so m_scheduledUpload has a non-null task and
+ // m_isUploadScheduled is set; the dispatcher's Cancel will fail later
+ // (simulating the "task currently executing on worker" race).
+ runningTpm.scheduleUploadParent(std::chrono::milliseconds{ 1000 }, EventLatency_Normal, false);
+ ASSERT_TRUE(runningTpm.m_isUploadScheduled);
+ ASSERT_EQ(dispatcher.QueuedCount(), 1u);
+
+ auto scheduledTimeBefore = runningTpm.m_scheduledUploadTime;
+ // Reset m_runningLatency so we can observe the force path updating it
+ // (the initial schedule may have bumped it depending on the active
+ // profile's timers).
+ runningTpm.runningLatency(EventLatency_Normal);
+
+ // Force a higher-priority schedule. The dispatcher's no-wait cancel
+ // returns false, so the previous task remains in flight. The fix in
+ // scheduleUpload must propagate the new latency to m_runningLatency
+ // so the running task picks it up under the same mutex.
+ runningTpm.scheduleUploadParent(std::chrono::milliseconds{}, EventLatency_RealTime, true);
+
+ EXPECT_GE(dispatcher.CancelCount(), 1u);
+ EXPECT_EQ(dispatcher.QueuedCount(), 1u);
+ EXPECT_TRUE(runningTpm.m_isUploadScheduled);
+ EXPECT_EQ(runningTpm.m_runningLatency, EventLatency_RealTime);
+ EXPECT_EQ(runningTpm.m_scheduledUploadTime, scheduledTimeBefore);
+}
+
TEST_F(TransmissionPolicyManagerTests, increaseBackoff_EmptyBackoffObject_ReturnZero)
{
tpm.m_backoff = nullptr;