libs/corosio/src/corosio/src/detail/epoll/sockets.cpp

72.9% Lines (320/439) 94.4% Functions (34/36) 54.9% Branches (124/226)
libs/corosio/src/corosio/src/detail/epoll/sockets.cpp
Line Branch Hits Source Code
1 //
2 // Copyright (c) 2026 Steve Gerbino
3 //
4 // Distributed under the Boost Software License, Version 1.0. (See accompanying
5 // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
6 //
7 // Official repository: https://github.com/cppalliance/corosio
8 //
9
10 #include <boost/corosio/detail/platform.hpp>
11
12 #if BOOST_COROSIO_HAS_EPOLL
13
14 #include "src/detail/epoll/sockets.hpp"
15 #include "src/detail/endpoint_convert.hpp"
16 #include "src/detail/make_err.hpp"
17 #include "src/detail/resume_coro.hpp"
18
19 #include <boost/corosio/detail/except.hpp>
20 #include <boost/capy/buffers.hpp>
21
22 #include <utility>
23
24 #include <errno.h>
25 #include <netinet/in.h>
26 #include <netinet/tcp.h>
27 #include <sys/epoll.h>
28 #include <sys/socket.h>
29 #include <unistd.h>
30
31 namespace boost::corosio::detail {
32
33 void
34 104 epoll_op::canceller::
35 operator()() const noexcept
36 {
37 104 op->cancel();
38 104 }
39
40 void
41 epoll_connect_op::
42 cancel() noexcept
43 {
44 if (socket_impl_)
45 socket_impl_->cancel_single_op(*this);
46 else
47 request_cancel();
48 }
49
50 void
51 98 epoll_read_op::
52 cancel() noexcept
53 {
54
1/2
✓ Branch 0 taken 98 times.
✗ Branch 1 not taken.
98 if (socket_impl_)
55 98 socket_impl_->cancel_single_op(*this);
56 else
57 request_cancel();
58 98 }
59
60 void
61 epoll_write_op::
62 cancel() noexcept
63 {
64 if (socket_impl_)
65 socket_impl_->cancel_single_op(*this);
66 else
67 request_cancel();
68 }
69
70 void
71 2782 epoll_connect_op::
72 operator()()
73 {
74 2782 stop_cb.reset();
75
76
3/4
✓ Branch 0 taken 2781 times.
✓ Branch 1 taken 1 time.
✓ Branch 3 taken 2781 times.
✗ Branch 4 not taken.
2782 bool success = (errn == 0 && !cancelled.load(std::memory_order_acquire));
77
78 // Cache endpoints on successful connect
79
3/4
✓ Branch 0 taken 2781 times.
✓ Branch 1 taken 1 time.
✓ Branch 2 taken 2781 times.
✗ Branch 3 not taken.
2782 if (success && socket_impl_)
80 {
81 // Query local endpoint via getsockname (may fail, but remote is always known)
82 2781 endpoint local_ep;
83 2781 sockaddr_in local_addr{};
84 2781 socklen_t local_len = sizeof(local_addr);
85
1/2
✓ Branch 1 taken 2781 times.
✗ Branch 2 not taken.
2781 if (::getsockname(fd, reinterpret_cast<sockaddr*>(&local_addr), &local_len) == 0)
86 2781 local_ep = from_sockaddr_in(local_addr);
87 // Always cache remote endpoint; local may be default if getsockname failed
88 2781 static_cast<epoll_socket_impl*>(socket_impl_)->set_endpoints(local_ep, target_endpoint);
89 }
90
91
1/2
✓ Branch 0 taken 2782 times.
✗ Branch 1 not taken.
2782 if (ec_out)
92 {
93
1/2
✗ Branch 1 not taken.
✓ Branch 2 taken 2782 times.
2782 if (cancelled.load(std::memory_order_acquire))
94 *ec_out = capy::error::canceled;
95
2/2
✓ Branch 0 taken 1 time.
✓ Branch 1 taken 2781 times.
2782 else if (errn != 0)
96 1 *ec_out = make_err(errn);
97 else
98 2781 *ec_out = {};
99 }
100
101
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 2782 times.
2782 if (bytes_out)
102 *bytes_out = bytes_transferred;
103
104 // Move to stack before resuming. See epoll_op::operator()() for rationale.
105 2782 capy::executor_ref saved_ex( std::move( ex ) );
106 2782 capy::coro saved_h( std::move( h ) );
107 2782 auto prevent_premature_destruction = std::move(impl_ptr);
108
1/1
✓ Branch 1 taken 2782 times.
2782 resume_coro(saved_ex, saved_h);
109 2782 }
110
111 5574 epoll_socket_impl::
112 5574 epoll_socket_impl(epoll_socket_service& svc) noexcept
113 5574 : svc_(svc)
114 {
115 5574 }
116
117 5574 epoll_socket_impl::
118 ~epoll_socket_impl() = default;
119
120 void
121 5574 epoll_socket_impl::
122 release()
123 {
124 5574 close_socket();
125 5574 svc_.destroy_impl(*this);
126 5574 }
127
128 std::coroutine_handle<>
129 2782 epoll_socket_impl::
130 connect(
131 std::coroutine_handle<> h,
132 capy::executor_ref ex,
133 endpoint ep,
134 std::stop_token token,
135 std::error_code* ec)
136 {
137 2782 auto& op = conn_;
138 2782 op.reset();
139 2782 op.h = h;
140 2782 op.ex = ex;
141 2782 op.ec_out = ec;
142 2782 op.fd = fd_;
143 2782 op.target_endpoint = ep; // Store target for endpoint caching
144 2782 op.start(token, this);
145
146 2782 sockaddr_in addr = detail::to_sockaddr_in(ep);
147
1/1
✓ Branch 1 taken 2782 times.
2782 int result = ::connect(fd_, reinterpret_cast<sockaddr*>(&addr), sizeof(addr));
148
149
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 2782 times.
2782 if (result == 0)
150 {
151 // Sync success - cache endpoints immediately
152 // Remote is always known; local may fail but we still cache remote
153 sockaddr_in local_addr{};
154 socklen_t local_len = sizeof(local_addr);
155 if (::getsockname(fd_, reinterpret_cast<sockaddr*>(&local_addr), &local_len) == 0)
156 local_endpoint_ = detail::from_sockaddr_in(local_addr);
157 remote_endpoint_ = ep;
158
159 op.complete(0, 0);
160 op.impl_ptr = shared_from_this();
161 svc_.post(&op);
162 // completion is always posted to scheduler queue, never inline.
163 return std::noop_coroutine();
164 }
165
166
1/2
✓ Branch 0 taken 2782 times.
✗ Branch 1 not taken.
2782 if (errno == EINPROGRESS)
167 {
168 2782 svc_.work_started();
169
1/1
✓ Branch 1 taken 2782 times.
2782 op.impl_ptr = shared_from_this();
170
171 2782 bool perform_now = false;
172 {
173
1/1
✓ Branch 1 taken 2782 times.
2782 std::lock_guard lock(desc_state_.mutex);
174
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 2782 times.
2782 if (desc_state_.write_ready)
175 {
176 desc_state_.write_ready = false;
177 perform_now = true;
178 }
179 else
180 {
181 2782 desc_state_.connect_op = &op;
182 }
183 2782 }
184
185
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 2782 times.
2782 if (perform_now)
186 {
187 op.perform_io();
188 if (op.errn == EAGAIN || op.errn == EWOULDBLOCK)
189 {
190 op.errn = 0;
191 std::lock_guard lock(desc_state_.mutex);
192 desc_state_.connect_op = &op;
193 }
194 else
195 {
196 svc_.post(&op);
197 svc_.work_finished();
198 }
199 return std::noop_coroutine();
200 }
201
202
1/2
✗ Branch 1 not taken.
✓ Branch 2 taken 2782 times.
2782 if (op.cancelled.load(std::memory_order_acquire))
203 {
204 epoll_op* claimed = nullptr;
205 {
206 std::lock_guard lock(desc_state_.mutex);
207 if (desc_state_.connect_op == &op)
208 claimed = std::exchange(desc_state_.connect_op, nullptr);
209 }
210 if (claimed)
211 {
212 svc_.post(claimed);
213 svc_.work_finished();
214 }
215 }
216 // completion is always posted to scheduler queue, never inline.
217 2782 return std::noop_coroutine();
218 }
219
220 op.complete(errno, 0);
221 op.impl_ptr = shared_from_this();
222 svc_.post(&op);
223 // completion is always posted to scheduler queue, never inline.
224 return std::noop_coroutine();
225 }
226
227 void
228 76522 epoll_socket_impl::
229 do_read_io()
230 {
231 76522 auto& op = rd_;
232
233 76522 ssize_t n = ::readv(fd_, op.iovecs, op.iovec_count);
234
235
2/2
✓ Branch 0 taken 76348 times.
✓ Branch 1 taken 174 times.
76522 if (n > 0)
236 {
237 {
238
1/1
✓ Branch 1 taken 76348 times.
76348 std::lock_guard lock(desc_state_.mutex);
239 76348 desc_state_.read_ready = false;
240 76348 }
241 76348 op.complete(0, static_cast<std::size_t>(n));
242 76348 svc_.post(&op);
243 76348 return;
244 }
245
246
2/2
✓ Branch 0 taken 5 times.
✓ Branch 1 taken 169 times.
174 if (n == 0)
247 {
248 {
249
1/1
✓ Branch 1 taken 5 times.
5 std::lock_guard lock(desc_state_.mutex);
250 5 desc_state_.read_ready = false;
251 5 }
252 5 op.complete(0, 0);
253 5 svc_.post(&op);
254 5 return;
255 }
256
257
1/4
✗ Branch 0 not taken.
✓ Branch 1 taken 169 times.
✗ Branch 2 not taken.
✗ Branch 3 not taken.
169 if (errno == EAGAIN || errno == EWOULDBLOCK)
258 {
259 169 svc_.work_started();
260
261 169 bool perform_now = false;
262 {
263
1/1
✓ Branch 1 taken 169 times.
169 std::lock_guard lock(desc_state_.mutex);
264
2/2
✓ Branch 0 taken 46 times.
✓ Branch 1 taken 123 times.
169 if (desc_state_.read_ready)
265 {
266 46 desc_state_.read_ready = false;
267 46 perform_now = true;
268 }
269 else
270 {
271 123 desc_state_.read_op = &op;
272 }
273 169 }
274
275
2/2
✓ Branch 0 taken 46 times.
✓ Branch 1 taken 123 times.
169 if (perform_now)
276 {
277 46 op.perform_io();
278
1/4
✗ Branch 0 not taken.
✓ Branch 1 taken 46 times.
✗ Branch 2 not taken.
✗ Branch 3 not taken.
46 if (op.errn == EAGAIN || op.errn == EWOULDBLOCK)
279 {
280 46 op.errn = 0;
281
1/1
✓ Branch 1 taken 46 times.
46 std::lock_guard lock(desc_state_.mutex);
282 46 desc_state_.read_op = &op;
283 46 }
284 else
285 {
286 svc_.post(&op);
287 svc_.work_finished();
288 }
289 46 return;
290 }
291
292
1/2
✗ Branch 1 not taken.
✓ Branch 2 taken 123 times.
123 if (op.cancelled.load(std::memory_order_acquire))
293 {
294 epoll_op* claimed = nullptr;
295 {
296 std::lock_guard lock(desc_state_.mutex);
297 if (desc_state_.read_op == &op)
298 claimed = std::exchange(desc_state_.read_op, nullptr);
299 }
300 if (claimed)
301 {
302 svc_.post(claimed);
303 svc_.work_finished();
304 }
305 }
306 123 return;
307 }
308
309 op.complete(errno, 0);
310 svc_.post(&op);
311 }
312
313 void
314 76401 epoll_socket_impl::
315 do_write_io()
316 {
317 76401 auto& op = wr_;
318
319 76401 msghdr msg{};
320 76401 msg.msg_iov = op.iovecs;
321 76401 msg.msg_iovlen = static_cast<std::size_t>(op.iovec_count);
322
323
1/1
✓ Branch 1 taken 76401 times.
76401 ssize_t n = ::sendmsg(fd_, &msg, MSG_NOSIGNAL);
324
325
2/2
✓ Branch 0 taken 76400 times.
✓ Branch 1 taken 1 time.
76401 if (n > 0)
326 {
327 {
328
1/1
✓ Branch 1 taken 76400 times.
76400 std::lock_guard lock(desc_state_.mutex);
329 76400 desc_state_.write_ready = false;
330 76400 }
331 76400 op.complete(0, static_cast<std::size_t>(n));
332
1/1
✓ Branch 1 taken 76400 times.
76400 svc_.post(&op);
333 76400 return;
334 }
335
336
2/4
✓ Branch 0 taken 1 time.
✗ Branch 1 not taken.
✗ Branch 2 not taken.
✓ Branch 3 taken 1 time.
1 if (errno == EAGAIN || errno == EWOULDBLOCK)
337 {
338 svc_.work_started();
339
340 bool perform_now = false;
341 {
342 std::lock_guard lock(desc_state_.mutex);
343 if (desc_state_.write_ready)
344 {
345 desc_state_.write_ready = false;
346 perform_now = true;
347 }
348 else
349 {
350 desc_state_.write_op = &op;
351 }
352 }
353
354 if (perform_now)
355 {
356 op.perform_io();
357 if (op.errn == EAGAIN || op.errn == EWOULDBLOCK)
358 {
359 op.errn = 0;
360 std::lock_guard lock(desc_state_.mutex);
361 desc_state_.write_op = &op;
362 }
363 else
364 {
365 svc_.post(&op);
366 svc_.work_finished();
367 }
368 return;
369 }
370
371 if (op.cancelled.load(std::memory_order_acquire))
372 {
373 epoll_op* claimed = nullptr;
374 {
375 std::lock_guard lock(desc_state_.mutex);
376 if (desc_state_.write_op == &op)
377 claimed = std::exchange(desc_state_.write_op, nullptr);
378 }
379 if (claimed)
380 {
381 svc_.post(claimed);
382 svc_.work_finished();
383 }
384 }
385 return;
386 }
387
388
1/2
✓ Branch 0 taken 1 time.
✗ Branch 1 not taken.
1 op.complete(errno ? errno : EIO, 0);
389
1/1
✓ Branch 1 taken 1 time.
1 svc_.post(&op);
390 }
391
392 std::coroutine_handle<>
393 76523 epoll_socket_impl::
394 read_some(
395 std::coroutine_handle<> h,
396 capy::executor_ref ex,
397 io_buffer_param param,
398 std::stop_token token,
399 std::error_code* ec,
400 std::size_t* bytes_out)
401 {
402 76523 auto& op = rd_;
403 76523 op.reset();
404 76523 op.h = h;
405 76523 op.ex = ex;
406 76523 op.ec_out = ec;
407 76523 op.bytes_out = bytes_out;
408 76523 op.fd = fd_;
409 76523 op.start(token, this);
410
1/1
✓ Branch 1 taken 76523 times.
76523 op.impl_ptr = shared_from_this();
411
412 // Must prepare buffers before initiator runs
413 76523 capy::mutable_buffer bufs[epoll_read_op::max_buffers];
414 76523 op.iovec_count = static_cast<int>(param.copy_to(bufs, epoll_read_op::max_buffers));
415
416
6/8
✓ Branch 0 taken 76522 times.
✓ Branch 1 taken 1 time.
✓ Branch 2 taken 76522 times.
✗ Branch 3 not taken.
✗ Branch 5 not taken.
✓ Branch 6 taken 76522 times.
✓ Branch 7 taken 1 time.
✓ Branch 8 taken 76522 times.
76523 if (op.iovec_count == 0 || (op.iovec_count == 1 && bufs[0].size() == 0))
417 {
418 1 op.empty_buffer_read = true;
419 1 op.complete(0, 0);
420
1/1
✓ Branch 1 taken 1 time.
1 svc_.post(&op);
421 1 return std::noop_coroutine();
422 }
423
424
2/2
✓ Branch 0 taken 76522 times.
✓ Branch 1 taken 76522 times.
153044 for (int i = 0; i < op.iovec_count; ++i)
425 {
426 76522 op.iovecs[i].iov_base = bufs[i].data();
427 76522 op.iovecs[i].iov_len = bufs[i].size();
428 }
429
430 // Symmetric transfer ensures caller is suspended before I/O starts
431
1/1
✓ Branch 1 taken 76522 times.
76522 return read_initiator_.start<&epoll_socket_impl::do_read_io>(this);
432 }
433
434 std::coroutine_handle<>
435 76402 epoll_socket_impl::
436 write_some(
437 std::coroutine_handle<> h,
438 capy::executor_ref ex,
439 io_buffer_param param,
440 std::stop_token token,
441 std::error_code* ec,
442 std::size_t* bytes_out)
443 {
444 76402 auto& op = wr_;
445 76402 op.reset();
446 76402 op.h = h;
447 76402 op.ex = ex;
448 76402 op.ec_out = ec;
449 76402 op.bytes_out = bytes_out;
450 76402 op.fd = fd_;
451 76402 op.start(token, this);
452
1/1
✓ Branch 1 taken 76402 times.
76402 op.impl_ptr = shared_from_this();
453
454 // Must prepare buffers before initiator runs
455 76402 capy::mutable_buffer bufs[epoll_write_op::max_buffers];
456 76402 op.iovec_count = static_cast<int>(param.copy_to(bufs, epoll_write_op::max_buffers));
457
458
6/8
✓ Branch 0 taken 76401 times.
✓ Branch 1 taken 1 time.
✓ Branch 2 taken 76401 times.
✗ Branch 3 not taken.
✗ Branch 5 not taken.
✓ Branch 6 taken 76401 times.
✓ Branch 7 taken 1 time.
✓ Branch 8 taken 76401 times.
76402 if (op.iovec_count == 0 || (op.iovec_count == 1 && bufs[0].size() == 0))
459 {
460 1 op.complete(0, 0);
461
1/1
✓ Branch 1 taken 1 time.
1 svc_.post(&op);
462 1 return std::noop_coroutine();
463 }
464
465
2/2
✓ Branch 0 taken 76401 times.
✓ Branch 1 taken 76401 times.
152802 for (int i = 0; i < op.iovec_count; ++i)
466 {
467 76401 op.iovecs[i].iov_base = bufs[i].data();
468 76401 op.iovecs[i].iov_len = bufs[i].size();
469 }
470
471 // Symmetric transfer ensures caller is suspended before I/O starts
472
1/1
✓ Branch 1 taken 76401 times.
76401 return write_initiator_.start<&epoll_socket_impl::do_write_io>(this);
473 }
474
475 std::error_code
476 3 epoll_socket_impl::
477 shutdown(tcp_socket::shutdown_type what) noexcept
478 {
479 int how;
480
3/4
✓ Branch 0 taken 1 time.
✓ Branch 1 taken 1 time.
✓ Branch 2 taken 1 time.
✗ Branch 3 not taken.
3 switch (what)
481 {
482 1 case tcp_socket::shutdown_receive: how = SHUT_RD; break;
483 1 case tcp_socket::shutdown_send: how = SHUT_WR; break;
484 1 case tcp_socket::shutdown_both: how = SHUT_RDWR; break;
485 default:
486 return make_err(EINVAL);
487 }
488
1/2
✗ Branch 1 not taken.
✓ Branch 2 taken 3 times.
3 if (::shutdown(fd_, how) != 0)
489 return make_err(errno);
490 3 return {};
491 }
492
493 std::error_code
494 5 epoll_socket_impl::
495 set_no_delay(bool value) noexcept
496 {
497
2/2
✓ Branch 0 taken 4 times.
✓ Branch 1 taken 1 time.
5 int flag = value ? 1 : 0;
498
1/2
✗ Branch 1 not taken.
✓ Branch 2 taken 5 times.
5 if (::setsockopt(fd_, IPPROTO_TCP, TCP_NODELAY, &flag, sizeof(flag)) != 0)
499 return make_err(errno);
500 5 return {};
501 }
502
503 bool
504 5 epoll_socket_impl::
505 no_delay(std::error_code& ec) const noexcept
506 {
507 5 int flag = 0;
508 5 socklen_t len = sizeof(flag);
509
1/2
✗ Branch 1 not taken.
✓ Branch 2 taken 5 times.
5 if (::getsockopt(fd_, IPPROTO_TCP, TCP_NODELAY, &flag, &len) != 0)
510 {
511 ec = make_err(errno);
512 return false;
513 }
514 5 ec = {};
515 5 return flag != 0;
516 }
517
518 std::error_code
519 4 epoll_socket_impl::
520 set_keep_alive(bool value) noexcept
521 {
522
2/2
✓ Branch 0 taken 3 times.
✓ Branch 1 taken 1 time.
4 int flag = value ? 1 : 0;
523
1/2
✗ Branch 1 not taken.
✓ Branch 2 taken 4 times.
4 if (::setsockopt(fd_, SOL_SOCKET, SO_KEEPALIVE, &flag, sizeof(flag)) != 0)
524 return make_err(errno);
525 4 return {};
526 }
527
528 bool
529 4 epoll_socket_impl::
530 keep_alive(std::error_code& ec) const noexcept
531 {
532 4 int flag = 0;
533 4 socklen_t len = sizeof(flag);
534
1/2
✗ Branch 1 not taken.
✓ Branch 2 taken 4 times.
4 if (::getsockopt(fd_, SOL_SOCKET, SO_KEEPALIVE, &flag, &len) != 0)
535 {
536 ec = make_err(errno);
537 return false;
538 }
539 4 ec = {};
540 4 return flag != 0;
541 }
542
543 std::error_code
544 1 epoll_socket_impl::
545 set_receive_buffer_size(int size) noexcept
546 {
547
1/2
✗ Branch 1 not taken.
✓ Branch 2 taken 1 time.
1 if (::setsockopt(fd_, SOL_SOCKET, SO_RCVBUF, &size, sizeof(size)) != 0)
548 return make_err(errno);
549 1 return {};
550 }
551
552 int
553 3 epoll_socket_impl::
554 receive_buffer_size(std::error_code& ec) const noexcept
555 {
556 3 int size = 0;
557 3 socklen_t len = sizeof(size);
558
1/2
✗ Branch 1 not taken.
✓ Branch 2 taken 3 times.
3 if (::getsockopt(fd_, SOL_SOCKET, SO_RCVBUF, &size, &len) != 0)
559 {
560 ec = make_err(errno);
561 return 0;
562 }
563 3 ec = {};
564 3 return size;
565 }
566
567 std::error_code
568 1 epoll_socket_impl::
569 set_send_buffer_size(int size) noexcept
570 {
571
1/2
✗ Branch 1 not taken.
✓ Branch 2 taken 1 time.
1 if (::setsockopt(fd_, SOL_SOCKET, SO_SNDBUF, &size, sizeof(size)) != 0)
572 return make_err(errno);
573 1 return {};
574 }
575
576 int
577 3 epoll_socket_impl::
578 send_buffer_size(std::error_code& ec) const noexcept
579 {
580 3 int size = 0;
581 3 socklen_t len = sizeof(size);
582
1/2
✗ Branch 1 not taken.
✓ Branch 2 taken 3 times.
3 if (::getsockopt(fd_, SOL_SOCKET, SO_SNDBUF, &size, &len) != 0)
583 {
584 ec = make_err(errno);
585 return 0;
586 }
587 3 ec = {};
588 3 return size;
589 }
590
591 std::error_code
592 4 epoll_socket_impl::
593 set_linger(bool enabled, int timeout) noexcept
594 {
595
2/2
✓ Branch 0 taken 1 time.
✓ Branch 1 taken 3 times.
4 if (timeout < 0)
596 1 return make_err(EINVAL);
597 struct ::linger lg;
598
2/2
✓ Branch 0 taken 2 times.
✓ Branch 1 taken 1 time.
3 lg.l_onoff = enabled ? 1 : 0;
599 3 lg.l_linger = timeout;
600
1/2
✗ Branch 1 not taken.
✓ Branch 2 taken 3 times.
3 if (::setsockopt(fd_, SOL_SOCKET, SO_LINGER, &lg, sizeof(lg)) != 0)
601 return make_err(errno);
602 3 return {};
603 }
604
605 tcp_socket::linger_options
606 3 epoll_socket_impl::
607 linger(std::error_code& ec) const noexcept
608 {
609 3 struct ::linger lg{};
610 3 socklen_t len = sizeof(lg);
611
1/2
✗ Branch 1 not taken.
✓ Branch 2 taken 3 times.
3 if (::getsockopt(fd_, SOL_SOCKET, SO_LINGER, &lg, &len) != 0)
612 {
613 ec = make_err(errno);
614 return {};
615 }
616 3 ec = {};
617 3 return {.enabled = lg.l_onoff != 0, .timeout = lg.l_linger};
618 }
619
620 void
621 8463 epoll_socket_impl::
622 cancel() noexcept
623 {
624 8463 std::shared_ptr<epoll_socket_impl> self;
625 try {
626
1/1
✓ Branch 1 taken 8463 times.
8463 self = shared_from_this();
627 } catch (const std::bad_weak_ptr&) {
628 return;
629 }
630
631 8463 conn_.request_cancel();
632 8463 rd_.request_cancel();
633 8463 wr_.request_cancel();
634
635 8463 epoll_op* conn_claimed = nullptr;
636 8463 epoll_op* rd_claimed = nullptr;
637 8463 epoll_op* wr_claimed = nullptr;
638 {
639 8463 std::lock_guard lock(desc_state_.mutex);
640
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 8463 times.
8463 if (desc_state_.connect_op == &conn_)
641 conn_claimed = std::exchange(desc_state_.connect_op, nullptr);
642
2/2
✓ Branch 0 taken 51 times.
✓ Branch 1 taken 8412 times.
8463 if (desc_state_.read_op == &rd_)
643 51 rd_claimed = std::exchange(desc_state_.read_op, nullptr);
644
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 8463 times.
8463 if (desc_state_.write_op == &wr_)
645 wr_claimed = std::exchange(desc_state_.write_op, nullptr);
646 8463 }
647
648
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 8463 times.
8463 if (conn_claimed)
649 {
650 conn_.impl_ptr = self;
651 svc_.post(&conn_);
652 svc_.work_finished();
653 }
654
2/2
✓ Branch 0 taken 51 times.
✓ Branch 1 taken 8412 times.
8463 if (rd_claimed)
655 {
656 51 rd_.impl_ptr = self;
657 51 svc_.post(&rd_);
658 51 svc_.work_finished();
659 }
660
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 8463 times.
8463 if (wr_claimed)
661 {
662 wr_.impl_ptr = self;
663 svc_.post(&wr_);
664 svc_.work_finished();
665 }
666 8463 }
667
668 void
669 98 epoll_socket_impl::
670 cancel_single_op(epoll_op& op) noexcept
671 {
672 98 op.request_cancel();
673
674 98 epoll_op** desc_op_ptr = nullptr;
675
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 98 times.
98 if (&op == &conn_) desc_op_ptr = &desc_state_.connect_op;
676
1/2
✓ Branch 0 taken 98 times.
✗ Branch 1 not taken.
98 else if (&op == &rd_) desc_op_ptr = &desc_state_.read_op;
677 else if (&op == &wr_) desc_op_ptr = &desc_state_.write_op;
678
679
1/2
✓ Branch 0 taken 98 times.
✗ Branch 1 not taken.
98 if (desc_op_ptr)
680 {
681 98 epoll_op* claimed = nullptr;
682 {
683 98 std::lock_guard lock(desc_state_.mutex);
684
2/2
✓ Branch 0 taken 66 times.
✓ Branch 1 taken 32 times.
98 if (*desc_op_ptr == &op)
685 66 claimed = std::exchange(*desc_op_ptr, nullptr);
686 98 }
687
2/2
✓ Branch 0 taken 66 times.
✓ Branch 1 taken 32 times.
98 if (claimed)
688 {
689 try {
690
1/1
✓ Branch 1 taken 66 times.
66 op.impl_ptr = shared_from_this();
691 } catch (const std::bad_weak_ptr&) {}
692 66 svc_.post(&op);
693 66 svc_.work_finished();
694 }
695 }
696 98 }
697
698 void
699 8367 epoll_socket_impl::
700 close_socket() noexcept
701 {
702 8367 cancel();
703
704 // Keep impl alive if descriptor_state is queued in the scheduler.
705 // Without this, destroy_impl() drops the last shared_ptr while
706 // the queued descriptor_state node would become dangling.
707
2/2
✓ Branch 1 taken 4 times.
✓ Branch 2 taken 8363 times.
8367 if (desc_state_.is_enqueued_.load(std::memory_order_acquire))
708 {
709 try {
710
1/1
✓ Branch 1 taken 4 times.
4 desc_state_.impl_ref_ = shared_from_this();
711 } catch (std::bad_weak_ptr const&) {}
712 }
713
714
2/2
✓ Branch 0 taken 5574 times.
✓ Branch 1 taken 2793 times.
8367 if (fd_ >= 0)
715 {
716
1/2
✓ Branch 0 taken 5574 times.
✗ Branch 1 not taken.
5574 if (desc_state_.registered_events != 0)
717 5574 svc_.scheduler().deregister_descriptor(fd_);
718 5574 ::close(fd_);
719 5574 fd_ = -1;
720 }
721
722 8367 desc_state_.fd = -1;
723 {
724 8367 std::lock_guard lock(desc_state_.mutex);
725 8367 desc_state_.read_op = nullptr;
726 8367 desc_state_.write_op = nullptr;
727 8367 desc_state_.connect_op = nullptr;
728 8367 desc_state_.read_ready = false;
729 8367 desc_state_.write_ready = false;
730 8367 }
731 8367 desc_state_.registered_events = 0;
732
733 8367 local_endpoint_ = endpoint{};
734 8367 remote_endpoint_ = endpoint{};
735 8367 }
736
737 189 epoll_socket_service::
738 189 epoll_socket_service(capy::execution_context& ctx)
739
2/2
✓ Branch 2 taken 189 times.
✓ Branch 5 taken 189 times.
189 : state_(std::make_unique<epoll_socket_state>(ctx.use_service<epoll_scheduler>()))
740 {
741 189 }
742
743 378 epoll_socket_service::
744 189 ~epoll_socket_service()
745 {
746 378 }
747
748 void
749 189 epoll_socket_service::
750 shutdown()
751 {
752
1/1
✓ Branch 2 taken 189 times.
189 std::lock_guard lock(state_->mutex_);
753
754
1/2
✗ Branch 2 not taken.
✓ Branch 3 taken 189 times.
189 while (auto* impl = state_->socket_list_.pop_front())
755 impl->close_socket();
756
757 189 state_->socket_ptrs_.clear();
758 189 }
759
760 tcp_socket::socket_impl&
761 5574 epoll_socket_service::
762 create_impl()
763 {
764
1/1
✓ Branch 1 taken 5574 times.
5574 auto impl = std::make_shared<epoll_socket_impl>(*this);
765 5574 auto* raw = impl.get();
766
767 {
768
1/1
✓ Branch 2 taken 5574 times.
5574 std::lock_guard lock(state_->mutex_);
769 5574 state_->socket_list_.push_back(raw);
770
1/1
✓ Branch 3 taken 5574 times.
5574 state_->socket_ptrs_.emplace(raw, std::move(impl));
771 5574 }
772
773 5574 return *raw;
774 5574 }
775
776 void
777 5574 epoll_socket_service::
778 destroy_impl(tcp_socket::socket_impl& impl)
779 {
780 5574 auto* epoll_impl = static_cast<epoll_socket_impl*>(&impl);
781
1/1
✓ Branch 2 taken 5574 times.
5574 std::lock_guard lock(state_->mutex_);
782 5574 state_->socket_list_.remove(epoll_impl);
783
1/1
✓ Branch 2 taken 5574 times.
5574 state_->socket_ptrs_.erase(epoll_impl);
784 5574 }
785
786 std::error_code
787 2793 epoll_socket_service::
788 open_socket(tcp_socket::socket_impl& impl)
789 {
790 2793 auto* epoll_impl = static_cast<epoll_socket_impl*>(&impl);
791 2793 epoll_impl->close_socket();
792
793 2793 int fd = ::socket(AF_INET, SOCK_STREAM | SOCK_NONBLOCK | SOCK_CLOEXEC, 0);
794
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 2793 times.
2793 if (fd < 0)
795 return make_err(errno);
796
797 2793 epoll_impl->fd_ = fd;
798
799 // Register fd with epoll (edge-triggered mode)
800 2793 epoll_impl->desc_state_.fd = fd;
801 {
802
1/1
✓ Branch 1 taken 2793 times.
2793 std::lock_guard lock(epoll_impl->desc_state_.mutex);
803 2793 epoll_impl->desc_state_.read_op = nullptr;
804 2793 epoll_impl->desc_state_.write_op = nullptr;
805 2793 epoll_impl->desc_state_.connect_op = nullptr;
806 2793 }
807 2793 scheduler().register_descriptor(fd, &epoll_impl->desc_state_);
808
809 2793 return {};
810 }
811
812 void
813 152873 epoll_socket_service::
814 post(epoll_op* op)
815 {
816 152873 state_->sched_.post(op);
817 152873 }
818
819 void
820 2951 epoll_socket_service::
821 work_started() noexcept
822 {
823 2951 state_->sched_.work_started();
824 2951 }
825
826 void
827 117 epoll_socket_service::
828 work_finished() noexcept
829 {
830 117 state_->sched_.work_finished();
831 117 }
832
833 } // namespace boost::corosio::detail
834
835 #endif // BOOST_COROSIO_HAS_EPOLL
836