Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
127 changes: 75 additions & 52 deletions src/core/pool.c
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,28 @@ typedef struct {
uint32_t worker_id; /* 1-based (0 = main thread) */
} worker_ctx_t;

/* Try to take one ticket from the currently published window.
 *
 * Returns true and stores the claimed ticket in *out_idx on success. Returns
 * false, leaving *out_idx untouched, once task_claim has caught up with
 * task_limit (window exhausted).
 *
 * The claim is a compare-exchange from `ticket` to `ticket + 1` that is only
 * attempted while `ticket` is below the limit just observed, so task_claim is
 * never pushed past task_limit (unlike an unconditional fetch_add, which
 * overshoots). Keeping the cursor exact is what allows the next dispatch to
 * start its window at the high-water mark without resetting anything.
 *
 * task_limit is loaded with acquire ordering before every attempt; this pairs
 * with the dispatcher's store-release on task_limit, so a claimer that
 * observes a new window also observes the ring slots and `pending` written
 * before that store. */
static inline bool pool_claim(ray_pool_t* pool, uint64_t* out_idx) {
    uint64_t ticket = atomic_load_explicit(&pool->task_claim, memory_order_relaxed);

    while (ticket < atomic_load_explicit(&pool->task_limit, memory_order_acquire)) {
        uint64_t next = ticket + 1;
        if (atomic_compare_exchange_weak_explicit(&pool->task_claim, &ticket, next,
                                                  memory_order_acq_rel,
                                                  memory_order_relaxed)) {
            *out_idx = ticket;
            return true;
        }
        /* Exchange failed (possibly spuriously): `ticket` was overwritten with
         * the value currently in task_claim, so re-check it against a freshly
         * loaded limit and try again. */
    }

    return false;
}

static void worker_loop(void* arg) {
worker_ctx_t wctx = *(worker_ctx_t*)arg;
ray_sys_free(arg);
Expand All @@ -59,14 +81,9 @@ static void worker_loop(void* arg) {
if (atomic_load_explicit(&pool->shutdown, memory_order_acquire))
break;

/* Claim and execute tasks until ring is drained */
for (;;) {
uint32_t idx = atomic_fetch_add_explicit(&pool->task_tail, 1,
memory_order_acq_rel);
if (idx >= atomic_load_explicit(&pool->task_count,
memory_order_acquire))
break;

/* Claim and execute tasks until the window is drained */
uint64_t idx;
while (pool_claim(pool, &idx)) {
/* Skip execution if query was cancelled */
if (RAY_UNLIKELY(atomic_load_explicit(&pool->cancelled,
memory_order_relaxed))) {
Expand Down Expand Up @@ -102,8 +119,8 @@ ray_err_t ray_pool_create(ray_pool_t* pool, uint32_t n_workers) {
* valid zero bit pattern on all supported platforms, but C11 requires
* atomic_init for well-defined atomic semantics. */
atomic_init(&pool->shutdown, 0);
atomic_init(&pool->task_tail, 0);
atomic_init(&pool->task_count, 0);
atomic_init(&pool->task_claim, 0);
atomic_init(&pool->task_limit, 0);
atomic_init(&pool->pending, 0);
atomic_init(&pool->cancelled, 0);

Expand All @@ -123,9 +140,8 @@ ray_err_t ray_pool_create(ray_pool_t* pool, uint32_t n_workers) {
pool->tasks = (ray_pool_task_t*)ray_sys_alloc(pool->task_cap * sizeof(ray_pool_task_t));
if (!pool->tasks) return RAY_ERR_OOM;

pool->task_head = 0;
atomic_store_explicit(&pool->task_tail, 0, memory_order_relaxed);
atomic_store_explicit(&pool->task_count, 0, memory_order_relaxed);
atomic_store_explicit(&pool->task_claim, 0, memory_order_relaxed);
atomic_store_explicit(&pool->task_limit, 0, memory_order_relaxed);
atomic_store_explicit(&pool->pending, 0, memory_order_relaxed);

ray_err_t err = ray_sem_init(&pool->work_ready, 0);
Expand Down Expand Up @@ -226,9 +242,11 @@ void ray_pool_dispatch(ray_pool_t* pool, ray_pool_fn fn, void* ctx,
uint32_t n_tasks = (uint32_t)((total_elems + grain - 1) / grain);

/* conc-L6: Ring growth is safe without synchronization because dispatch is
* single-producer: only the main thread (the dispatch caller) writes to
* task_head, tasks[], and task_cap. Workers only read via task_tail after
* the publish fence (task_count store-release). */
* single-producer: only the main thread (the dispatch caller) writes
* tasks[] and task_cap, and only while the prior window is fully claimed
* (no task pending). Workers read tasks[]/task_cap only after acquiring
* the new task_limit (store-release), which orders the growth before any
* claim — so a late worker never reads a stale tasks pointer or task_cap. */
if (n_tasks > pool->task_cap) {
uint32_t new_cap = pool->task_cap;
while (new_cap < n_tasks && new_cap < MAX_RING_CAP) new_cap *= 2;
Expand All @@ -248,23 +266,28 @@ void ray_pool_dispatch(ray_pool_t* pool, ray_pool_fn fn, void* ctx,
grain = (total_elems + n_tasks - 1) / n_tasks;
}

/* Fill task ring */
/* Carve a fresh window [base, base+n_tasks) off the monotonic high-water
* mark. The prior dispatch is fully claimed, so task_claim == task_limit
* == base here; the window's slots never alias a still-live slot. */
uint64_t base = atomic_load_explicit(&pool->task_limit, memory_order_relaxed);

/* Fill task ring for tickets [base, base+n_tasks) */
for (uint32_t i = 0; i < n_tasks; i++) {
int64_t start = (int64_t)i * grain;
int64_t end = start + grain;
if (end > total_elems) end = total_elems;

uint32_t slot = i & (pool->task_cap - 1);
uint32_t slot = (uint32_t)((base + i) & (pool->task_cap - 1));
pool->tasks[slot].fn = fn;
pool->tasks[slot].ctx = ctx;
pool->tasks[slot].start = start;
pool->tasks[slot].end = end;
}

pool->task_head = n_tasks;
atomic_store_explicit(&pool->task_count, n_tasks, memory_order_release);
atomic_store_explicit(&pool->task_tail, 0, memory_order_release);
atomic_store_explicit(&pool->pending, n_tasks, memory_order_release);
/* pending must be visible before the window opens; the task_limit
* store-release publishes the ring fill AND pending to claimers. */
atomic_store_explicit(&pool->pending, n_tasks, memory_order_relaxed);
atomic_store_explicit(&pool->task_limit, base + n_tasks, memory_order_release);

/* Mark parallel region: workers are about to run, cross-heap
* freelist modification is unsafe until spin-wait completes. */
Expand All @@ -279,21 +302,20 @@ void ray_pool_dispatch(ray_pool_t* pool, ray_pool_fn fn, void* ctx,
}

/* Main thread participates as worker 0 */
for (;;) {
uint32_t idx = atomic_fetch_add_explicit(&pool->task_tail, 1,
memory_order_acq_rel);
if (idx >= n_tasks) break;
{
uint64_t idx;
while (pool_claim(pool, &idx)) {
if (RAY_UNLIKELY(atomic_load_explicit(&pool->cancelled,
memory_order_relaxed))) {
atomic_fetch_sub_explicit(&pool->pending, 1, memory_order_acq_rel);
continue;
}

ray_pool_task_t* t = &pool->tasks[idx & (pool->task_cap - 1)];
t->fn(t->ctx, 0, t->start, t->end);

if (RAY_UNLIKELY(atomic_load_explicit(&pool->cancelled,
memory_order_relaxed))) {
atomic_fetch_sub_explicit(&pool->pending, 1, memory_order_acq_rel);
continue;
}

ray_pool_task_t* t = &pool->tasks[idx & (pool->task_cap - 1)];
t->fn(t->ctx, 0, t->start, t->end);

atomic_fetch_sub_explicit(&pool->pending, 1, memory_order_acq_rel);
}

/* Spin-wait for workers to finish remaining tasks.
Expand Down Expand Up @@ -346,19 +368,21 @@ void ray_pool_dispatch_n(ray_pool_t* pool, ray_pool_fn fn, void* ctx,
/* Clamp n_tasks to task_cap to prevent ring overflow */
if (n_tasks > pool->task_cap) n_tasks = pool->task_cap;

/* Fill task ring: one task per partition */
/* Carve a fresh window [base, base+n_tasks) off the monotonic high-water
* mark (the prior dispatch is fully claimed, so base == task_claim). */
uint64_t base = atomic_load_explicit(&pool->task_limit, memory_order_relaxed);

/* Fill task ring: one task per partition, tickets [base, base+n_tasks) */
for (uint32_t i = 0; i < n_tasks; i++) {
uint32_t slot = i & (pool->task_cap - 1);
uint32_t slot = (uint32_t)((base + i) & (pool->task_cap - 1));
pool->tasks[slot].fn = fn;
pool->tasks[slot].ctx = ctx;
pool->tasks[slot].start = (int64_t)i;
pool->tasks[slot].end = (int64_t)i + 1;
}

pool->task_head = n_tasks;
atomic_store_explicit(&pool->task_count, n_tasks, memory_order_release);
atomic_store_explicit(&pool->task_tail, 0, memory_order_release);
atomic_store_explicit(&pool->pending, n_tasks, memory_order_release);
atomic_store_explicit(&pool->pending, n_tasks, memory_order_relaxed);
atomic_store_explicit(&pool->task_limit, base + n_tasks, memory_order_release);

atomic_store_explicit(&ray_parallel_flag, 1, memory_order_release);
ray_rc_sync = true;
Expand All @@ -369,21 +393,20 @@ void ray_pool_dispatch_n(ray_pool_t* pool, ray_pool_fn fn, void* ctx,
}

/* Main thread participates as worker 0 */
for (;;) {
uint32_t idx = atomic_fetch_add_explicit(&pool->task_tail, 1,
memory_order_acq_rel);
if (idx >= n_tasks) break;
{
uint64_t idx;
while (pool_claim(pool, &idx)) {
if (RAY_UNLIKELY(atomic_load_explicit(&pool->cancelled,
memory_order_relaxed))) {
atomic_fetch_sub_explicit(&pool->pending, 1, memory_order_acq_rel);
continue;
}

ray_pool_task_t* t = &pool->tasks[idx & (pool->task_cap - 1)];
t->fn(t->ctx, 0, t->start, t->end);

if (RAY_UNLIKELY(atomic_load_explicit(&pool->cancelled,
memory_order_relaxed))) {
atomic_fetch_sub_explicit(&pool->pending, 1, memory_order_acq_rel);
continue;
}

ray_pool_task_t* t = &pool->tasks[idx & (pool->task_cap - 1)];
t->fn(t->ctx, 0, t->start, t->end);

atomic_fetch_sub_explicit(&pool->pending, 1, memory_order_acq_rel);
}

/* Spin-wait for workers to finish remaining tasks */
Expand Down
17 changes: 13 additions & 4 deletions src/core/pool.h
Original file line number Diff line number Diff line change
Expand Up @@ -52,12 +52,21 @@ struct ray_pool {
uint32_t n_workers; /* number of background threads (nproc - 1) */
_Atomic(uint32_t) shutdown;

/* SPMC task ring (single producer = main, multi consumer = workers + main) */
/* SPMC task ring (single producer = main, multi consumer = workers + main).
*
* Claiming uses two MONOTONIC 64-bit cursors that are never reset, which
* eliminates the cross-dispatch reset race: a worker that wakes late on a
* surplus semaphore signal can no longer claim a half-republished slot,
* because there is no republish — each dispatch carves a fresh window
* [base, base+n) off the high-water mark. A ticket maps to ring slot
* (ticket & (task_cap-1)); consecutive dispatches' windows never alias a
* live slot because the prior window is fully claimed before the next
* publishes. Workers claim via a bounded CAS (never overshoot task_limit)
* so the cursor stays exact across dispatches; 64-bit so it never wraps. */
ray_pool_task_t* tasks; /* ring buffer [task_cap] */
uint32_t task_cap; /* power of 2 */
uint32_t task_head; /* next to write (main only, no atomic needed) */
_Atomic(uint32_t) task_tail; /* next to claim (workers, atomic_fetch_add) */
_Atomic(uint32_t) task_count; /* total tasks submitted this dispatch */
_Atomic(uint64_t) task_claim; /* next ticket to claim (bounded CAS) */
_Atomic(uint64_t) task_limit; /* published end of current window */

/* Barrier */
_Atomic(uint32_t) pending; /* decremented by each task completion */
Expand Down
15 changes: 15 additions & 0 deletions test/rfl/integration/joins.rfl
Original file line number Diff line number Diff line change
Expand Up @@ -275,3 +275,18 @@
(set Lgf (table [ID Time Price] (list (.attr.set 'parted [1 1 2 2]) [10:00:03.000 10:00:01.000 10:00:01.000 10:00:02.000] [1.0 2.0 3.0 4.0])))
(set Rgf (table [ID Time Bid] (list (.attr.set 'parted [1 1 2 2]) [10:00:00.000 10:00:02.000 10:00:00.000 10:00:03.000] [9.0 9.5 8.0 8.5])))
(asof-join [ID Time] Lgf Rgf) -- (asof-join [ID Time] Lpf Rpf)

;; ── Large radix-partitioned join (>65536 rows) under back-to-back dispatch ──
;; The right side (70000 rows) triggers the parallel radix scatter, which runs
;; two consecutive pool dispatches (histogram then scatter). A cross-dispatch
;; task-ring race in the pool once let a late worker claim a half-republished
;; slot, double-running/skipping a scatter task and dropping one partition
;; entry — so the join intermittently lost a row (count 7 vs 8). Repeating the
;; join keeps that dispatch path under pressure; the fixed pool must return the
;; exact same count every iteration.
(set RJbig (table [id val2] (list (til 70000) (til 70000))))
(set LJsmall (table [id val] (list [0 100 500 1000 5000 10000 50000 69999] [1 2 3 4 5 6 7 8])))
(count (inner-join [id] LJsmall RJbig)) -- 8
(sum (at (inner-join [id] LJsmall RJbig) 'val)) -- 36
;; 200 back-to-back radix joins — a single dropped row anywhere shows as < 1600.
(sum (map (fn [_] (count (inner-join [id] LJsmall RJbig))) (til 200))) -- 1600
Loading