diff --git a/src/core/pool.c b/src/core/pool.c index cb622770..427eff59 100644 --- a/src/core/pool.c +++ b/src/core/pool.c @@ -43,6 +43,28 @@ typedef struct { uint32_t worker_id; /* 1-based (0 = main thread) */ } worker_ctx_t; +/* Claim the next ticket in the current window via a bounded CAS. Returns true + * and sets *out_idx when a ticket in [.., task_limit) was claimed; false when + * the window is exhausted. The CAS never advances task_claim past task_limit, + * so the cursor stays EXACT across dispatches (no fetch_add overshoot) — this + * is what lets the next dispatch carve its window off the high-water mark + * without a reset, closing the cross-dispatch claim race. The acquire on + * task_limit pairs with the publishing store-release so a claimer that sees + * the new window also sees the freshly-filled ring slots and `pending`. */ +static inline bool pool_claim(ray_pool_t* pool, uint64_t* out_idx) { + uint64_t cur = atomic_load_explicit(&pool->task_claim, memory_order_relaxed); + for (;;) { + uint64_t lim = atomic_load_explicit(&pool->task_limit, memory_order_acquire); + if (cur >= lim) return false; + if (atomic_compare_exchange_weak_explicit(&pool->task_claim, &cur, cur + 1, + memory_order_acq_rel, memory_order_relaxed)) { + *out_idx = cur; + return true; + } + /* CAS failed: `cur` now holds the current value — retry. */ + } +} + static void worker_loop(void* arg) { worker_ctx_t wctx = *(worker_ctx_t*)arg; ray_sys_free(arg); @@ -59,14 +81,9 @@ static void worker_loop(void* arg) { if (atomic_load_explicit(&pool->shutdown, memory_order_acquire)) break; - /* Claim and execute tasks until ring is drained */ - for (;;) { - uint32_t idx = atomic_fetch_add_explicit(&pool->task_tail, 1, - memory_order_acq_rel); - if (idx >= atomic_load_explicit(&pool->task_count, - memory_order_acquire)) - break; - + /* Claim and execute tasks until the window is drained */ + uint64_t idx; + while (pool_claim(pool, &idx)) { /* Skip execution if query was cancelled */ if (RAY_UNLIKELY(atomic_load_explicit(&pool->cancelled, memory_order_relaxed))) { @@ -102,8 +119,8 @@ ray_err_t ray_pool_create(ray_pool_t* pool, uint32_t n_workers) { * valid zero bit pattern on all supported platforms, but C11 requires * atomic_init for well-defined atomic semantics. */ atomic_init(&pool->shutdown, 0); - atomic_init(&pool->task_tail, 0); - atomic_init(&pool->task_count, 0); + atomic_init(&pool->task_claim, 0); + atomic_init(&pool->task_limit, 0); atomic_init(&pool->pending, 0); atomic_init(&pool->cancelled, 0); @@ -123,9 +140,8 @@ ray_err_t ray_pool_create(ray_pool_t* pool, uint32_t n_workers) { pool->tasks = (ray_pool_task_t*)ray_sys_alloc(pool->task_cap * sizeof(ray_pool_task_t)); if (!pool->tasks) return RAY_ERR_OOM; - pool->task_head = 0; - atomic_store_explicit(&pool->task_tail, 0, memory_order_relaxed); - atomic_store_explicit(&pool->task_count, 0, memory_order_relaxed); + atomic_store_explicit(&pool->task_claim, 0, memory_order_relaxed); + atomic_store_explicit(&pool->task_limit, 0, memory_order_relaxed); atomic_store_explicit(&pool->pending, 0, memory_order_relaxed); ray_err_t err = ray_sem_init(&pool->work_ready, 0); @@ -226,9 +242,11 @@ void ray_pool_dispatch(ray_pool_t* pool, ray_pool_fn fn, void* ctx, uint32_t n_tasks = (uint32_t)((total_elems + grain - 1) / grain); /* conc-L6: Ring growth is safe without synchronization because dispatch is - * single-producer: only the main thread (the dispatch caller) writes to - * task_head, tasks[], and task_cap. Workers only read via task_tail after - * the publish fence (task_count store-release). */ + * single-producer: only the main thread (the dispatch caller) writes + * tasks[] and task_cap, and only while the prior window is fully claimed + * (no task pending). Workers read tasks[]/task_cap only after acquiring + * the new task_limit (store-release), which orders the growth before any + * claim — so a late worker never reads a stale tasks pointer or task_cap. */ if (n_tasks > pool->task_cap) { uint32_t new_cap = pool->task_cap; while (new_cap < n_tasks && new_cap < MAX_RING_CAP) new_cap *= 2; @@ -248,23 +266,28 @@ void ray_pool_dispatch(ray_pool_t* pool, ray_pool_fn fn, void* ctx, grain = (total_elems + n_tasks - 1) / n_tasks; } - /* Fill task ring */ + /* Carve a fresh window [base, base+n_tasks) off the monotonic high-water + * mark. The prior dispatch is fully claimed, so task_claim == task_limit + * == base here; the window's slots never alias a still-live slot. */ + uint64_t base = atomic_load_explicit(&pool->task_limit, memory_order_relaxed); + + /* Fill task ring for tickets [base, base+n_tasks) */ for (uint32_t i = 0; i < n_tasks; i++) { int64_t start = (int64_t)i * grain; int64_t end = start + grain; if (end > total_elems) end = total_elems; - uint32_t slot = i & (pool->task_cap - 1); + uint32_t slot = (uint32_t)((base + i) & (pool->task_cap - 1)); pool->tasks[slot].fn = fn; pool->tasks[slot].ctx = ctx; pool->tasks[slot].start = start; pool->tasks[slot].end = end; } - pool->task_head = n_tasks; - atomic_store_explicit(&pool->task_count, n_tasks, memory_order_release); - atomic_store_explicit(&pool->task_tail, 0, memory_order_release); - atomic_store_explicit(&pool->pending, n_tasks, memory_order_release); + /* pending must be visible before the window opens; the task_limit + * store-release publishes the ring fill AND pending to claimers. */ + atomic_store_explicit(&pool->pending, n_tasks, memory_order_relaxed); + atomic_store_explicit(&pool->task_limit, base + n_tasks, memory_order_release); /* Mark parallel region: workers are about to run, cross-heap * freelist modification is unsafe until spin-wait completes. */ @@ -279,21 +302,20 @@ void ray_pool_dispatch(ray_pool_t* pool, ray_pool_fn fn, void* ctx, } /* Main thread participates as worker 0 */ - for (;;) { - uint32_t idx = atomic_fetch_add_explicit(&pool->task_tail, 1, - memory_order_acq_rel); - if (idx >= n_tasks) break; + { + uint64_t idx; + while (pool_claim(pool, &idx)) { + if (RAY_UNLIKELY(atomic_load_explicit(&pool->cancelled, + memory_order_relaxed))) { + atomic_fetch_sub_explicit(&pool->pending, 1, memory_order_acq_rel); + continue; + } + + ray_pool_task_t* t = &pool->tasks[idx & (pool->task_cap - 1)]; + t->fn(t->ctx, 0, t->start, t->end); - if (RAY_UNLIKELY(atomic_load_explicit(&pool->cancelled, - memory_order_relaxed))) { atomic_fetch_sub_explicit(&pool->pending, 1, memory_order_acq_rel); - continue; } - - ray_pool_task_t* t = &pool->tasks[idx & (pool->task_cap - 1)]; - t->fn(t->ctx, 0, t->start, t->end); - - atomic_fetch_sub_explicit(&pool->pending, 1, memory_order_acq_rel); } /* Spin-wait for workers to finish remaining tasks. @@ -346,19 +368,21 @@ void ray_pool_dispatch_n(ray_pool_t* pool, ray_pool_fn fn, void* ctx, /* Clamp n_tasks to task_cap to prevent ring overflow */ if (n_tasks > pool->task_cap) n_tasks = pool->task_cap; - /* Fill task ring: one task per partition */ + /* Carve a fresh window [base, base+n_tasks) off the monotonic high-water + * mark (the prior dispatch is fully claimed, so base == task_claim). */ + uint64_t base = atomic_load_explicit(&pool->task_limit, memory_order_relaxed); + + /* Fill task ring: one task per partition, tickets [base, base+n_tasks) */ for (uint32_t i = 0; i < n_tasks; i++) { - uint32_t slot = i & (pool->task_cap - 1); + uint32_t slot = (uint32_t)((base + i) & (pool->task_cap - 1)); pool->tasks[slot].fn = fn; pool->tasks[slot].ctx = ctx; pool->tasks[slot].start = (int64_t)i; pool->tasks[slot].end = (int64_t)i + 1; } - pool->task_head = n_tasks; - atomic_store_explicit(&pool->task_count, n_tasks, memory_order_release); - atomic_store_explicit(&pool->task_tail, 0, memory_order_release); - atomic_store_explicit(&pool->pending, n_tasks, memory_order_release); + atomic_store_explicit(&pool->pending, n_tasks, memory_order_relaxed); + atomic_store_explicit(&pool->task_limit, base + n_tasks, memory_order_release); atomic_store_explicit(&ray_parallel_flag, 1, memory_order_release); ray_rc_sync = true; @@ -369,21 +393,20 @@ void ray_pool_dispatch_n(ray_pool_t* pool, ray_pool_fn fn, void* ctx, } /* Main thread participates as worker 0 */ - for (;;) { - uint32_t idx = atomic_fetch_add_explicit(&pool->task_tail, 1, - memory_order_acq_rel); - if (idx >= n_tasks) break; + { + uint64_t idx; + while (pool_claim(pool, &idx)) { + if (RAY_UNLIKELY(atomic_load_explicit(&pool->cancelled, + memory_order_relaxed))) { + atomic_fetch_sub_explicit(&pool->pending, 1, memory_order_acq_rel); + continue; + } + + ray_pool_task_t* t = &pool->tasks[idx & (pool->task_cap - 1)]; + t->fn(t->ctx, 0, t->start, t->end); - if (RAY_UNLIKELY(atomic_load_explicit(&pool->cancelled, - memory_order_relaxed))) { atomic_fetch_sub_explicit(&pool->pending, 1, memory_order_acq_rel); - continue; } - - ray_pool_task_t* t = &pool->tasks[idx & (pool->task_cap - 1)]; - t->fn(t->ctx, 0, t->start, t->end); - - atomic_fetch_sub_explicit(&pool->pending, 1, memory_order_acq_rel); } /* Spin-wait for workers to finish remaining tasks */ diff --git a/src/core/pool.h b/src/core/pool.h index 32527554..6d860ddf 100644 --- a/src/core/pool.h +++ b/src/core/pool.h @@ -52,12 +52,21 @@ struct ray_pool { uint32_t n_workers; /* number of background threads (nproc - 1) */ _Atomic(uint32_t) shutdown; - /* SPMC task ring (single producer = main, multi consumer = workers + main) */ + /* SPMC task ring (single producer = main, multi consumer = workers + main). + * + * Claiming uses two MONOTONIC 64-bit cursors that are never reset, which + * eliminates the cross-dispatch reset race: a worker that wakes late on a + * surplus semaphore signal can no longer claim a half-republished slot, + * because there is no republish — each dispatch carves a fresh window + * [base, base+n) off the high-water mark. A ticket maps to ring slot + * (ticket & (task_cap-1)); consecutive dispatches' windows never alias a + * live slot because the prior window is fully claimed before the next + * publishes. Workers claim via a bounded CAS (never overshoot task_limit) + * so the cursor stays exact across dispatches; 64-bit so it never wraps. */ ray_pool_task_t* tasks; /* ring buffer [task_cap] */ uint32_t task_cap; /* power of 2 */ - uint32_t task_head; /* next to write (main only, no atomic needed) */ - _Atomic(uint32_t) task_tail; /* next to claim (workers, atomic_fetch_add) */ - _Atomic(uint32_t) task_count; /* total tasks submitted this dispatch */ + _Atomic(uint64_t) task_claim; /* next ticket to claim (bounded CAS) */ + _Atomic(uint64_t) task_limit; /* published end of current window */ /* Barrier */ _Atomic(uint32_t) pending; /* decremented by each task completion */ diff --git a/test/rfl/integration/joins.rfl b/test/rfl/integration/joins.rfl index 6f158bb2..7f03ac91 100644 --- a/test/rfl/integration/joins.rfl +++ b/test/rfl/integration/joins.rfl @@ -275,3 +275,18 @@ (set Lgf (table [ID Time Price] (list (.attr.set 'parted [1 1 2 2]) [10:00:03.000 10:00:01.000 10:00:01.000 10:00:02.000] [1.0 2.0 3.0 4.0]))) (set Rgf (table [ID Time Bid] (list (.attr.set 'parted [1 1 2 2]) [10:00:00.000 10:00:02.000 10:00:00.000 10:00:03.000] [9.0 9.5 8.0 8.5]))) (asof-join [ID Time] Lgf Rgf) -- (asof-join [ID Time] Lpf Rpf) + +;; ── Large radix-partitioned join (>65536 rows) under back-to-back dispatch ── +;; The right side (70000 rows) triggers the parallel radix scatter, which runs +;; two consecutive pool dispatches (histogram then scatter). A cross-dispatch +;; task-ring race in the pool once let a late worker claim a half-republished +;; slot, double-running/skipping a scatter task and dropping one partition +;; entry — so the join intermittently lost a row (count 7 vs 8). Repeating the +;; join keeps that dispatch path under pressure; the fixed pool must return the +;; exact same count every iteration. +(set RJbig (table [id val2] (list (til 70000) (til 70000)))) +(set LJsmall (table [id val] (list [0 100 500 1000 5000 10000 50000 69999] [1 2 3 4 5 6 7 8]))) +(count (inner-join [id] LJsmall RJbig)) -- 8 +(sum (at (inner-join [id] LJsmall RJbig) 'val)) -- 36 +;; 200 back-to-back radix joins — a single dropped row anywhere shows as < 1600. +(sum (map (fn [_] (count (inner-join [id] LJsmall RJbig))) (til 200))) -- 1600