libs/corosio/src/corosio/src/detail/epoll/scheduler.cpp

//
// Copyright (c) 2026 Steve Gerbino
//
// Distributed under the Boost Software License, Version 1.0. (See accompanying
// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
//
// Official repository: https://github.com/cppalliance/corosio
//

#include <boost/corosio/detail/platform.hpp>

#if BOOST_COROSIO_HAS_EPOLL

#include "src/detail/epoll/scheduler.hpp"
#include "src/detail/epoll/op.hpp"
#include "src/detail/make_err.hpp"
#include "src/detail/posix/resolver_service.hpp"
#include "src/detail/posix/signals.hpp"

#include <boost/corosio/detail/except.hpp>
#include <boost/corosio/detail/thread_local_ptr.hpp>

#include <atomic>
#include <chrono>
#include <limits>
#include <utility>

#include <errno.h>
#include <fcntl.h>
#include <sys/epoll.h>
#include <sys/eventfd.h>
#include <sys/socket.h>
#include <sys/timerfd.h>
#include <unistd.h>

/*
epoll Scheduler - Single Reactor Model
======================================

This scheduler coordinates threads to provide handler parallelism while
avoiding the thundering herd problem. Instead of all threads blocking in
epoll_wait(), one thread becomes the "reactor" while the others wait on a
condition variable for handler work.

Thread Model
------------
- ONE thread runs epoll_wait() at a time (the reactor thread)
- OTHER threads wait on cond_ (condition variable) for handlers
- When work is posted, exactly one waiting thread wakes via notify_one()
- This matches Windows IOCP semantics where N posted items wake N threads

Event Loop Structure (do_one)
-----------------------------
1. Lock the mutex, try to pop a handler from the queue
2. If a handler was popped: execute it (unlocked), return
3. If the queue is empty and no reactor is running: become the reactor
   - Run epoll_wait (unlocked), queue I/O completions, loop back
4. If the queue is empty and a reactor is running: wait on the condvar

The task_running_ flag ensures only one thread owns epoll_wait().
After the reactor queues I/O completions, it loops back to try to pop a
handler, giving priority to handler execution over more I/O polling.

Signaling State (state_)
------------------------
The state_ variable encodes two pieces of information:
- Bit 0: signaled flag (1 = signaled, persists until cleared)
- Upper bits: waiter count (each waiter adds 2 before blocking)

This allows efficient coordination:
- Signalers only call notify when waiters exist (state_ > 1)
- Waiters check whether they were already signaled before blocking (fast path)

Wake Coordination (wake_one_thread_and_unlock)
----------------------------------------------
When posting work:
- If waiters exist (state_ > 1): signal and notify_one()
- Else if the reactor is running: interrupt it via an eventfd write
- Else: no-op (a thread will find the work when it next checks the queue)

This avoids waking threads unnecessarily. With cascading wakes, each
handler execution wakes at most one additional thread if more work
exists in the queue.

Work Counting
-------------
outstanding_work_ tracks pending operations. When it hits zero, run()
returns. Each operation increments the count on start and decrements it
on completion.

Timer Integration
-----------------
Timers are handled by timer_service. A timerfd registered with epoll
wakes the reactor at the nearest timer expiry. When a new timer is
scheduled earlier than the current nearest expiry, timer_service invokes
the on_earliest_changed callback, which re-arms the timerfd via
update_timerfd().
*/
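
// Illustrative usage sketch (the setup below is an assumption for
// exposition; in practice this detail class is driven by a higher-level
// execution context). Several threads call run(); one becomes the
// reactor while the rest park on cond_ until handlers arrive:
//
//   capy::execution_context ctx;
//   epoll_scheduler sched(ctx, 0);
//   std::vector<std::thread> pool;
//   for (int i = 0; i < 4; ++i)
//       pool.emplace_back([&] { sched.run(); });
//   // ... post() work from any thread ...
//   for (auto& t : pool)
//       t.join();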

namespace boost::corosio::detail {

struct scheduler_context
{
    epoll_scheduler const* key;
    scheduler_context* next;
    op_queue private_queue;
    long private_outstanding_work;

    scheduler_context(epoll_scheduler const* k, scheduler_context* n)
        : key(k)
        , next(n)
        , private_outstanding_work(0)
    {
    }
};
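
// One frame per thread per entered scheduler: thread_context_guard
// (below) pushes a frame onto a thread-local stack on entry to
// run()/poll() and pops it on exit, so multiple schedulers can share a
// thread; find_context() walks the stack to locate the frame belonging
// to a given scheduler.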

namespace {

corosio::detail::thread_local_ptr<scheduler_context> context_stack;

struct thread_context_guard
{
    scheduler_context frame_;

    explicit thread_context_guard(
        epoll_scheduler const* ctx) noexcept
        : frame_(ctx, context_stack.get())
    {
        context_stack.set(&frame_);
    }

    ~thread_context_guard() noexcept
    {
        if (!frame_.private_queue.empty())
            frame_.key->drain_thread_queue(
                frame_.private_queue, frame_.private_outstanding_work);
        context_stack.set(frame_.next);
    }
};

scheduler_context*
find_context(epoll_scheduler const* self) noexcept
{
    for (auto* c = context_stack.get(); c != nullptr; c = c->next)
        if (c->key == self)
            return c;
    return nullptr;
}

/// Flush private work count to global counter.
void
flush_private_work(
    scheduler_context* ctx,
    std::atomic<long>& outstanding_work) noexcept
{
    if (ctx && ctx->private_outstanding_work > 0)
    {
        outstanding_work.fetch_add(
            ctx->private_outstanding_work, std::memory_order_relaxed);
        ctx->private_outstanding_work = 0;
    }
}

/// Drain private queue to global queue, flushing work count first.
///
/// @return True if any ops were drained.
bool
drain_private_queue(
    scheduler_context* ctx,
    std::atomic<long>& outstanding_work,
    op_queue& completed_ops) noexcept
{
    if (!ctx || ctx->private_queue.empty())
        return false;

    flush_private_work(ctx, outstanding_work);
    completed_ops.splice(ctx->private_queue);
    return true;
}

} // namespace

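/* Deferred I/O completion for one descriptor.

   The reactor only records ready events and enqueues this object; it
   takes no per-descriptor mutex. The real work happens here, on a
   handler thread. Descriptors are registered edge-triggered (EPOLLET),
   so an op whose I/O comes back with EAGAIN/EWOULDBLOCK is stored back
   into read_op/write_op below and runs again on the next edge.
*/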
void
descriptor_state::
operator()()
{
    is_enqueued_.store(false, std::memory_order_relaxed);

    // Take ownership of impl ref set by close_socket() to prevent
    // the owning impl from being freed while we're executing
    auto prevent_impl_destruction = std::move(impl_ref_);

    std::uint32_t ev = ready_events_.exchange(0, std::memory_order_acquire);
    if (ev == 0)
    {
        scheduler_->compensating_work_started();
        return;
    }

    op_queue local_ops;

    int err = 0;
    if (ev & EPOLLERR)
    {
        socklen_t len = sizeof(err);
        if (::getsockopt(fd, SOL_SOCKET, SO_ERROR, &err, &len) < 0)
            err = errno;
        if (err == 0)
            err = EIO;
    }

    epoll_op* rd = nullptr;
    epoll_op* wr = nullptr;
    epoll_op* cn = nullptr;
    {
        std::lock_guard lock(mutex);
        if (ev & EPOLLIN)
        {
            rd = std::exchange(read_op, nullptr);
            if (!rd)
                read_ready = true;
        }
        if (ev & EPOLLOUT)
        {
            cn = std::exchange(connect_op, nullptr);
            wr = std::exchange(write_op, nullptr);
            if (!cn && !wr)
                write_ready = true;
        }
        if (err && !(ev & (EPOLLIN | EPOLLOUT)))
        {
            rd = std::exchange(read_op, nullptr);
            wr = std::exchange(write_op, nullptr);
            cn = std::exchange(connect_op, nullptr);
        }
    }

    // Non-null after I/O means EAGAIN; re-register under lock below
    if (rd)
    {
        if (err)
            rd->complete(err, 0);
        else
            rd->perform_io();

        if (rd->errn == EAGAIN || rd->errn == EWOULDBLOCK)
        {
            rd->errn = 0;
        }
        else
        {
            local_ops.push(rd);
            rd = nullptr;
        }
    }

    if (cn)
    {
        if (err)
            cn->complete(err, 0);
        else
            cn->perform_io();
        local_ops.push(cn);
        cn = nullptr;
    }

    if (wr)
    {
        if (err)
            wr->complete(err, 0);
        else
            wr->perform_io();

        if (wr->errn == EAGAIN || wr->errn == EWOULDBLOCK)
        {
            wr->errn = 0;
        }
        else
        {
            local_ops.push(wr);
            wr = nullptr;
        }
    }

    if (rd || wr)
    {
        std::lock_guard lock(mutex);
        if (rd)
            read_op = rd;
        if (wr)
            write_op = wr;
    }

    // Execute first handler inline; the scheduler's work_cleanup
    // accounts for this as the "consumed" work item
    scheduler_op* first = local_ops.pop();
    if (first)
    {
        scheduler_->post_deferred_completions(local_ops);
        (*first)();
    }
    else
    {
        scheduler_->compensating_work_started();
    }
}

epoll_scheduler::
epoll_scheduler(
    capy::execution_context& ctx,
    int)
    : epoll_fd_(-1)
    , event_fd_(-1)
    , timer_fd_(-1)
    , outstanding_work_(0)
    , stopped_(false)
    , shutdown_(false)
    , task_running_(false)
    , task_interrupted_(false)
    , state_(0)
{
    epoll_fd_ = ::epoll_create1(EPOLL_CLOEXEC);
    if (epoll_fd_ < 0)
        detail::throw_system_error(make_err(errno), "epoll_create1");

    event_fd_ = ::eventfd(0, EFD_NONBLOCK | EFD_CLOEXEC);
    if (event_fd_ < 0)
    {
        int errn = errno;
        ::close(epoll_fd_);
        detail::throw_system_error(make_err(errn), "eventfd");
    }

    timer_fd_ = ::timerfd_create(CLOCK_MONOTONIC, TFD_NONBLOCK | TFD_CLOEXEC);
    if (timer_fd_ < 0)
    {
        int errn = errno;
        ::close(event_fd_);
        ::close(epoll_fd_);
        detail::throw_system_error(make_err(errn), "timerfd_create");
    }

    epoll_event ev{};
    ev.events = EPOLLIN | EPOLLET;
    ev.data.ptr = nullptr;
    if (::epoll_ctl(epoll_fd_, EPOLL_CTL_ADD, event_fd_, &ev) < 0)
    {
        int errn = errno;
        ::close(timer_fd_);
        ::close(event_fd_);
        ::close(epoll_fd_);
        detail::throw_system_error(make_err(errn), "epoll_ctl");
    }

    epoll_event timer_ev{};
    timer_ev.events = EPOLLIN | EPOLLERR;
    timer_ev.data.ptr = &timer_fd_;
    if (::epoll_ctl(epoll_fd_, EPOLL_CTL_ADD, timer_fd_, &timer_ev) < 0)
    {
        int errn = errno;
        ::close(timer_fd_);
        ::close(event_fd_);
        ::close(epoll_fd_);
        detail::throw_system_error(make_err(errn), "epoll_ctl (timerfd)");
    }

    timer_svc_ = &get_timer_service(ctx, *this);
    timer_svc_->set_on_earliest_changed(
        timer_service::callback(
            this,
            [](void* p) { static_cast<epoll_scheduler*>(p)->update_timerfd(); }));

    // Initialize resolver service
    get_resolver_service(ctx, *this);

    // Initialize signal service
    get_signal_service(ctx, *this);

    // Push task sentinel to interleave reactor runs with handler execution
    completed_ops_.push(&task_op_);
}

epoll_scheduler::
~epoll_scheduler()
{
    if (timer_fd_ >= 0)
        ::close(timer_fd_);
    if (event_fd_ >= 0)
        ::close(event_fd_);
    if (epoll_fd_ >= 0)
        ::close(epoll_fd_);
}

void
epoll_scheduler::
shutdown()
{
    {
        std::unique_lock lock(mutex_);
        shutdown_ = true;

        while (auto* h = completed_ops_.pop())
        {
            if (h == &task_op_)
                continue;
            lock.unlock();
            h->destroy();
            lock.lock();
        }

        signal_all(lock);
    }

    outstanding_work_.store(0, std::memory_order_release);

    if (event_fd_ >= 0)
        interrupt_reactor();
}

void
epoll_scheduler::
post(capy::coro h) const
{
    struct post_handler final
        : scheduler_op
    {
        capy::coro h_;

        explicit
        post_handler(capy::coro h)
            : h_(h)
        {
        }

        ~post_handler() = default;

        void operator()() override
        {
            // Copy the handle and free this op first; no member may be
            // touched once the coroutine resumes.
            auto h = h_;
            delete this;
            std::atomic_thread_fence(std::memory_order_acquire);
            h.resume();
        }

        void destroy() override
        {
            delete this;
        }
    };

    auto ph = std::make_unique<post_handler>(h);

    // Fast path: same thread posts to private queue
    // Only count locally; work_cleanup batches to global counter
    if (auto* ctx = find_context(this))
    {
        ++ctx->private_outstanding_work;
        ctx->private_queue.push(ph.release());
        return;
    }

    // Slow path: cross-thread post requires mutex
    outstanding_work_.fetch_add(1, std::memory_order_relaxed);

    std::unique_lock lock(mutex_);
    completed_ops_.push(ph.release());
    wake_one_thread_and_unlock(lock);
}

void
epoll_scheduler::
post(scheduler_op* h) const
{
    // Fast path: same thread posts to private queue
    // Only count locally; work_cleanup batches to global counter
    if (auto* ctx = find_context(this))
    {
        ++ctx->private_outstanding_work;
        ctx->private_queue.push(h);
        return;
    }

    // Slow path: cross-thread post requires mutex
    outstanding_work_.fetch_add(1, std::memory_order_relaxed);

    std::unique_lock lock(mutex_);
    completed_ops_.push(h);
    wake_one_thread_and_unlock(lock);
}
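
// Example of the two paths in both post() overloads above (illustrative):
// a handler already running inside run() calls post(), so find_context()
// finds this scheduler's frame and the op lands on the thread-private
// queue with no locking; a producer thread outside the pool misses the
// fast path, bumps outstanding_work_, and goes through the mutex and
// wake_one_thread_and_unlock().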

void
epoll_scheduler::
on_work_started() noexcept
{
    outstanding_work_.fetch_add(1, std::memory_order_relaxed);
}

void
epoll_scheduler::
on_work_finished() noexcept
{
    if (outstanding_work_.fetch_sub(1, std::memory_order_acq_rel) == 1)
        stop();
}

bool
epoll_scheduler::
running_in_this_thread() const noexcept
{
    for (auto* c = context_stack.get(); c != nullptr; c = c->next)
        if (c->key == this)
            return true;
    return false;
}

void
epoll_scheduler::
stop()
{
    std::unique_lock lock(mutex_);
    if (!stopped_)
    {
        stopped_ = true;
        signal_all(lock);
        interrupt_reactor();
    }
}

bool
epoll_scheduler::
stopped() const noexcept
{
    std::unique_lock lock(mutex_);
    return stopped_;
}

void
epoll_scheduler::
restart()
{
    std::unique_lock lock(mutex_);
    stopped_ = false;
}

std::size_t
epoll_scheduler::
run()
{
    if (outstanding_work_.load(std::memory_order_acquire) == 0)
    {
        stop();
        return 0;
    }

    thread_context_guard ctx(this);
    std::unique_lock lock(mutex_);

    std::size_t n = 0;
    for (;;)
    {
        if (!do_one(lock, -1, &ctx.frame_))
            break;
        if (n != (std::numeric_limits<std::size_t>::max)())
            ++n;
        if (!lock.owns_lock())
            lock.lock();
    }
    return n;
}

std::size_t
epoll_scheduler::
run_one()
{
    if (outstanding_work_.load(std::memory_order_acquire) == 0)
    {
        stop();
        return 0;
    }

    thread_context_guard ctx(this);
    std::unique_lock lock(mutex_);
    return do_one(lock, -1, &ctx.frame_);
}

std::size_t
epoll_scheduler::
wait_one(long usec)
{
    if (outstanding_work_.load(std::memory_order_acquire) == 0)
    {
        stop();
        return 0;
    }

    thread_context_guard ctx(this);
    std::unique_lock lock(mutex_);
    return do_one(lock, usec, &ctx.frame_);
}

std::size_t
epoll_scheduler::
poll()
{
    if (outstanding_work_.load(std::memory_order_acquire) == 0)
    {
        stop();
        return 0;
    }

    thread_context_guard ctx(this);
    std::unique_lock lock(mutex_);

    std::size_t n = 0;
    for (;;)
    {
        if (!do_one(lock, 0, &ctx.frame_))
            break;
        if (n != (std::numeric_limits<std::size_t>::max)())
            ++n;
        if (!lock.owns_lock())
            lock.lock();
    }
    return n;
}

std::size_t
epoll_scheduler::
poll_one()
{
    if (outstanding_work_.load(std::memory_order_acquire) == 0)
    {
        stop();
        return 0;
    }

    thread_context_guard ctx(this);
    std::unique_lock lock(mutex_);
    return do_one(lock, 0, &ctx.frame_);
}

void
epoll_scheduler::
register_descriptor(int fd, descriptor_state* desc) const
{
    epoll_event ev{};
    ev.events = EPOLLIN | EPOLLOUT | EPOLLET | EPOLLERR | EPOLLHUP;
    ev.data.ptr = desc;

    if (::epoll_ctl(epoll_fd_, EPOLL_CTL_ADD, fd, &ev) < 0)
        detail::throw_system_error(make_err(errno), "epoll_ctl (register)");

    desc->registered_events = ev.events;
    desc->fd = fd;
    desc->scheduler_ = this;

    std::lock_guard lock(desc->mutex);
    desc->read_ready = false;
    desc->write_ready = false;
}

void
epoll_scheduler::
deregister_descriptor(int fd) const
{
    ::epoll_ctl(epoll_fd_, EPOLL_CTL_DEL, fd, nullptr);
}

void
epoll_scheduler::
work_started() const noexcept
{
    outstanding_work_.fetch_add(1, std::memory_order_relaxed);
}

void
epoll_scheduler::
work_finished() const noexcept
{
    if (outstanding_work_.fetch_sub(1, std::memory_order_acq_rel) == 1)
    {
        // Last work item completed - wake all threads so they can exit.
        // signal_all() wakes threads waiting on the condvar.
        // interrupt_reactor() wakes the reactor thread blocked in epoll_wait().
        // Both are needed because they target different blocking mechanisms.
        std::unique_lock lock(mutex_);
        signal_all(lock);
        if (task_running_ && !task_interrupted_)
        {
            task_interrupted_ = true;
            lock.unlock();
            interrupt_reactor();
        }
    }
}

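// A dequeued op counts as one consumed work item. When it turns out to
// have nothing to run (e.g. a descriptor dequeued on a spurious wake),
// the handler thread calls this to record one unit of produced work, so
// work_cleanup's net accounting stays balanced instead of decrementing
// the global count early.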
void
epoll_scheduler::
compensating_work_started() const noexcept
{
    auto* ctx = find_context(this);
    if (ctx)
        ++ctx->private_outstanding_work;
}

void
epoll_scheduler::
drain_thread_queue(op_queue& queue, long count) const
{
    // Note: outstanding_work_ was already incremented when posting
    std::unique_lock lock(mutex_);
    completed_ops_.splice(queue);
    if (count > 0)
        maybe_unlock_and_signal_one(lock);
}

void
epoll_scheduler::
post_deferred_completions(op_queue& ops) const
{
    if (ops.empty())
        return;

    // Fast path: if on scheduler thread, use private queue
    if (auto* ctx = find_context(this))
    {
        ctx->private_queue.splice(ops);
        return;
    }

    // Slow path: add to global queue and wake a thread
    std::unique_lock lock(mutex_);
    completed_ops_.splice(ops);
    wake_one_thread_and_unlock(lock);
}

void
epoll_scheduler::
interrupt_reactor() const
{
    // Only write if not already armed to avoid redundant writes
    bool expected = false;
    if (eventfd_armed_.compare_exchange_strong(expected, true,
            std::memory_order_release, std::memory_order_relaxed))
    {
        std::uint64_t val = 1;
        [[maybe_unused]] auto r = ::write(event_fd_, &val, sizeof(val));
    }
}

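// --- condvar signaling primitives ---
// state_ bit 0 is the sticky "signaled" flag; each blocked waiter adds 2.
// Worked example of the encoding (values illustrative):
//   two threads block in wait_for_signal()   -> state_ == 4
//   unlock_and_signal_one() sets bit 0       -> state_ == 5, notify_one()
//   the woken waiter subtracts 2 and returns -> state_ == 3
//   do_one() later calls clear_signal()      -> state_ == 2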
void
epoll_scheduler::
signal_all(std::unique_lock<std::mutex>&) const
{
    state_ |= 1;
    cond_.notify_all();
}

bool
epoll_scheduler::
maybe_unlock_and_signal_one(std::unique_lock<std::mutex>& lock) const
{
    state_ |= 1;
    if (state_ > 1)
    {
        lock.unlock();
        cond_.notify_one();
        return true;
    }
    return false;
}

void
epoll_scheduler::
unlock_and_signal_one(std::unique_lock<std::mutex>& lock) const
{
    state_ |= 1;
    bool have_waiters = state_ > 1;
    lock.unlock();
    if (have_waiters)
        cond_.notify_one();
}

void
epoll_scheduler::
clear_signal() const
{
    state_ &= ~std::size_t(1);
}

void
epoll_scheduler::
wait_for_signal(std::unique_lock<std::mutex>& lock) const
{
    while ((state_ & 1) == 0)
    {
        state_ += 2;
        cond_.wait(lock);
        state_ -= 2;
    }
}

void
epoll_scheduler::
wait_for_signal_for(
    std::unique_lock<std::mutex>& lock,
    long timeout_us) const
{
    if ((state_ & 1) == 0)
    {
        state_ += 2;
        cond_.wait_for(lock, std::chrono::microseconds(timeout_us));
        state_ -= 2;
    }
}

void
epoll_scheduler::
wake_one_thread_and_unlock(std::unique_lock<std::mutex>& lock) const
{
    if (maybe_unlock_and_signal_one(lock))
        return;

    if (task_running_ && !task_interrupted_)
    {
        task_interrupted_ = true;
        lock.unlock();
        interrupt_reactor();
    }
    else
    {
        lock.unlock();
    }
}

/** RAII guard for handler execution work accounting.

    A handler consumes 1 work item and may produce N new items via
    fast-path posts. Net change = N - 1:
    - If N > 1: add (N - 1) to the global count (more produced than consumed)
    - If N == 1: net zero, do nothing
    - If N < 1: call work_finished() (work consumed, may trigger stop)

    Also drains the private queue to the global queue so other threads
    can process it.
*/
struct work_cleanup
{
    epoll_scheduler const* scheduler;
    std::unique_lock<std::mutex>* lock;
    scheduler_context* ctx;

    ~work_cleanup()
    {
        if (ctx)
        {
            long produced = ctx->private_outstanding_work;
            if (produced > 1)
                scheduler->outstanding_work_.fetch_add(
                    produced - 1, std::memory_order_relaxed);
            else if (produced < 1)
                scheduler->work_finished();
            // produced == 1: net zero, handler consumed what it produced
            ctx->private_outstanding_work = 0;

            if (!ctx->private_queue.empty())
            {
                lock->lock();
                scheduler->completed_ops_.splice(ctx->private_queue);
            }
        }
        else
        {
            // No thread context - slow-path op was already counted globally
            scheduler->work_finished();
        }
    }
};
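
// Worked example of the rule above: a handler that fast-path posts three
// continuations leaves private_outstanding_work == 3, so the destructor
// adds 3 - 1 == 2 to outstanding_work_; a handler that posts nothing
// leaves 0 and calls work_finished(), which may drop the global count to
// zero and stop the scheduler.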

/** RAII guard for reactor work accounting.

    The reactor only produces work, via timer/signal callbacks posting
    handlers. Unlike handler execution, which consumes 1 work item, the
    reactor consumes nothing, so all produced work is flushed to the
    global counter.
*/
struct task_cleanup
{
    epoll_scheduler const* scheduler;
    scheduler_context* ctx;

    ~task_cleanup()
    {
        if (ctx && ctx->private_outstanding_work > 0)
        {
            scheduler->outstanding_work_.fetch_add(
                ctx->private_outstanding_work, std::memory_order_relaxed);
            ctx->private_outstanding_work = 0;
        }
    }
};

void
epoll_scheduler::
update_timerfd() const
{
    auto nearest = timer_svc_->nearest_expiry();

    itimerspec ts{};
    int flags = 0;

    if (nearest == timer_service::time_point::max())
    {
        // No timers - disarm by setting to 0 (relative)
    }
    else
    {
        auto now = std::chrono::steady_clock::now();
        if (nearest <= now)
        {
            // Use 1ns instead of 0 - zero disarms the timerfd
            ts.it_value.tv_nsec = 1;
        }
        else
        {
            auto nsec = std::chrono::duration_cast<std::chrono::nanoseconds>(
                nearest - now).count();
            ts.it_value.tv_sec = nsec / 1000000000;
            ts.it_value.tv_nsec = nsec % 1000000000;
            // Ensure non-zero to avoid disarming if duration rounds to 0
            if (ts.it_value.tv_sec == 0 && ts.it_value.tv_nsec == 0)
                ts.it_value.tv_nsec = 1;
        }
    }

    if (::timerfd_settime(timer_fd_, flags, &ts, nullptr) < 0)
        detail::throw_system_error(make_err(errno), "timerfd_settime");
}
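
// Arithmetic check for the split above: a timer due in 2.5ms gives
// nsec == 2,500,000, so it_value = {0s, 2,500,000ns}; one due in 1.25s
// gives nsec == 1,250,000,000, so it_value = {1s, 250,000,000ns}.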

void
epoll_scheduler::
run_task(std::unique_lock<std::mutex>& lock, scheduler_context* ctx)
{
    int timeout_ms = task_interrupted_ ? 0 : -1;

    if (lock.owns_lock())
        lock.unlock();

    // Flush private work count when reactor completes
    task_cleanup on_exit{this, ctx};
    (void)on_exit;

    // Event loop runs without mutex held
    epoll_event events[128];
    int nfds = ::epoll_wait(epoll_fd_, events, 128, timeout_ms);
    int saved_errno = errno;

    if (nfds < 0 && saved_errno != EINTR)
        detail::throw_system_error(make_err(saved_errno), "epoll_wait");

    bool check_timers = false;
    op_queue local_ops;
    int completions_queued = 0;

    // Process events without holding the mutex
    for (int i = 0; i < nfds; ++i)
    {
        if (events[i].data.ptr == nullptr)
        {
            std::uint64_t val;
            [[maybe_unused]] auto r = ::read(event_fd_, &val, sizeof(val));
            eventfd_armed_.store(false, std::memory_order_relaxed);
            continue;
        }

        if (events[i].data.ptr == &timer_fd_)
        {
            std::uint64_t expirations;
            [[maybe_unused]] auto r = ::read(timer_fd_, &expirations, sizeof(expirations));
            check_timers = true;
            continue;
        }

        // Deferred I/O: just set ready events and enqueue descriptor
        // No per-descriptor mutex locking in reactor hot path!
        auto* desc = static_cast<descriptor_state*>(events[i].data.ptr);
        desc->add_ready_events(events[i].events);

        // Only enqueue if not already enqueued
        bool expected = false;
        if (desc->is_enqueued_.compare_exchange_strong(expected, true,
                std::memory_order_release, std::memory_order_relaxed))
        {
            local_ops.push(desc);
            ++completions_queued;
        }
    }

    // Process timers only when timerfd fires
    if (check_timers)
    {
        timer_svc_->process_expired();
        update_timerfd();
    }

    // --- Acquire mutex only for queue operations ---
    lock.lock();

    if (!local_ops.empty())
        completed_ops_.splice(local_ops);

    // Drain private queue to global (work count handled by task_cleanup)
    if (ctx && !ctx->private_queue.empty())
    {
        completions_queued += ctx->private_outstanding_work;
        completed_ops_.splice(ctx->private_queue);
    }

    // Signal and wake one waiter if work is queued
    if (completions_queued > 0)
    {
        if (maybe_unlock_and_signal_one(lock))
            lock.lock();
    }
}

std::size_t
epoll_scheduler::
do_one(std::unique_lock<std::mutex>& lock, long timeout_us, scheduler_context* ctx)
{
    for (;;)
    {
        if (stopped_)
            return 0;

        scheduler_op* op = completed_ops_.pop();

        // Handle reactor sentinel - time to poll for I/O
        if (op == &task_op_)
        {
            bool more_handlers = !completed_ops_.empty() ||
                (ctx && !ctx->private_queue.empty());

            // Nothing to run the reactor for: no pending work to wait on,
            // or caller requested a non-blocking poll
            if (!more_handlers &&
                (outstanding_work_.load(std::memory_order_acquire) == 0 ||
                    timeout_us == 0))
            {
                completed_ops_.push(&task_op_);
                return 0;
            }

            task_interrupted_ = more_handlers || timeout_us == 0;
            task_running_ = true;

            if (more_handlers)
                unlock_and_signal_one(lock);

            run_task(lock, ctx);

            task_running_ = false;
            completed_ops_.push(&task_op_);
            continue;
        }

        // Handle operation
        if (op != nullptr)
        {
            if (!completed_ops_.empty())
                unlock_and_signal_one(lock);
            else
                lock.unlock();

            work_cleanup on_exit{this, &lock, ctx};
            (void)on_exit;

            (*op)();
            return 1;
        }

        // No work from global queue - try private queue before blocking
        if (drain_private_queue(ctx, outstanding_work_, completed_ops_))
            continue;

        // No pending work to wait on, or caller requested non-blocking poll
        if (outstanding_work_.load(std::memory_order_acquire) == 0 ||
            timeout_us == 0)
            return 0;

        clear_signal();
        if (timeout_us < 0)
            wait_for_signal(lock);
        else
            wait_for_signal_for(lock, timeout_us);
    }
}

} // namespace boost::corosio::detail

#endif