diff --git a/doc/modules/ROOT/pages/4.guide/4c2.configuration.adoc b/doc/modules/ROOT/pages/4.guide/4c2.configuration.adoc index bf6708f2..19c95c71 100644 --- a/doc/modules/ROOT/pages/4.guide/4c2.configuration.adoc +++ b/doc/modules/ROOT/pages/4.guide/4c2.configuration.adoc @@ -110,6 +110,19 @@ a re-queue through the scheduler. * *Single-threaded contexts*: `unassisted_budget` caps the budget when only one thread is running the event loop, preserving fairness. +* *Disable the fast path entirely*: set all three options to 0 to + force a re-queue on every completion (useful as a baseline or + when a workload is dominated by cross-thread work-stealing). + +[NOTE] +==== +When `io_context` is constructed with `concurrency_hint > 1` and all +three budget fields are at their defaults `(2, 16, 4)`, the +constructor overrides them to `(0, 0, 0)`. Multi-thread workloads +benefit from cross-thread work-stealing, which "post-everything" +mode enables. Setting any budget field to a non-default value +disables the override. +==== === IOCP Timeout (`gqcs_timeout_ms`) diff --git a/include/boost/corosio/io_context.hpp b/include/boost/corosio/io_context.hpp index 9671c1e9..2c3d79aa 100644 --- a/include/boost/corosio/io_context.hpp +++ b/include/boost/corosio/io_context.hpp @@ -64,6 +64,13 @@ struct io_context_options After a posted handler executes, the reactor grants this many speculative inline completions before forcing a re-queue. Applies to reactor backends only. + + @note Constructing an `io_context` with `concurrency_hint > 1` + and all three budget fields at their defaults overrides + them to disable inline completion (post-everything mode), + since multi-thread workloads benefit from cross-thread + work-stealing. Setting any budget field to a non-default + value disables the override. */ unsigned inline_budget_initial = 2; @@ -112,6 +119,11 @@ struct io_context_options - DNS resolution returns `operation_not_supported`. - POSIX file I/O returns `operation_not_supported`. 
- Signal sets should not be shared across contexts. + + @note Constructing an `io_context` with `concurrency_hint == 1` + automatically enables single-threaded mode regardless of + this field's value, matching asio's convention. To opt out, + pass `concurrency_hint > 1`. */ bool single_threaded = false; }; @@ -158,7 +170,12 @@ class BOOST_COROSIO_DECL io_context : public capy::execution_context void apply_options_pre_(io_context_options const& opts); /// Apply runtime tuning to the scheduler (after construct). - void apply_options_post_(io_context_options const& opts); + void apply_options_post_( + io_context_options const& opts, + unsigned concurrency_hint); + + /// Switch the scheduler to single-threaded (lockless) mode. + void configure_single_threaded_(); protected: detail::timer_service* timer_svc_ = nullptr; @@ -168,7 +185,14 @@ class BOOST_COROSIO_DECL io_context : public capy::execution_context /** The executor type for this context. */ class executor_type; - /** Construct with default concurrency and platform backend. */ + /** Construct with default concurrency and platform backend. + + Uses `std::thread::hardware_concurrency()` clamped to a minimum + of 2 as the concurrency hint, so the default constructor never + silently engages single-threaded mode (see + @ref io_context_options::single_threaded). Pass an explicit + `concurrency_hint == 1` to opt into single-threaded mode. + */ io_context(); /** Construct with a concurrency hint and platform backend. @@ -206,6 +230,8 @@ class BOOST_COROSIO_DECL io_context : public capy::execution_context { (void)backend; sched_ = &Backend::construct(*this, concurrency_hint); + if (concurrency_hint == 1) + configure_single_threaded_(); } /** Construct with an explicit backend tag and runtime options. 
@@ -229,7 +255,7 @@ class BOOST_COROSIO_DECL io_context : public capy::execution_context (void)backend; apply_options_pre_(opts); sched_ = &Backend::construct(*this, concurrency_hint); - apply_options_post_(opts); + apply_options_post_(opts, concurrency_hint); } ~io_context(); diff --git a/include/boost/corosio/native/detail/reactor/reactor_scheduler.hpp b/include/boost/corosio/native/detail/reactor/reactor_scheduler.hpp index 967630bd..d281af5d 100644 --- a/include/boost/corosio/native/detail/reactor/reactor_scheduler.hpp +++ b/include/boost/corosio/native/detail/reactor/reactor_scheduler.hpp @@ -413,10 +413,9 @@ reactor_scheduler::configure_reactor( max_events > static_cast(std::numeric_limits::max())) throw std::out_of_range( "max_events_per_poll must be in [1, INT_MAX]"); - if (budget_max < 1 || - budget_max > static_cast(std::numeric_limits::max())) + if (budget_max > static_cast(std::numeric_limits::max())) throw std::out_of_range( - "inline_budget_max must be in [1, INT_MAX]"); + "inline_budget_max must be in [0, INT_MAX]"); // Clamp initial and unassisted to budget_max. if (budget_init > budget_max) @@ -433,6 +432,10 @@ reactor_scheduler::configure_reactor( inline void reactor_scheduler::reset_inline_budget() const noexcept { + // When budget is disabled (max==0), all paths below would no-op + // (inline_budget stays 0). Skip the TLS lookup entirely. + if (inline_budget_max_ == 0) + return; if (auto* ctx = reactor_find_context(this)) { // Cap when no other thread absorbed queued work @@ -444,10 +447,11 @@ reactor_scheduler::reset_inline_budget() const noexcept static_cast(unassisted_budget_); return; } - // Ramp up when previous cycle fully consumed budget + // Ramp up when previous cycle fully consumed budget. + // max(1, ...) ensures the doubling escapes zero. 
if (ctx->inline_budget == 0) ctx->inline_budget_max = (std::min)( - ctx->inline_budget_max * 2, + (std::max)(1, ctx->inline_budget_max) * 2, static_cast(inline_budget_max_)); else if (ctx->inline_budget < ctx->inline_budget_max) ctx->inline_budget_max = @@ -459,6 +463,8 @@ reactor_scheduler::reset_inline_budget() const noexcept inline bool reactor_scheduler::try_consume_inline_budget() const noexcept { + if (inline_budget_max_ == 0) + return false; if (auto* ctx = reactor_find_context(this)) { if (ctx->inline_budget > 0) diff --git a/perf/bench/corosio/accept_churn_bench.cpp b/perf/bench/corosio/accept_churn_bench.cpp index a43efd46..13003b46 100644 --- a/perf/bench/corosio/accept_churn_bench.cpp +++ b/perf/bench/corosio/accept_churn_bench.cpp @@ -133,7 +133,7 @@ bench_sequential_churn_lockless(bench::state& state) corosio::io_context_options opts; opts.single_threaded = true; - corosio::native_io_context ioc(opts); + corosio::native_io_context ioc(opts, 1); acceptor_type acc(ioc); acc.open(); acc.set_option(corosio::native_socket_option::reuse_address(true)); @@ -398,7 +398,7 @@ bench_burst_churn_lockless(bench::state& state) corosio::io_context_options opts; opts.single_threaded = true; - corosio::native_io_context ioc(opts); + corosio::native_io_context ioc(opts, 1); acceptor_type acc(ioc); acc.open(); acc.set_option(corosio::native_socket_option::reuse_address(true)); diff --git a/perf/bench/corosio/fan_out_bench.cpp b/perf/bench/corosio/fan_out_bench.cpp index a18c754b..fc189909 100644 --- a/perf/bench/corosio/fan_out_bench.cpp +++ b/perf/bench/corosio/fan_out_bench.cpp @@ -337,7 +337,7 @@ bench_fork_join_lockless(bench::state& state) corosio::io_context_options opts; opts.single_threaded = true; - corosio::native_io_context ioc(opts); + corosio::native_io_context ioc(opts, 1); std::vector clients; std::vector servers; @@ -414,7 +414,7 @@ bench_nested_lockless(bench::state& state) corosio::io_context_options opts; opts.single_threaded = true; - 
corosio::native_io_context ioc(opts); + corosio::native_io_context ioc(opts, 1); std::vector clients; std::vector servers; @@ -509,7 +509,7 @@ bench_concurrent_parents_lockless(bench::state& state) corosio::io_context_options opts; opts.single_threaded = true; - corosio::native_io_context ioc(opts); + corosio::native_io_context ioc(opts, 1); std::vector clients; std::vector servers; diff --git a/perf/bench/corosio/http_server_bench.cpp b/perf/bench/corosio/http_server_bench.cpp index a7db9050..bca9b13c 100644 --- a/perf/bench/corosio/http_server_bench.cpp +++ b/perf/bench/corosio/http_server_bench.cpp @@ -156,7 +156,7 @@ bench_single_connection_lockless(bench::state& state) corosio::io_context_options opts; opts.single_threaded = true; - corosio::native_io_context ioc(opts); + corosio::native_io_context ioc(opts, 1); auto [client, server] = corosio::test::make_socket_pair< socket_type, corosio::native_tcp_acceptor>(ioc); diff --git a/perf/bench/corosio/io_context_bench.cpp b/perf/bench/corosio/io_context_bench.cpp index 1a39cf4d..c99d4dab 100644 --- a/perf/bench/corosio/io_context_bench.cpp +++ b/perf/bench/corosio/io_context_bench.cpp @@ -265,7 +265,7 @@ bench_single_threaded_lockless(bench::state& state) corosio::io_context_options opts; opts.single_threaded = true; - corosio::native_io_context ioc(opts); + corosio::native_io_context ioc(opts, 1); auto ex = ioc.get_executor(); int64_t counter = 0; int constexpr batch_size = 1000; @@ -299,7 +299,7 @@ bench_interleaved_lockless(bench::state& state) int handlers_per_iteration = 100; - corosio::native_io_context ioc(opts); + corosio::native_io_context ioc(opts, 1); auto ex = ioc.get_executor(); int64_t counter = 0; diff --git a/perf/bench/corosio/local_socket_latency_bench.cpp b/perf/bench/corosio/local_socket_latency_bench.cpp index cc99561a..8b548ddd 100644 --- a/perf/bench/corosio/local_socket_latency_bench.cpp +++ b/perf/bench/corosio/local_socket_latency_bench.cpp @@ -155,7 +155,7 @@ 
bench_unix_pingpong_latency_lockless(bench::state& state) corosio::io_context_options opts; opts.single_threaded = true; - corosio::native_io_context ioc(opts); + corosio::native_io_context ioc(opts, 1); auto [client, server] = corosio::make_local_stream_pair(ioc); capy::run_async(ioc.get_executor())( @@ -185,7 +185,7 @@ bench_unix_concurrent_latency_lockless(bench::state& state) corosio::io_context_options opts; opts.single_threaded = true; - corosio::native_io_context ioc(opts); + corosio::native_io_context ioc(opts, 1); std::vector clients; std::vector servers; diff --git a/perf/bench/corosio/local_socket_throughput_bench.cpp b/perf/bench/corosio/local_socket_throughput_bench.cpp index ab9d6098..f57555bd 100644 --- a/perf/bench/corosio/local_socket_throughput_bench.cpp +++ b/perf/bench/corosio/local_socket_throughput_bench.cpp @@ -183,7 +183,7 @@ bench_unix_throughput_lockless(bench::state& state) corosio::io_context_options opts; opts.single_threaded = true; - corosio::native_io_context ioc(opts); + corosio::native_io_context ioc(opts, 1); auto [writer, reader] = corosio::make_local_stream_pair(ioc); std::vector write_buf(chunk_size, 'x'); @@ -243,7 +243,7 @@ bench_unix_bidirectional_throughput_lockless(bench::state& state) corosio::io_context_options opts; opts.single_threaded = true; - corosio::native_io_context ioc(opts); + corosio::native_io_context ioc(opts, 1); auto [sock1, sock2] = corosio::make_local_stream_pair(ioc); std::vector buf1(chunk_size, 'a'); diff --git a/perf/bench/corosio/socket_latency_bench.cpp b/perf/bench/corosio/socket_latency_bench.cpp index 4497fbf3..872736d0 100644 --- a/perf/bench/corosio/socket_latency_bench.cpp +++ b/perf/bench/corosio/socket_latency_bench.cpp @@ -168,7 +168,7 @@ bench_pingpong_latency_lockless(bench::state& state) corosio::io_context_options opts; opts.single_threaded = true; - corosio::native_io_context ioc(opts); + corosio::native_io_context ioc(opts, 1); auto [client, server] = corosio::test::make_socket_pair< 
socket_type, corosio::native_tcp_acceptor>(ioc); @@ -204,7 +204,7 @@ bench_concurrent_latency_lockless(bench::state& state) corosio::io_context_options opts; opts.single_threaded = true; - corosio::native_io_context ioc(opts); + corosio::native_io_context ioc(opts, 1); std::vector clients; std::vector servers; diff --git a/perf/bench/corosio/socket_throughput_bench.cpp b/perf/bench/corosio/socket_throughput_bench.cpp index a52c2735..c29f2247 100644 --- a/perf/bench/corosio/socket_throughput_bench.cpp +++ b/perf/bench/corosio/socket_throughput_bench.cpp @@ -255,7 +255,7 @@ bench_throughput_lockless(bench::state& state) corosio::io_context_options opts; opts.single_threaded = true; - corosio::native_io_context ioc(opts); + corosio::native_io_context ioc(opts, 1); auto [writer, reader] = corosio::test::make_socket_pair< socket_type, corosio::native_tcp_acceptor>(ioc); @@ -321,7 +321,7 @@ bench_bidirectional_throughput_lockless(bench::state& state) corosio::io_context_options opts; opts.single_threaded = true; - corosio::native_io_context ioc(opts); + corosio::native_io_context ioc(opts, 1); auto [sock1, sock2] = corosio::test::make_socket_pair< socket_type, corosio::native_tcp_acceptor>(ioc); diff --git a/perf/bench/corosio/timer_bench.cpp b/perf/bench/corosio/timer_bench.cpp index cc103626..0d950cf3 100644 --- a/perf/bench/corosio/timer_bench.cpp +++ b/perf/bench/corosio/timer_bench.cpp @@ -112,7 +112,7 @@ bench_schedule_cancel_lockless(bench::state& state) corosio::io_context_options opts; opts.single_threaded = true; - corosio::native_io_context ioc(opts); + corosio::native_io_context ioc(opts, 1); int64_t counter = 0; int constexpr batch_size = 1000; @@ -148,7 +148,7 @@ bench_fire_rate_lockless(bench::state& state) corosio::io_context_options opts; opts.single_threaded = true; - corosio::native_io_context ioc(opts); + corosio::native_io_context ioc(opts, 1); std::atomic running{true}; int64_t counter = 0; diff --git a/src/corosio/src/io_context.cpp 
b/src/corosio/src/io_context.cpp index 0824f16a..268c9c8a 100644 --- a/src/corosio/src/io_context.cpp +++ b/src/corosio/src/io_context.cpp @@ -12,6 +12,7 @@ #include #include +#include <algorithm> #include #include @@ -141,19 +142,48 @@ pre_create_services( } // Apply runtime tuning to the scheduler after construction. +// +// Concurrency-hint heuristic for budget defaults: when the io_context is +// constructed with concurrency_hint > 1 AND the user has not customized +// the budget settings (i.e. they remain at the struct defaults), we +// disable the inline-completion fast path. Multi-thread workloads +// benefit from "always-post" because cross-thread work-stealing wins +// over chained dispatch on the originating thread. Single-thread (or +// any custom budget) keeps the user/library setting unchanged. void apply_scheduler_options( detail::scheduler& sched, - io_context_options const& opts) + io_context_options const& opts, + unsigned concurrency_hint) { #if BOOST_COROSIO_HAS_EPOLL || BOOST_COROSIO_HAS_KQUEUE || BOOST_COROSIO_HAS_SELECT + // Detect "user kept the defaults" by comparing all three to the + // io_context_options-defined struct defaults. + io_context_options defaults; + bool budget_at_defaults = + opts.inline_budget_initial == defaults.inline_budget_initial && + opts.inline_budget_max == defaults.inline_budget_max && + opts.unassisted_budget == defaults.unassisted_budget; + + unsigned init = opts.inline_budget_initial; + unsigned max = opts.inline_budget_max; + unsigned ua = opts.unassisted_budget; + + if (budget_at_defaults && concurrency_hint > 1) + { + // Multi-thread default: disable budget (post-everything).
+ init = 0; + max = 0; + ua = 0; + } + auto& reactor = static_cast(sched); reactor.configure_reactor( opts.max_events_per_poll, - opts.inline_budget_initial, - opts.inline_budget_max, - opts.unassisted_budget); + init, + max, + ua); if (opts.single_threaded) reactor.configure_single_threaded(true); #endif @@ -183,25 +213,40 @@ construct_default(capy::execution_context& ctx, unsigned concurrency_hint) #endif } +// Tie concurrency_hint == 1 to single_threaded (asio precedent). +io_context_options +normalize_options(io_context_options opts, unsigned concurrency_hint) +{ + if (concurrency_hint == 1) + opts.single_threaded = true; + return opts; +} + } // anonymous namespace -io_context::io_context() : io_context(std::thread::hardware_concurrency()) {} +io_context::io_context() + : io_context(std::max(2u, std::thread::hardware_concurrency())) +{ +} io_context::io_context(unsigned concurrency_hint) : capy::execution_context(this) , sched_(&construct_default(*this, concurrency_hint)) { + if (concurrency_hint == 1) + configure_single_threaded_(); } io_context::io_context( - io_context_options const& opts, + io_context_options const& opts_in, unsigned concurrency_hint) : capy::execution_context(this) , sched_(nullptr) { + auto opts = normalize_options(opts_in, concurrency_hint); pre_create_services(*this, opts); sched_ = &construct_default(*this, concurrency_hint); - apply_scheduler_options(*sched_, opts); + apply_scheduler_options(*sched_, opts, concurrency_hint); } void @@ -211,9 +256,25 @@ io_context::apply_options_pre_(io_context_options const& opts) } void -io_context::apply_options_post_(io_context_options const& opts) +io_context::apply_options_post_( + io_context_options const& opts_in, + unsigned concurrency_hint) +{ + auto opts = normalize_options(opts_in, concurrency_hint); + apply_scheduler_options(*sched_, opts, concurrency_hint); +} + +void +io_context::configure_single_threaded_() { - apply_scheduler_options(*sched_, opts); +#if BOOST_COROSIO_HAS_EPOLL || 
BOOST_COROSIO_HAS_KQUEUE || BOOST_COROSIO_HAS_SELECT + static_cast(*sched_) + .configure_single_threaded(true); +#endif +#if BOOST_COROSIO_HAS_IOCP + static_cast(*sched_) + .configure_single_threaded(true); +#endif } io_context::~io_context()