Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 13 additions & 0 deletions doc/modules/ROOT/pages/4.guide/4c2.configuration.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,19 @@ a re-queue through the scheduler.
* *Single-threaded contexts*:
`unassisted_budget` caps the budget when only one thread is
running the event loop, preserving fairness.
* *Disable the fast path entirely*: set all three options to 0 to
force a re-queue on every completion (useful as a baseline or
when a workload is dominated by cross-thread work-stealing).

[NOTE]
====
When `io_context` is constructed with `concurrency_hint > 1` and all
three budget fields are at their defaults `(2, 16, 4)`, the
constructor overrides them to `(0, 0, 0)`. Multi-thread workloads
benefit from cross-thread work-stealing, which "post-everything"
mode enables. Setting any budget field to a non-default value
disables the override.
====

=== IOCP Timeout (`gqcs_timeout_ms`)

Expand Down
32 changes: 29 additions & 3 deletions include/boost/corosio/io_context.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,13 @@ struct io_context_options
After a posted handler executes, the reactor grants this
many speculative inline completions before forcing a
re-queue. Applies to reactor backends only.

@note Constructing an `io_context` with `concurrency_hint > 1`
and all three budget fields at their defaults overrides
them to disable inline completion (post-everything mode),
since multi-thread workloads benefit from cross-thread
work-stealing. Setting any budget field to a non-default
value disables the override.
*/
unsigned inline_budget_initial = 2;

Expand Down Expand Up @@ -112,6 +119,11 @@ struct io_context_options
- DNS resolution returns `operation_not_supported`.
- POSIX file I/O returns `operation_not_supported`.
- Signal sets should not be shared across contexts.

@note Constructing an `io_context` with `concurrency_hint == 1`
automatically enables single-threaded mode regardless of
this field's value, matching asio's convention. To opt out,
pass `concurrency_hint > 1`.
*/
bool single_threaded = false;
};
Expand Down Expand Up @@ -158,7 +170,12 @@ class BOOST_COROSIO_DECL io_context : public capy::execution_context
void apply_options_pre_(io_context_options const& opts);

/// Apply runtime tuning to the scheduler (after construct).
void apply_options_post_(io_context_options const& opts);
void apply_options_post_(
io_context_options const& opts,
unsigned concurrency_hint);

/// Switch the scheduler to single-threaded (lockless) mode.
void configure_single_threaded_();

protected:
detail::timer_service* timer_svc_ = nullptr;
Expand All @@ -168,7 +185,14 @@ class BOOST_COROSIO_DECL io_context : public capy::execution_context
/** The executor type for this context. */
class executor_type;

/** Construct with default concurrency and platform backend. */
/** Construct with default concurrency and platform backend.

Uses `std::thread::hardware_concurrency()` clamped to a minimum
of 2 as the concurrency hint, so the default constructor never
silently engages single-threaded mode (see
@ref io_context_options::single_threaded). Pass an explicit
`concurrency_hint == 1` to opt into single-threaded mode.
*/
io_context();

/** Construct with a concurrency hint and platform backend.
Expand Down Expand Up @@ -206,6 +230,8 @@ class BOOST_COROSIO_DECL io_context : public capy::execution_context
{
(void)backend;
sched_ = &Backend::construct(*this, concurrency_hint);
if (concurrency_hint == 1)
configure_single_threaded_();
}

/** Construct with an explicit backend tag and runtime options.
Expand All @@ -229,7 +255,7 @@ class BOOST_COROSIO_DECL io_context : public capy::execution_context
(void)backend;
apply_options_pre_(opts);
sched_ = &Backend::construct(*this, concurrency_hint);
apply_options_post_(opts);
apply_options_post_(opts, concurrency_hint);
}

~io_context();
Expand Down
16 changes: 11 additions & 5 deletions include/boost/corosio/native/detail/reactor/reactor_scheduler.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -413,10 +413,9 @@ reactor_scheduler::configure_reactor(
max_events > static_cast<unsigned>(std::numeric_limits<int>::max()))
throw std::out_of_range(
"max_events_per_poll must be in [1, INT_MAX]");
if (budget_max < 1 ||
budget_max > static_cast<unsigned>(std::numeric_limits<int>::max()))
if (budget_max > static_cast<unsigned>(std::numeric_limits<int>::max()))
throw std::out_of_range(
"inline_budget_max must be in [1, INT_MAX]");
"inline_budget_max must be in [0, INT_MAX]");

// Clamp initial and unassisted to budget_max.
if (budget_init > budget_max)
Expand All @@ -433,6 +432,10 @@ reactor_scheduler::configure_reactor(
inline void
reactor_scheduler::reset_inline_budget() const noexcept
{
// When budget is disabled (max==0), all paths below would no-op
// (inline_budget stays 0). Skip the TLS lookup entirely.
if (inline_budget_max_ == 0)
return;
if (auto* ctx = reactor_find_context(this))
{
// Cap when no other thread absorbed queued work
Expand All @@ -444,10 +447,11 @@ reactor_scheduler::reset_inline_budget() const noexcept
static_cast<int>(unassisted_budget_);
return;
}
// Ramp up when previous cycle fully consumed budget
// Ramp up when previous cycle fully consumed budget.
// max(1, ...) ensures the doubling escapes zero.
if (ctx->inline_budget == 0)
ctx->inline_budget_max = (std::min)(
ctx->inline_budget_max * 2,
(std::max)(1, ctx->inline_budget_max) * 2,
static_cast<int>(inline_budget_max_));
else if (ctx->inline_budget < ctx->inline_budget_max)
ctx->inline_budget_max =
Expand All @@ -459,6 +463,8 @@ reactor_scheduler::reset_inline_budget() const noexcept
inline bool
reactor_scheduler::try_consume_inline_budget() const noexcept
{
if (inline_budget_max_ == 0)
return false;
if (auto* ctx = reactor_find_context(this))
{
if (ctx->inline_budget > 0)
Expand Down
4 changes: 2 additions & 2 deletions perf/bench/corosio/accept_churn_bench.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -133,7 +133,7 @@ bench_sequential_churn_lockless(bench::state& state)

corosio::io_context_options opts;
opts.single_threaded = true;
corosio::native_io_context<Backend> ioc(opts);
corosio::native_io_context<Backend> ioc(opts, 1);
acceptor_type acc(ioc);
acc.open();
acc.set_option(corosio::native_socket_option::reuse_address(true));
Expand Down Expand Up @@ -398,7 +398,7 @@ bench_burst_churn_lockless(bench::state& state)

corosio::io_context_options opts;
opts.single_threaded = true;
corosio::native_io_context<Backend> ioc(opts);
corosio::native_io_context<Backend> ioc(opts, 1);
acceptor_type acc(ioc);
acc.open();
acc.set_option(corosio::native_socket_option::reuse_address(true));
Expand Down
6 changes: 3 additions & 3 deletions perf/bench/corosio/fan_out_bench.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -337,7 +337,7 @@ bench_fork_join_lockless(bench::state& state)

corosio::io_context_options opts;
opts.single_threaded = true;
corosio::native_io_context<Backend> ioc(opts);
corosio::native_io_context<Backend> ioc(opts, 1);

std::vector<socket_type> clients;
std::vector<socket_type> servers;
Expand Down Expand Up @@ -414,7 +414,7 @@ bench_nested_lockless(bench::state& state)

corosio::io_context_options opts;
opts.single_threaded = true;
corosio::native_io_context<Backend> ioc(opts);
corosio::native_io_context<Backend> ioc(opts, 1);

std::vector<socket_type> clients;
std::vector<socket_type> servers;
Expand Down Expand Up @@ -509,7 +509,7 @@ bench_concurrent_parents_lockless(bench::state& state)

corosio::io_context_options opts;
opts.single_threaded = true;
corosio::native_io_context<Backend> ioc(opts);
corosio::native_io_context<Backend> ioc(opts, 1);

std::vector<socket_type> clients;
std::vector<socket_type> servers;
Expand Down
2 changes: 1 addition & 1 deletion perf/bench/corosio/http_server_bench.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -156,7 +156,7 @@ bench_single_connection_lockless(bench::state& state)

corosio::io_context_options opts;
opts.single_threaded = true;
corosio::native_io_context<Backend> ioc(opts);
corosio::native_io_context<Backend> ioc(opts, 1);
auto [client, server] = corosio::test::make_socket_pair<
socket_type, corosio::native_tcp_acceptor<Backend>>(ioc);

Expand Down
4 changes: 2 additions & 2 deletions perf/bench/corosio/io_context_bench.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -265,7 +265,7 @@ bench_single_threaded_lockless(bench::state& state)
corosio::io_context_options opts;
opts.single_threaded = true;

corosio::native_io_context<Backend> ioc(opts);
corosio::native_io_context<Backend> ioc(opts, 1);
auto ex = ioc.get_executor();
int64_t counter = 0;
int constexpr batch_size = 1000;
Expand Down Expand Up @@ -299,7 +299,7 @@ bench_interleaved_lockless(bench::state& state)

int handlers_per_iteration = 100;

corosio::native_io_context<Backend> ioc(opts);
corosio::native_io_context<Backend> ioc(opts, 1);
auto ex = ioc.get_executor();
int64_t counter = 0;

Expand Down
4 changes: 2 additions & 2 deletions perf/bench/corosio/local_socket_latency_bench.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -155,7 +155,7 @@ bench_unix_pingpong_latency_lockless(bench::state& state)

corosio::io_context_options opts;
opts.single_threaded = true;
corosio::native_io_context<Backend> ioc(opts);
corosio::native_io_context<Backend> ioc(opts, 1);
auto [client, server] = corosio::make_local_stream_pair(ioc);

capy::run_async(ioc.get_executor())(
Expand Down Expand Up @@ -185,7 +185,7 @@ bench_unix_concurrent_latency_lockless(bench::state& state)

corosio::io_context_options opts;
opts.single_threaded = true;
corosio::native_io_context<Backend> ioc(opts);
corosio::native_io_context<Backend> ioc(opts, 1);

std::vector<corosio::local_stream_socket> clients;
std::vector<corosio::local_stream_socket> servers;
Expand Down
4 changes: 2 additions & 2 deletions perf/bench/corosio/local_socket_throughput_bench.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -183,7 +183,7 @@ bench_unix_throughput_lockless(bench::state& state)

corosio::io_context_options opts;
opts.single_threaded = true;
corosio::native_io_context<Backend> ioc(opts);
corosio::native_io_context<Backend> ioc(opts, 1);
auto [writer, reader] = corosio::make_local_stream_pair(ioc);

std::vector<char> write_buf(chunk_size, 'x');
Expand Down Expand Up @@ -243,7 +243,7 @@ bench_unix_bidirectional_throughput_lockless(bench::state& state)

corosio::io_context_options opts;
opts.single_threaded = true;
corosio::native_io_context<Backend> ioc(opts);
corosio::native_io_context<Backend> ioc(opts, 1);
auto [sock1, sock2] = corosio::make_local_stream_pair(ioc);

std::vector<char> buf1(chunk_size, 'a');
Expand Down
4 changes: 2 additions & 2 deletions perf/bench/corosio/socket_latency_bench.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -168,7 +168,7 @@ bench_pingpong_latency_lockless(bench::state& state)

corosio::io_context_options opts;
opts.single_threaded = true;
corosio::native_io_context<Backend> ioc(opts);
corosio::native_io_context<Backend> ioc(opts, 1);
auto [client, server] = corosio::test::make_socket_pair<
socket_type, corosio::native_tcp_acceptor<Backend>>(ioc);

Expand Down Expand Up @@ -204,7 +204,7 @@ bench_concurrent_latency_lockless(bench::state& state)

corosio::io_context_options opts;
opts.single_threaded = true;
corosio::native_io_context<Backend> ioc(opts);
corosio::native_io_context<Backend> ioc(opts, 1);

std::vector<socket_type> clients;
std::vector<socket_type> servers;
Expand Down
4 changes: 2 additions & 2 deletions perf/bench/corosio/socket_throughput_bench.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -255,7 +255,7 @@ bench_throughput_lockless(bench::state& state)

corosio::io_context_options opts;
opts.single_threaded = true;
corosio::native_io_context<Backend> ioc(opts);
corosio::native_io_context<Backend> ioc(opts, 1);
auto [writer, reader] = corosio::test::make_socket_pair<
socket_type, corosio::native_tcp_acceptor<Backend>>(ioc);

Expand Down Expand Up @@ -321,7 +321,7 @@ bench_bidirectional_throughput_lockless(bench::state& state)

corosio::io_context_options opts;
opts.single_threaded = true;
corosio::native_io_context<Backend> ioc(opts);
corosio::native_io_context<Backend> ioc(opts, 1);
auto [sock1, sock2] = corosio::test::make_socket_pair<
socket_type, corosio::native_tcp_acceptor<Backend>>(ioc);

Expand Down
4 changes: 2 additions & 2 deletions perf/bench/corosio/timer_bench.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,7 @@ bench_schedule_cancel_lockless(bench::state& state)

corosio::io_context_options opts;
opts.single_threaded = true;
corosio::native_io_context<Backend> ioc(opts);
corosio::native_io_context<Backend> ioc(opts, 1);
int64_t counter = 0;
int constexpr batch_size = 1000;

Expand Down Expand Up @@ -148,7 +148,7 @@ bench_fire_rate_lockless(bench::state& state)

corosio::io_context_options opts;
opts.single_threaded = true;
corosio::native_io_context<Backend> ioc(opts);
corosio::native_io_context<Backend> ioc(opts, 1);
std::atomic<bool> running{true};
int64_t counter = 0;

Expand Down
Loading
Loading