TLA Line data Source code
1 : //
2 : // Copyright (c) 2026 Steve Gerbino
3 : //
4 : // Distributed under the Boost Software License, Version 1.0. (See accompanying
5 : // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
6 : //
7 : // Official repository: https://github.com/cppalliance/corosio
8 : //
9 :
10 : #ifndef BOOST_COROSIO_NATIVE_DETAIL_REACTOR_REACTOR_SCHEDULER_HPP
11 : #define BOOST_COROSIO_NATIVE_DETAIL_REACTOR_REACTOR_SCHEDULER_HPP
12 :
#include <boost/corosio/detail/config.hpp>

#include <boost/capy/ex/execution_context.hpp>

#include <boost/corosio/detail/conditionally_enabled_event.hpp>
#include <boost/corosio/detail/conditionally_enabled_mutex.hpp>
#include <boost/corosio/detail/scheduler.hpp>
#include <boost/corosio/detail/scheduler_op.hpp>
#include <boost/corosio/detail/thread_local_ptr.hpp>

#include <algorithm>
#include <atomic>
#include <chrono>
#include <coroutine>
#include <cstddef>
#include <cstdint>
#include <limits>
#include <memory>
#include <stdexcept>
31 :
32 : namespace boost::corosio::detail {
33 :
34 : // Forward declarations
35 : class reactor_scheduler;
36 : class timer_service;
37 :
/** Per-thread state for a reactor scheduler.

    Each thread running a scheduler's event loop has one of these
    on a thread-local stack. It holds a private work queue and
    inline completion budget for speculative I/O fast paths.

    Frames are linked via @ref next so one thread may run several
    schedulers (nested run() calls) simultaneously; lookups match
    on @ref key.
*/
struct BOOST_COROSIO_SYMBOL_VISIBLE reactor_scheduler_context
{
    /// Scheduler this context belongs to.
    reactor_scheduler const* key;

    /// Next context frame on this thread's stack.
    reactor_scheduler_context* next;

    /// Private work queue for reduced contention.
    /// Drained to the global queue by work_cleanup/task_cleanup.
    op_queue private_queue;

    /// Unflushed work count for the private queue.
    /// Flushed to the scheduler's atomic counter when the queue drains.
    std::int64_t private_outstanding_work;

    /// Remaining inline completions allowed this cycle.
    int inline_budget;

    /// Maximum inline budget (adaptive, 2-16).
    int inline_budget_max;

    /// True if no other thread absorbed queued work last cycle.
    bool unassisted;

    /// Construct a context frame linked to @a n.
    reactor_scheduler_context(
        reactor_scheduler const* k,
        reactor_scheduler_context* n);
};
72 :
/// Thread-local context stack for reactor schedulers.
/// Frames are pushed/popped by reactor_thread_context_guard and
/// searched (via the `next` links) by reactor_find_context.
inline thread_local_ptr<reactor_scheduler_context> reactor_context_stack;
75 :
76 : /// Find the context frame for a scheduler on this thread.
77 : inline reactor_scheduler_context*
78 HIT 776422 : reactor_find_context(reactor_scheduler const* self) noexcept
79 : {
80 776422 : for (auto* c = reactor_context_stack.get(); c != nullptr; c = c->next)
81 : {
82 773306 : if (c->key == self)
83 773306 : return c;
84 : }
85 3116 : return nullptr;
86 : }
87 :
88 : /// Flush private work count to global counter.
89 : inline void
90 MIS 0 : reactor_flush_private_work(
91 : reactor_scheduler_context* ctx,
92 : std::atomic<std::int64_t>& outstanding_work) noexcept
93 : {
94 0 : if (ctx && ctx->private_outstanding_work > 0)
95 : {
96 0 : outstanding_work.fetch_add(
97 : ctx->private_outstanding_work, std::memory_order_relaxed);
98 0 : ctx->private_outstanding_work = 0;
99 : }
100 0 : }
101 :
102 : /** Drain private queue to global queue, flushing work count first.
103 :
104 : @return True if any ops were drained.
105 : */
106 : inline bool
107 0 : reactor_drain_private_queue(
108 : reactor_scheduler_context* ctx,
109 : std::atomic<std::int64_t>& outstanding_work,
110 : op_queue& completed_ops) noexcept
111 : {
112 0 : if (!ctx || ctx->private_queue.empty())
113 0 : return false;
114 :
115 0 : reactor_flush_private_work(ctx, outstanding_work);
116 0 : completed_ops.splice(ctx->private_queue);
117 0 : return true;
118 : }
119 :
/** Non-template base for reactor-backed scheduler implementations.

    Provides the complete threading model shared by epoll, kqueue,
    and select schedulers: signal state machine, inline completion
    budget, work counting, run/poll methods, and the do_one event
    loop.

    Derived classes provide platform-specific hooks by overriding:
    - `run_task(lock, ctx)` to run the reactor poll
    - `interrupt_reactor()` to wake a blocked reactor

    De-templated from the original CRTP design to eliminate
    duplicate instantiations when multiple backends are compiled
    into the same binary. Virtual dispatch for run_task (called
    once per reactor cycle, before a blocking syscall) has
    negligible overhead.

    @par Thread Safety
    All public member functions are thread-safe.
*/
class reactor_scheduler
    : public scheduler
    , public capy::execution_context::service
{
public:
    using key_type = scheduler;
    using context_type = reactor_scheduler_context;
    using mutex_type = conditionally_enabled_mutex;
    using lock_type = mutex_type::scoped_lock;
    using event_type = conditionally_enabled_event;

    /// Post a coroutine for deferred execution.
    void post(std::coroutine_handle<> h) const override;

    /// Post a scheduler operation for deferred execution.
    void post(scheduler_op* h) const override;

    /// Return true if called from a thread running this scheduler.
    bool running_in_this_thread() const noexcept override;

    /// Request the scheduler to stop dispatching handlers.
    void stop() override;

    /// Return true if the scheduler has been stopped.
    bool stopped() const noexcept override;

    /// Reset the stopped state so `run()` can resume.
    void restart() override;

    /// Run the event loop until no work remains.
    std::size_t run() override;

    /// Run until one handler completes or no work remains.
    std::size_t run_one() override;

    /// Run until one handler completes or @a usec elapses.
    std::size_t wait_one(long usec) override;

    /// Run ready handlers without blocking.
    std::size_t poll() override;

    /// Run at most one ready handler without blocking.
    std::size_t poll_one() override;

    /// Increment the outstanding work count.
    void work_started() noexcept override;

    /// Decrement the outstanding work count, stopping on zero.
    void work_finished() noexcept override;

    /** Reset the thread's inline completion budget.

        Called at the start of each posted completion handler to
        grant a fresh budget for speculative inline completions.
    */
    void reset_inline_budget() const noexcept;

    /** Consume one unit of inline budget if available.

        @return True if budget was available and consumed.
    */
    bool try_consume_inline_budget() const noexcept;

    /** Offset a forthcoming work_finished from work_cleanup.

        Called by descriptor_state when all I/O returned EAGAIN and
        no handler will be executed. Must be called from a scheduler
        thread.
    */
    void compensating_work_started() const noexcept;

    /** Drain work from thread context's private queue to global queue.

        Flushes private work count to the global counter, then
        transfers the queue under mutex protection.

        @param queue The private queue to drain.
        @param count Private work count to flush before draining.
    */
    void drain_thread_queue(op_queue& queue, std::int64_t count) const;

    /** Post completed operations for deferred invocation.

        If called from a thread running this scheduler, operations
        go to the thread's private queue (fast path). Otherwise,
        operations are added to the global queue under mutex and a
        waiter is signaled.

        @par Preconditions
        work_started() must have been called for each operation.

        @param ops Queue of operations to post.
    */
    void post_deferred_completions(op_queue& ops) const;

    /** Apply runtime configuration to the scheduler.

        Called by `io_context` after construction. Values that do
        not apply to this backend are silently ignored.

        @param max_events Event buffer size for epoll/kqueue.
        @param budget_init Starting inline completion budget.
        @param budget_max Hard ceiling on adaptive budget ramp-up.
        @param unassisted Budget when single-threaded.

        @throws std::out_of_range if @a max_events or @a budget_max
                fall outside their documented ranges.
    */
    virtual void configure_reactor(
        unsigned max_events,
        unsigned budget_init,
        unsigned budget_max,
        unsigned unassisted);

    /// Return the configured initial inline budget.
    unsigned inline_budget_initial() const noexcept
    {
        return inline_budget_initial_;
    }

    /// Return true if single-threaded (lockless) mode is active.
    bool is_single_threaded() const noexcept
    {
        return single_threaded_;
    }

    /** Enable or disable single-threaded (lockless) mode.

        When enabled, all scheduler mutex and condition variable
        operations become no-ops. Cross-thread post() is
        undefined behavior.
    */
    void configure_single_threaded(bool v) noexcept
    {
        single_threaded_ = v;
        mutex_.set_enabled(!v);
        cond_.set_enabled(!v);
    }

protected:
    timer_service* timer_svc_ = nullptr;
    bool single_threaded_ = false;

    reactor_scheduler() = default;

    /** Drain completed_ops during shutdown.

        Pops all operations from the global queue and destroys them,
        skipping the task sentinel. Signals all waiting threads.
        Derived classes call this from their shutdown() override
        before performing platform-specific cleanup.
    */
    void shutdown_drain();

    /// RAII guard that re-inserts the task sentinel after `run_task`.
    struct task_cleanup
    {
        reactor_scheduler const* sched;
        lock_type* lock;
        context_type* ctx;
        ~task_cleanup();
    };

    mutable mutex_type mutex_{true};
    mutable event_type cond_{true};
    /// Global completion queue; protected by mutex_.
    mutable op_queue completed_ops_;
    /// Count of pending work items; scheduler stops when it hits zero.
    mutable std::atomic<std::int64_t> outstanding_work_{0};
    std::atomic<bool> stopped_{false};
    /// True while a thread is inside run_task (the reactor poll).
    mutable std::atomic<bool> task_running_{false};
    /// True once the blocked reactor has been (or will be) woken;
    /// protected by mutex_.
    mutable bool task_interrupted_ = false;

    // Runtime-configurable reactor tuning parameters.
    // Defaults match the library's built-in values.
    unsigned max_events_per_poll_ = 128;
    unsigned inline_budget_initial_ = 2;
    unsigned inline_budget_max_ = 16;
    unsigned unassisted_budget_ = 4;

    /// Bit 0 of `state_`: set when the condvar should be signaled.
    static constexpr std::size_t signaled_bit = 1;

    /// Increment per waiting thread in `state_`.
    static constexpr std::size_t waiter_increment = 2;
    mutable std::size_t state_ = 0;

    /// Sentinel op that triggers a reactor poll when dequeued.
    struct task_op final : scheduler_op
    {
        void operator()() override {}
        void destroy() override {}
    };
    task_op task_op_;

    /// Run the platform-specific reactor poll.
    virtual void
    run_task(lock_type& lock, context_type* ctx,
        long timeout_us) = 0;

    /// Wake a blocked reactor (e.g. write to eventfd or pipe).
    virtual void interrupt_reactor() const = 0;

private:
    /// RAII guard run after each handler: reconciles the handler's
    /// private work delta and drains its private queue (see dtor).
    struct work_cleanup
    {
        reactor_scheduler* sched;
        lock_type* lock;
        context_type* ctx;
        ~work_cleanup();
    };

    std::size_t do_one(
        lock_type& lock, long timeout_us, context_type* ctx);

    void signal_all(lock_type& lock) const;
    bool maybe_unlock_and_signal_one(lock_type& lock) const;
    bool unlock_and_signal_one(lock_type& lock) const;
    void clear_signal() const;
    void wait_for_signal(lock_type& lock) const;
    void wait_for_signal_for(
        lock_type& lock, long timeout_us) const;
    void wake_one_thread_and_unlock(lock_type& lock) const;
};
359 :
/** RAII guard that pushes/pops a scheduler context frame.

    On construction, pushes a new context frame onto the
    thread-local stack. On destruction, drains any remaining
    private queue items to the global queue and pops the frame.
*/
struct reactor_thread_context_guard
{
    /// The context frame managed by this guard.
    reactor_scheduler_context frame_;

    /// Construct the guard, pushing a frame for @a sched.
    explicit reactor_thread_context_guard(
        reactor_scheduler const* sched) noexcept
        : frame_(sched, reactor_context_stack.get())
    {
        reactor_context_stack.set(&frame_);
    }

    /// Destroy the guard, draining private work and popping the frame.
    /// Draining happens before the pop so the frame is still intact.
    ~reactor_thread_context_guard() noexcept
    {
        if (!frame_.private_queue.empty())
            frame_.key->drain_thread_queue(
                frame_.private_queue, frame_.private_outstanding_work);
        reactor_context_stack.set(frame_.next);
    }
};
388 :
389 : // ---- Inline implementations ------------------------------------------------
390 :
// Initialize a frame for scheduler `k`, linked ahead of frame `n`.
// The budget cap starts at the scheduler's configured initial value;
// inline_budget itself starts at 0 until reset_inline_budget() runs.
inline
reactor_scheduler_context::reactor_scheduler_context(
    reactor_scheduler const* k,
    reactor_scheduler_context* n)
    : key(k)
    , next(n)
    , private_outstanding_work(0)
    , inline_budget(0)
    , inline_budget_max(
        static_cast<int>(k->inline_budget_initial()))
    , unassisted(false)
{
}
404 :
405 : inline void
406 MIS 0 : reactor_scheduler::configure_reactor(
407 : unsigned max_events,
408 : unsigned budget_init,
409 : unsigned budget_max,
410 : unsigned unassisted)
411 : {
412 0 : if (max_events < 1 ||
413 0 : max_events > static_cast<unsigned>(std::numeric_limits<int>::max()))
414 : throw std::out_of_range(
415 0 : "max_events_per_poll must be in [1, INT_MAX]");
416 0 : if (budget_max > static_cast<unsigned>(std::numeric_limits<int>::max()))
417 : throw std::out_of_range(
418 0 : "inline_budget_max must be in [0, INT_MAX]");
419 :
420 : // Clamp initial and unassisted to budget_max.
421 0 : if (budget_init > budget_max)
422 0 : budget_init = budget_max;
423 0 : if (unassisted > budget_max)
424 0 : unassisted = budget_max;
425 :
426 0 : max_events_per_poll_ = max_events;
427 0 : inline_budget_initial_ = budget_init;
428 0 : inline_budget_max_ = budget_max;
429 0 : unassisted_budget_ = unassisted;
430 0 : }
431 :
// Grant this thread a fresh inline-completion budget for the cycle.
// Adaptive policy:
//   - unassisted (no other thread absorbed queued work): pin both the
//     cap and the budget to unassisted_budget_;
//   - previous budget fully consumed: double the cap (clamped to
//     inline_budget_max_);
//   - budget only partially consumed: fall back to the initial cap.
inline void
reactor_scheduler::reset_inline_budget() const noexcept
{
    // When budget is disabled (max==0), all paths below would no-op
    // (inline_budget stays 0). Skip the TLS lookup entirely.
    if (inline_budget_max_ == 0)
        return;
    if (auto* ctx = reactor_find_context(this))
    {
        // Cap when no other thread absorbed queued work
        if (ctx->unassisted)
        {
            ctx->inline_budget_max =
                static_cast<int>(unassisted_budget_);
            ctx->inline_budget =
                static_cast<int>(unassisted_budget_);
            return;
        }
        // Ramp up when previous cycle fully consumed budget.
        // max(1, ...) ensures the doubling escapes zero.
        if (ctx->inline_budget == 0)
            ctx->inline_budget_max = (std::min)(
                (std::max)(1, ctx->inline_budget_max) * 2,
                static_cast<int>(inline_budget_max_));
        else if (ctx->inline_budget < ctx->inline_budget_max)
            ctx->inline_budget_max =
                static_cast<int>(inline_budget_initial_);
        // Budget for the new cycle is always the (possibly adjusted) cap.
        ctx->inline_budget = ctx->inline_budget_max;
    }
}
462 :
463 : inline bool
464 HIT 424684 : reactor_scheduler::try_consume_inline_budget() const noexcept
465 : {
466 424684 : if (inline_budget_max_ == 0)
467 MIS 0 : return false;
468 HIT 424684 : if (auto* ctx = reactor_find_context(this))
469 : {
470 424684 : if (ctx->inline_budget > 0)
471 : {
472 339755 : --ctx->inline_budget;
473 339755 : return true;
474 : }
475 : }
476 84929 : return false;
477 : }
478 :
// Post a coroutine for deferred execution, wrapping it in a
// heap-allocated scheduler_op that owns itself (deleted on invoke
// or destroy).
inline void
reactor_scheduler::post(std::coroutine_handle<> h) const
{
    struct post_handler final : scheduler_op
    {
        std::coroutine_handle<> h_;

        explicit post_handler(std::coroutine_handle<> h) : h_(h) {}
        ~post_handler() override = default;

        void operator()() override
        {
            // Copy the handle first: `delete this` invalidates h_.
            auto saved = h_;
            delete this;
            // Ensure stores from the posting thread are visible
            std::atomic_thread_fence(std::memory_order_acquire);
            saved.resume();
        }

        void destroy() override
        {
            // Shutdown path: free the wrapper, then the coroutine frame.
            auto saved = h_;
            delete this;
            saved.destroy();
        }
    };

    auto ph = std::make_unique<post_handler>(h);

    // Fast path: a scheduler thread posts to its private queue and
    // defers the work-count flush (no atomics, no mutex).
    if (auto* ctx = reactor_find_context(this))
    {
        ++ctx->private_outstanding_work;
        ctx->private_queue.push(ph.release());
        return;
    }

    // Cross-thread path: count the work, queue under the mutex, and
    // wake one waiter (or interrupt the reactor).
    outstanding_work_.fetch_add(1, std::memory_order_relaxed);

    lock_type lock(mutex_);
    completed_ops_.push(ph.release());
    wake_one_thread_and_unlock(lock);
}
521 :
522 : inline void
523 102369 : reactor_scheduler::post(scheduler_op* h) const
524 : {
525 102369 : if (auto* ctx = reactor_find_context(this))
526 : {
527 102197 : ++ctx->private_outstanding_work;
528 102197 : ctx->private_queue.push(h);
529 102197 : return;
530 : }
531 :
532 172 : outstanding_work_.fetch_add(1, std::memory_order_relaxed);
533 :
534 172 : lock_type lock(mutex_);
535 172 : completed_ops_.push(h);
536 172 : wake_one_thread_and_unlock(lock);
537 172 : }
538 :
539 : inline bool
540 1359 : reactor_scheduler::running_in_this_thread() const noexcept
541 : {
542 1359 : return reactor_find_context(this) != nullptr;
543 : }
544 :
545 : inline void
546 461 : reactor_scheduler::stop()
547 : {
548 461 : lock_type lock(mutex_);
549 461 : if (!stopped_.load(std::memory_order_acquire))
550 : {
551 420 : stopped_.store(true, std::memory_order_release);
552 420 : signal_all(lock);
553 420 : interrupt_reactor();
554 : }
555 461 : }
556 :
/// Return true if the scheduler has been stopped (acquire pairs
/// with the release store in stop()).
inline bool
reactor_scheduler::stopped() const noexcept
{
    return stopped_.load(std::memory_order_acquire);
}
562 :
/// Clear the stopped flag so a subsequent run()/poll() can proceed.
inline void
reactor_scheduler::restart()
{
    stopped_.store(false, std::memory_order_release);
}
568 :
569 : inline std::size_t
570 432 : reactor_scheduler::run()
571 : {
572 864 : if (outstanding_work_.load(std::memory_order_acquire) == 0)
573 : {
574 31 : stop();
575 31 : return 0;
576 : }
577 :
578 401 : reactor_thread_context_guard ctx(this);
579 401 : lock_type lock(mutex_);
580 :
581 401 : std::size_t n = 0;
582 : for (;;)
583 : {
584 265456 : if (!do_one(lock, -1, &ctx.frame_))
585 401 : break;
586 265055 : if (n != (std::numeric_limits<std::size_t>::max)())
587 265055 : ++n;
588 265055 : if (!lock.owns_lock())
589 171183 : lock.lock();
590 : }
591 401 : return n;
592 401 : }
593 :
594 : inline std::size_t
595 2 : reactor_scheduler::run_one()
596 : {
597 4 : if (outstanding_work_.load(std::memory_order_acquire) == 0)
598 : {
599 MIS 0 : stop();
600 0 : return 0;
601 : }
602 :
603 HIT 2 : reactor_thread_context_guard ctx(this);
604 2 : lock_type lock(mutex_);
605 2 : return do_one(lock, -1, &ctx.frame_);
606 2 : }
607 :
608 : inline std::size_t
609 102 : reactor_scheduler::wait_one(long usec)
610 : {
611 204 : if (outstanding_work_.load(std::memory_order_acquire) == 0)
612 : {
613 10 : stop();
614 10 : return 0;
615 : }
616 :
617 92 : reactor_thread_context_guard ctx(this);
618 92 : lock_type lock(mutex_);
619 92 : return do_one(lock, usec, &ctx.frame_);
620 92 : }
621 :
622 : inline std::size_t
623 6 : reactor_scheduler::poll()
624 : {
625 12 : if (outstanding_work_.load(std::memory_order_acquire) == 0)
626 : {
627 1 : stop();
628 1 : return 0;
629 : }
630 :
631 5 : reactor_thread_context_guard ctx(this);
632 5 : lock_type lock(mutex_);
633 :
634 5 : std::size_t n = 0;
635 : for (;;)
636 : {
637 11 : if (!do_one(lock, 0, &ctx.frame_))
638 5 : break;
639 6 : if (n != (std::numeric_limits<std::size_t>::max)())
640 6 : ++n;
641 6 : if (!lock.owns_lock())
642 6 : lock.lock();
643 : }
644 5 : return n;
645 5 : }
646 :
647 : inline std::size_t
648 4 : reactor_scheduler::poll_one()
649 : {
650 8 : if (outstanding_work_.load(std::memory_order_acquire) == 0)
651 : {
652 2 : stop();
653 2 : return 0;
654 : }
655 :
656 2 : reactor_thread_context_guard ctx(this);
657 2 : lock_type lock(mutex_);
658 2 : return do_one(lock, 0, &ctx.frame_);
659 2 : }
660 :
/// Increment the outstanding work count. Relaxed is sufficient:
/// the count only gates shutdown, not data visibility.
inline void
reactor_scheduler::work_started() noexcept
{
    outstanding_work_.fetch_add(1, std::memory_order_relaxed);
}
666 :
667 : inline void
668 36698 : reactor_scheduler::work_finished() noexcept
669 : {
670 73396 : if (outstanding_work_.fetch_sub(1, std::memory_order_acq_rel) == 1)
671 412 : stop();
672 36698 : }
673 :
674 : inline void
675 144723 : reactor_scheduler::compensating_work_started() const noexcept
676 : {
677 144723 : auto* ctx = reactor_find_context(this);
678 144723 : if (ctx)
679 144723 : ++ctx->private_outstanding_work;
680 144723 : }
681 :
682 : inline void
683 MIS 0 : reactor_scheduler::drain_thread_queue(
684 : op_queue& queue, std::int64_t count) const
685 : {
686 0 : if (count > 0)
687 0 : outstanding_work_.fetch_add(count, std::memory_order_relaxed);
688 :
689 0 : lock_type lock(mutex_);
690 0 : completed_ops_.splice(queue);
691 0 : if (count > 0)
692 0 : maybe_unlock_and_signal_one(lock);
693 0 : }
694 :
695 : inline void
696 HIT 15852 : reactor_scheduler::post_deferred_completions(op_queue& ops) const
697 : {
698 15852 : if (ops.empty())
699 15852 : return;
700 :
701 MIS 0 : if (auto* ctx = reactor_find_context(this))
702 : {
703 0 : ctx->private_queue.splice(ops);
704 0 : return;
705 : }
706 :
707 0 : lock_type lock(mutex_);
708 0 : completed_ops_.splice(ops);
709 0 : wake_one_thread_and_unlock(lock);
710 0 : }
711 :
712 : inline void
713 HIT 605 : reactor_scheduler::shutdown_drain()
714 : {
715 605 : lock_type lock(mutex_);
716 :
717 1315 : while (auto* h = completed_ops_.pop())
718 : {
719 710 : if (h == &task_op_)
720 605 : continue;
721 105 : lock.unlock();
722 105 : h->destroy();
723 105 : lock.lock();
724 710 : }
725 :
726 605 : signal_all(lock);
727 605 : }
728 :
/// Set the signaled bit and wake every waiting thread. The lock
/// parameter documents that the caller holds the mutex.
inline void
reactor_scheduler::signal_all(lock_type&) const
{
    state_ |= signaled_bit;
    cond_.notify_all();
}
735 :
736 : inline bool
737 2355 : reactor_scheduler::maybe_unlock_and_signal_one(
738 : lock_type& lock) const
739 : {
740 2355 : state_ |= signaled_bit;
741 2355 : if (state_ > signaled_bit)
742 : {
743 MIS 0 : lock.unlock();
744 0 : cond_.notify_one();
745 0 : return true;
746 : }
747 HIT 2355 : return false;
748 : }
749 :
/** Set the signal, always unlock, and notify one waiter if any.

    Unlike maybe_unlock_and_signal_one, the lock is released
    unconditionally; the waiter count must be read before that.

    @return True if a waiter was notified.
*/
inline bool
reactor_scheduler::unlock_and_signal_one(
    lock_type& lock) const
{
    state_ |= signaled_bit;
    bool have_waiters = state_ > signaled_bit;
    lock.unlock();
    if (have_waiters)
        cond_.notify_one();
    return have_waiters;
}
761 :
/// Clear the signaled bit (caller holds the mutex); waiter counts
/// in the upper bits are preserved.
inline void
reactor_scheduler::clear_signal() const
{
    state_ &= ~signaled_bit;
}
767 :
/// Block until the signaled bit is set, registering this thread as
/// a waiter (waiter_increment) for the duration of each wait. The
/// loop re-checks after every wakeup to absorb spurious wakes.
inline void
reactor_scheduler::wait_for_signal(
    lock_type& lock) const
{
    while ((state_ & signaled_bit) == 0)
    {
        state_ += waiter_increment;
        cond_.wait(lock);
        state_ -= waiter_increment;
    }
}
779 :
/// Wait up to @a timeout_us for the signaled bit, registering as a
/// waiter for the duration. A single timed wait — no retry loop —
/// so the caller re-evaluates its own condition on return.
inline void
reactor_scheduler::wait_for_signal_for(
    lock_type& lock, long timeout_us) const
{
    if ((state_ & signaled_bit) == 0)
    {
        state_ += waiter_increment;
        cond_.wait_for(lock, std::chrono::microseconds(timeout_us));
        state_ -= waiter_increment;
    }
}
791 :
792 : inline void
793 HIT 2355 : reactor_scheduler::wake_one_thread_and_unlock(
794 : lock_type& lock) const
795 : {
796 2355 : if (maybe_unlock_and_signal_one(lock))
797 MIS 0 : return;
798 :
799 HIT 2355 : if (task_running_.load(std::memory_order_relaxed) && !task_interrupted_)
800 : {
801 52 : task_interrupted_ = true;
802 52 : lock.unlock();
803 52 : interrupt_reactor();
804 : }
805 : else
806 : {
807 2303 : lock.unlock();
808 : }
809 : }
810 :
// Runs after each handler executed by do_one. The handler itself
// consumed one unit of work, while `ctx->private_outstanding_work`
// holds the units it produced; reconcile the net delta against the
// global counter, then splice any privately queued ops back into
// the global queue (re-acquiring the lock to do so).
inline reactor_scheduler::work_cleanup::~work_cleanup()
{
    if (ctx)
    {
        std::int64_t produced = ctx->private_outstanding_work;
        if (produced > 1)
            // Net gain: publish produced-1 (one unit was consumed).
            sched->outstanding_work_.fetch_add(
                produced - 1, std::memory_order_relaxed);
        else if (produced < 1)
            // Net loss: the consumed unit was not replaced.
            sched->work_finished();
        // produced == 1: consumption and production cancel out.
        ctx->private_outstanding_work = 0;

        if (!ctx->private_queue.empty())
        {
            lock->lock();
            sched->completed_ops_.splice(ctx->private_queue);
        }
    }
    else
    {
        sched->work_finished();
    }
}
834 :
// Runs after each reactor poll in do_one. Publishes any work the
// poll produced on the thread's private counter, then splices the
// private queue into the global queue (taking the lock if run_task
// returned without it).
inline reactor_scheduler::task_cleanup::~task_cleanup()
{
    if (!ctx)
        return;

    if (ctx->private_outstanding_work > 0)
    {
        sched->outstanding_work_.fetch_add(
            ctx->private_outstanding_work, std::memory_order_relaxed);
        ctx->private_outstanding_work = 0;
    }

    if (!ctx->private_queue.empty())
    {
        if (!lock->owns_lock())
            lock->lock();
        sched->completed_ops_.splice(ctx->private_queue);
    }
}
854 :
// Core event loop step. Called with `lock` held; may return with it
// released (after running a handler). timeout_us semantics:
//   < 0  block indefinitely, == 0  never block, > 0  block up to that long.
// Returns 1 if a handler ran, 0 otherwise.
inline std::size_t
reactor_scheduler::do_one(
    lock_type& lock, long timeout_us, context_type* ctx)
{
    for (;;)
    {
        if (stopped_.load(std::memory_order_acquire))
            return 0;

        scheduler_op* op = completed_ops_.pop();

        // Handle reactor sentinel — time to poll for I/O
        if (op == &task_op_)
        {
            bool more_handlers =
                !completed_ops_.empty() || (ctx && !ctx->private_queue.empty());

            // Nothing queued and either no work left or a pure poll:
            // restore the sentinel and report no progress.
            if (!more_handlers &&
                (outstanding_work_.load(std::memory_order_acquire) == 0 ||
                 timeout_us == 0))
            {
                completed_ops_.push(&task_op_);
                return 0;
            }

            // If handlers are pending, poll with zero timeout so they
            // are not delayed behind a blocking reactor wait.
            long task_timeout_us = more_handlers ? 0 : timeout_us;
            task_interrupted_ = task_timeout_us == 0;
            task_running_.store(true, std::memory_order_release);

            // Hand the pending handlers to another thread (if any)
            // while this thread runs the reactor.
            if (more_handlers)
                unlock_and_signal_one(lock);

            try
            {
                run_task(lock, ctx, task_timeout_us);
            }
            catch (...)
            {
                // Keep the flag consistent even on a throwing poll.
                task_running_.store(false, std::memory_order_relaxed);
                throw;
            }

            task_running_.store(false, std::memory_order_relaxed);
            // Re-arm the sentinel for the next cycle.
            completed_ops_.push(&task_op_);
            if (timeout_us > 0)
                return 0;
            continue;
        }

        // Handle operation
        if (op != nullptr)
        {
            bool more = !completed_ops_.empty();

            // `unassisted` records whether another thread picked up
            // the remaining work; reset_inline_budget consumes it.
            if (more)
                ctx->unassisted = !unlock_and_signal_one(lock);
            else
            {
                ctx->unassisted = false;
                lock.unlock();
            }

            // work_cleanup reconciles work counts and drains the
            // private queue even if the handler throws.
            work_cleanup on_exit{this, &lock, ctx};
            (void)on_exit;

            (*op)();
            return 1;
        }

        // Try private queue before blocking
        if (reactor_drain_private_queue(ctx, outstanding_work_, completed_ops_))
            continue;

        if (outstanding_work_.load(std::memory_order_acquire) == 0 ||
            timeout_us == 0)
            return 0;

        // Nothing to do: wait (bounded or unbounded) for a signal.
        clear_signal();
        if (timeout_us < 0)
            wait_for_signal(lock);
        else
            wait_for_signal_for(lock, timeout_us);
    }
}
939 :
940 : } // namespace boost::corosio::detail
941 :
942 : #endif // BOOST_COROSIO_NATIVE_DETAIL_REACTOR_REACTOR_SCHEDULER_HPP
|