include/boost/corosio/native/detail/select/select_scheduler.hpp

77.7% Lines (278/358) 87.9% List of functions (29/33)
f(x) Functions (33)
Function Calls Lines Branches Blocks
boost::corosio::detail::select_scheduler::task_op::operator()() :179 0 0.0% boost::corosio::detail::select_scheduler::task_op::destroy() :180 0 0.0% boost::corosio::detail::select::thread_context_guard::thread_context_guard(boost::corosio::detail::select_scheduler const*) :234 0 100.0% boost::corosio::detail::select::thread_context_guard::~thread_context_guard() :240 0 100.0% boost::corosio::detail::select::work_guard::~work_guard() :249 0 100.0% boost::corosio::detail::select_scheduler::select_scheduler(boost::capy::execution_context&, int) :257 0 55.2% boost::corosio::detail::select_scheduler::select_scheduler(boost::capy::execution_context&, int)::{lambda(void*)#1}::operator()(void*) const :299 0 100.0% boost::corosio::detail::select_scheduler::~select_scheduler() :313 0 100.0% boost::corosio::detail::select_scheduler::shutdown() :322 0 100.0% boost::corosio::detail::select_scheduler::post(std::__n4861::coroutine_handle<void>) const :344 0 100.0% boost::corosio::detail::select_scheduler::post(std::__n4861::coroutine_handle<void>) const::post_handler::post_handler(std::__n4861::coroutine_handle<void>) :350 0 100.0% boost::corosio::detail::select_scheduler::post(std::__n4861::coroutine_handle<void>) const::post_handler::~post_handler() :352 0 100.0% boost::corosio::detail::select_scheduler::post(std::__n4861::coroutine_handle<void>) const::post_handler::operator()() :354 0 100.0% boost::corosio::detail::select_scheduler::post(std::__n4861::coroutine_handle<void>) const::post_handler::destroy() :361 0 100.0% boost::corosio::detail::select_scheduler::post(boost::corosio::detail::scheduler_op*) const :378 0 100.0% boost::corosio::detail::select_scheduler::running_in_this_thread() const :388 0 100.0% boost::corosio::detail::select_scheduler::stop() :397 0 100.0% boost::corosio::detail::select_scheduler::stopped() const :414 0 100.0% boost::corosio::detail::select_scheduler::restart() :420 0 100.0% boost::corosio::detail::select_scheduler::run() :426 0 76.9% boost::corosio::detail::select_scheduler::run_one() :447 0 0.0% boost::corosio::detail::select_scheduler::wait_one(long) :463 0 77.8% boost::corosio::detail::select_scheduler::poll() :479 0 76.9% boost::corosio::detail::select_scheduler::poll_one() :500 0 0.0% boost::corosio::detail::select_scheduler::register_fd(int, boost::corosio::detail::select_op*, int) const :516 0 92.9% boost::corosio::detail::select_scheduler::deregister_fd(int, int) const :541 0 82.4% boost::corosio::detail::select_scheduler::work_started() :573 0 100.0% boost::corosio::detail::select_scheduler::work_finished() :579 0 100.0% boost::corosio::detail::select_scheduler::interrupt_reactor() const :586 0 100.0% boost::corosio::detail::select_scheduler::wake_one_thread_and_unlock(std::unique_lock<std::mutex>&) const :593 0 80.0% boost::corosio::detail::select_scheduler::calculate_timeout(long) const :617 0 84.2% boost::corosio::detail::select_scheduler::run_reactor(std::unique_lock<std::mutex>&) :652 0 78.3% boost::corosio::detail::select_scheduler::do_one(long) :821 0 70.0%
Line TLA Hits Source Code
1 //
2 // Copyright (c) 2026 Steve Gerbino
3 //
4 // Distributed under the Boost Software License, Version 1.0. (See accompanying
5 // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
6 //
7 // Official repository: https://github.com/cppalliance/corosio
8 //
9
10 #ifndef BOOST_COROSIO_NATIVE_DETAIL_SELECT_SELECT_SCHEDULER_HPP
11 #define BOOST_COROSIO_NATIVE_DETAIL_SELECT_SELECT_SCHEDULER_HPP
12
13 #include <boost/corosio/detail/platform.hpp>
14
15 #if BOOST_COROSIO_HAS_SELECT
16
17 #include <boost/corosio/detail/config.hpp>
18 #include <boost/capy/ex/execution_context.hpp>
19
20 #include <boost/corosio/native/native_scheduler.hpp>
21 #include <boost/corosio/detail/scheduler_op.hpp>
22
23 #include <boost/corosio/native/detail/select/select_op.hpp>
24 #include <boost/corosio/detail/timer_service.hpp>
25 #include <boost/corosio/native/detail/make_err.hpp>
26 #include <boost/corosio/native/detail/posix/posix_resolver_service.hpp>
27 #include <boost/corosio/native/detail/posix/posix_signal_service.hpp>
28
29 #include <boost/corosio/detail/except.hpp>
30 #include <boost/corosio/detail/thread_local_ptr.hpp>
31
32 #include <sys/select.h>
33 #include <sys/socket.h>
34 #include <unistd.h>
35 #include <errno.h>
36 #include <fcntl.h>
37
38 #include <algorithm>
39 #include <atomic>
40 #include <chrono>
41 #include <condition_variable>
42 #include <cstddef>
43 #include <limits>
44 #include <mutex>
45 #include <unordered_map>
46
47 namespace boost::corosio::detail {
48
49 struct select_op;
50
51 /** POSIX scheduler using select() for I/O multiplexing.
52
53 This scheduler implements the scheduler interface using the POSIX select()
54 call for I/O event notification. It uses a single reactor model
55 where one thread runs select() while other threads wait on a condition
56 variable for handler work. This design provides:
57
58 - Handler parallelism: N posted handlers can execute on N threads
59 - No thundering herd: condition_variable wakes exactly one thread
60 - Portability: Works on all POSIX systems
61
62 The design mirrors epoll_scheduler for behavioral consistency:
63 - Same single-reactor thread coordination model
64 - Same work counting semantics
65 - Same timer integration pattern
66
67 Known Limitations:
68 - FD_SETSIZE (~1024) limits maximum concurrent connections
69 - O(n) scanning: rebuilds fd_sets each iteration
70 - Level-triggered only (no edge-triggered mode)
71
72 @par Thread Safety
73 All public member functions are thread-safe.
74 */
75 class BOOST_COROSIO_DECL select_scheduler final
76 : public native_scheduler
77 , public capy::execution_context::service
78 {
79 public:
80 using key_type = scheduler;
81
82 /** Construct the scheduler.
83
84 Creates a self-pipe for reactor interruption.
85
86 @param ctx Reference to the owning execution_context.
87 @param concurrency_hint Hint for expected thread count (unused).
88 */
89 select_scheduler(capy::execution_context& ctx, int concurrency_hint = -1);
90
91 ~select_scheduler() override;
92
93 select_scheduler(select_scheduler const&) = delete;
94 select_scheduler& operator=(select_scheduler const&) = delete;
95
96 void shutdown() override;
97 void post(std::coroutine_handle<> h) const override;
98 void post(scheduler_op* h) const override;
99 bool running_in_this_thread() const noexcept override;
100 void stop() override;
101 bool stopped() const noexcept override;
102 void restart() override;
103 std::size_t run() override;
104 std::size_t run_one() override;
105 std::size_t wait_one(long usec) override;
106 std::size_t poll() override;
107 std::size_t poll_one() override;
108
109 /** Return the maximum file descriptor value supported.
110
111 Returns FD_SETSIZE - 1, the maximum fd value that can be
112 monitored by select(). Operations with fd >= FD_SETSIZE
113 will fail with EINVAL.
114
115 @return The maximum supported file descriptor value.
116 */
117 static constexpr int max_fd() noexcept
118 {
119 return FD_SETSIZE - 1;
120 }
121
122 /** Register a file descriptor for monitoring.
123
124 @param fd The file descriptor to register.
125 @param op The operation associated with this fd.
126 @param events Event mask: 1 = read, 2 = write, 3 = both.
127 */
128 void register_fd(int fd, select_op* op, int events) const;
129
130 /** Unregister a file descriptor from monitoring.
131
132 @param fd The file descriptor to unregister.
133 @param events Event mask to remove: 1 = read, 2 = write, 3 = both.
134 */
135 void deregister_fd(int fd, int events) const;
136
137 void work_started() noexcept override;
138 void work_finished() noexcept override;
139
140 // Event flags for register_fd/deregister_fd
141 static constexpr int event_read = 1;
142 static constexpr int event_write = 2;
143
144 private:
145 std::size_t do_one(long timeout_us);
146 void run_reactor(std::unique_lock<std::mutex>& lock);
147 void wake_one_thread_and_unlock(std::unique_lock<std::mutex>& lock) const;
148 void interrupt_reactor() const;
149 long calculate_timeout(long requested_timeout_us) const;
150
151 // Self-pipe for interrupting select()
152 int pipe_fds_[2]; // [0]=read, [1]=write
153
154 mutable std::mutex mutex_;
155 mutable std::condition_variable wakeup_event_;
156 mutable op_queue completed_ops_;
157 mutable std::atomic<long> outstanding_work_;
158 std::atomic<bool> stopped_;
159
160 // Per-fd state for tracking registered operations
161 struct fd_state
162 {
163 select_op* read_op = nullptr;
164 select_op* write_op = nullptr;
165 };
166 mutable std::unordered_map<int, fd_state> registered_fds_;
167 mutable int max_fd_ = -1;
168
169 // Single reactor thread coordination
170 mutable bool reactor_running_ = false;
171 mutable bool reactor_interrupted_ = false;
172 mutable int idle_thread_count_ = 0;
173
174 // Sentinel operation for interleaving reactor runs with handler execution.
175 // Ensures the reactor runs periodically even when handlers are continuously
176 // posted, preventing timer starvation.
177 struct task_op final : scheduler_op
178 {
179 void operator()() override {}
180 void destroy() override {}
181 };
182 task_op task_op_;
183 };
184
185 /*
186 select Scheduler - Single Reactor Model
187 =======================================
188
189 This scheduler mirrors the epoll_scheduler design but uses select() instead
190 of epoll for I/O multiplexing. The thread coordination strategy is identical:
191 one thread becomes the "reactor" while others wait on a condition variable.
192
193 Thread Model
194 ------------
195 - ONE thread runs select() at a time (the reactor thread)
196 - OTHER threads wait on wakeup_event_ (condition variable) for handlers
197 - When work is posted, exactly one waiting thread wakes via notify_one()
198
199 Key Differences from epoll
200 --------------------------
201 - Uses self-pipe instead of eventfd for interruption (more portable)
202 - fd_set rebuilding each iteration (O(n) vs O(1) for epoll)
203 - FD_SETSIZE limit (~1024 fds on most systems)
204 - Level-triggered only (no edge-triggered mode)
205
206 Self-Pipe Pattern
207 -----------------
208 To interrupt a blocking select() call (e.g., when work is posted or a timer
209 expires), we write a byte to pipe_fds_[1]. The read end pipe_fds_[0] is
210 always in the read_fds set, so select() returns immediately. We drain the
211 pipe to clear the readable state.
212
213 fd-to-op Mapping
214 ----------------
215 We use an unordered_map<int, fd_state> to track which operations are
216 registered for each fd. This allows O(1) lookup when select() returns
217 ready fds. Each fd can have at most one read op and one write op registered.
218 */
219
220 namespace select {
221
222 struct BOOST_COROSIO_SYMBOL_VISIBLE scheduler_context
223 {
224 select_scheduler const* key;
225 scheduler_context* next;
226 };
227
228 inline thread_local_ptr<scheduler_context> context_stack;
229
230 struct thread_context_guard
231 {
232 scheduler_context frame_;
233
234 148x explicit thread_context_guard(select_scheduler const* ctx) noexcept
235 148x : frame_{ctx, context_stack.get()}
236 {
237 148x context_stack.set(&frame_);
238 148x }
239
240 148x ~thread_context_guard() noexcept
241 {
242 148x context_stack.set(frame_.next);
243 148x }
244 };
245
246 struct work_guard
247 {
248 select_scheduler* self;
249 139367x ~work_guard()
250 {
251 139367x self->work_finished();
252 139367x }
253 };
254
255 } // namespace select
256
257 168x inline select_scheduler::select_scheduler(capy::execution_context& ctx, int)
258 168x : pipe_fds_{-1, -1}
259 168x , outstanding_work_(0)
260 168x , stopped_(false)
261 168x , max_fd_(-1)
262 168x , reactor_running_(false)
263 168x , reactor_interrupted_(false)
264 336x , idle_thread_count_(0)
265 {
266 // Create self-pipe for interrupting select()
267 168x if (::pipe(pipe_fds_) < 0)
268 detail::throw_system_error(make_err(errno), "pipe");
269
270 // Set both ends to non-blocking and close-on-exec
271 504x for (int i = 0; i < 2; ++i)
272 {
273 336x int flags = ::fcntl(pipe_fds_[i], F_GETFL, 0);
274 336x if (flags == -1)
275 {
276 int errn = errno;
277 ::close(pipe_fds_[0]);
278 ::close(pipe_fds_[1]);
279 detail::throw_system_error(make_err(errn), "fcntl F_GETFL");
280 }
281 336x if (::fcntl(pipe_fds_[i], F_SETFL, flags | O_NONBLOCK) == -1)
282 {
283 int errn = errno;
284 ::close(pipe_fds_[0]);
285 ::close(pipe_fds_[1]);
286 detail::throw_system_error(make_err(errn), "fcntl F_SETFL");
287 }
288 336x if (::fcntl(pipe_fds_[i], F_SETFD, FD_CLOEXEC) == -1)
289 {
290 int errn = errno;
291 ::close(pipe_fds_[0]);
292 ::close(pipe_fds_[1]);
293 detail::throw_system_error(make_err(errn), "fcntl F_SETFD");
294 }
295 }
296
297 168x timer_svc_ = &get_timer_service(ctx, *this);
298 168x timer_svc_->set_on_earliest_changed(
299 2789x timer_service::callback(this, [](void* p) {
300 2621x static_cast<select_scheduler*>(p)->interrupt_reactor();
301 2621x }));
302
303 // Initialize resolver service
304 168x get_resolver_service(ctx, *this);
305
306 // Initialize signal service
307 168x get_signal_service(ctx, *this);
308
309 // Push task sentinel to interleave reactor runs with handler execution
310 168x completed_ops_.push(&task_op_);
311 168x }
312
313 336x inline select_scheduler::~select_scheduler()
314 {
315 168x if (pipe_fds_[0] >= 0)
316 168x ::close(pipe_fds_[0]);
317 168x if (pipe_fds_[1] >= 0)
318 168x ::close(pipe_fds_[1]);
319 336x }
320
321 inline void
322 168x select_scheduler::shutdown()
323 {
324 {
325 168x std::unique_lock lock(mutex_);
326
327 343x while (auto* h = completed_ops_.pop())
328 {
329 175x if (h == &task_op_)
330 168x continue;
331 7x lock.unlock();
332 7x h->destroy();
333 7x lock.lock();
334 175x }
335 168x }
336
337 168x if (pipe_fds_[1] >= 0)
338 168x interrupt_reactor();
339
340 168x wakeup_event_.notify_all();
341 168x }
342
343 inline void
344 3012x select_scheduler::post(std::coroutine_handle<> h) const
345 {
346 struct post_handler final : scheduler_op
347 {
348 std::coroutine_handle<> h_;
349
350 3012x explicit post_handler(std::coroutine_handle<> h) : h_(h) {}
351
352 6024x ~post_handler() override = default;
353
354 3009x void operator()() override
355 {
356 3009x auto h = h_;
357 3009x delete this;
358 3009x h.resume();
359 3009x }
360
361 3x void destroy() override
362 {
363 3x auto h = h_;
364 3x delete this;
365 3x h.destroy();
366 3x }
367 };
368
369 3012x auto ph = std::make_unique<post_handler>(h);
370 3012x outstanding_work_.fetch_add(1, std::memory_order_relaxed);
371
372 3012x std::unique_lock lock(mutex_);
373 3012x completed_ops_.push(ph.release());
374 3012x wake_one_thread_and_unlock(lock);
375 3012x }
376
377 inline void
378 131444x select_scheduler::post(scheduler_op* h) const
379 {
380 131444x outstanding_work_.fetch_add(1, std::memory_order_relaxed);
381
382 131444x std::unique_lock lock(mutex_);
383 131444x completed_ops_.push(h);
384 131444x wake_one_thread_and_unlock(lock);
385 131444x }
386
387 inline bool
388 591x select_scheduler::running_in_this_thread() const noexcept
389 {
390 591x for (auto* c = select::context_stack.get(); c != nullptr; c = c->next)
391 361x if (c->key == this)
392 361x return true;
393 230x return false;
394 }
395
396 inline void
397 126x select_scheduler::stop()
398 {
399 126x bool expected = false;
400 126x if (stopped_.compare_exchange_strong(
401 expected, true, std::memory_order_release,
402 std::memory_order_relaxed))
403 {
404 // Wake all threads so they notice stopped_ and exit
405 {
406 126x std::lock_guard lock(mutex_);
407 126x wakeup_event_.notify_all();
408 126x }
409 126x interrupt_reactor();
410 }
411 126x }
412
413 inline bool
414 3x select_scheduler::stopped() const noexcept
415 {
416 3x return stopped_.load(std::memory_order_acquire);
417 }
418
419 inline void
420 38x select_scheduler::restart()
421 {
422 38x stopped_.store(false, std::memory_order_release);
423 38x }
424
425 inline std::size_t
426 122x select_scheduler::run()
427 {
428 122x if (stopped_.load(std::memory_order_acquire))
429 return 0;
430
431 244x if (outstanding_work_.load(std::memory_order_acquire) == 0)
432 {
433 stop();
434 return 0;
435 }
436
437 122x select::thread_context_guard ctx(this);
438
439 122x std::size_t n = 0;
440 139463x while (do_one(-1))
441 139341x if (n != (std::numeric_limits<std::size_t>::max)())
442 139341x ++n;
443 122x return n;
444 122x }
445
446 inline std::size_t
447 select_scheduler::run_one()
448 {
449 if (stopped_.load(std::memory_order_acquire))
450 return 0;
451
452 if (outstanding_work_.load(std::memory_order_acquire) == 0)
453 {
454 stop();
455 return 0;
456 }
457
458 select::thread_context_guard ctx(this);
459 return do_one(-1);
460 }
461
462 inline std::size_t
463 27x select_scheduler::wait_one(long usec)
464 {
465 27x if (stopped_.load(std::memory_order_acquire))
466 3x return 0;
467
468 48x if (outstanding_work_.load(std::memory_order_acquire) == 0)
469 {
470 stop();
471 return 0;
472 }
473
474 24x select::thread_context_guard ctx(this);
475 24x return do_one(usec);
476 24x }
477
478 inline std::size_t
479 2x select_scheduler::poll()
480 {
481 2x if (stopped_.load(std::memory_order_acquire))
482 return 0;
483
484 4x if (outstanding_work_.load(std::memory_order_acquire) == 0)
485 {
486 stop();
487 return 0;
488 }
489
490 2x select::thread_context_guard ctx(this);
491
492 2x std::size_t n = 0;
493 4x while (do_one(0))
494 2x if (n != (std::numeric_limits<std::size_t>::max)())
495 2x ++n;
496 2x return n;
497 2x }
498
499 inline std::size_t
500 select_scheduler::poll_one()
501 {
502 if (stopped_.load(std::memory_order_acquire))
503 return 0;
504
505 if (outstanding_work_.load(std::memory_order_acquire) == 0)
506 {
507 stop();
508 return 0;
509 }
510
511 select::thread_context_guard ctx(this);
512 return do_one(0);
513 }
514
515 inline void
516 5077x select_scheduler::register_fd(int fd, select_op* op, int events) const
517 {
518 // Validate fd is within select() limits
519 5077x if (fd < 0 || fd >= FD_SETSIZE)
520 detail::throw_system_error(make_err(EINVAL), "select: fd out of range");
521
522 {
523 5077x std::lock_guard lock(mutex_);
524
525 5077x auto& state = registered_fds_[fd];
526 5077x if (events & event_read)
527 2676x state.read_op = op;
528 5077x if (events & event_write)
529 2401x state.write_op = op;
530
531 5077x if (fd > max_fd_)
532 248x max_fd_ = fd;
533 5077x }
534
535 // Wake the reactor so a thread blocked in select() rebuilds its fd_sets
536 // with the newly registered fd.
537 5077x interrupt_reactor();
538 5077x }
539
540 inline void
541 5032x select_scheduler::deregister_fd(int fd, int events) const
542 {
543 5032x std::lock_guard lock(mutex_);
544
545 5032x auto it = registered_fds_.find(fd);
546 5032x if (it == registered_fds_.end())
547 4873x return;
548
549 159x if (events & event_read)
550 159x it->second.read_op = nullptr;
551 159x if (events & event_write)
552 it->second.write_op = nullptr;
553
554 // Remove entry if both are null
555 159x if (!it->second.read_op && !it->second.write_op)
556 {
557 159x registered_fds_.erase(it);
558
559 // Recalculate max_fd_ if needed
560 159x if (fd == max_fd_)
561 {
562 158x max_fd_ = pipe_fds_[0]; // At minimum, the pipe read end
563 158x for (auto& [registered_fd, state] : registered_fds_)
564 {
565 if (registered_fd > max_fd_)
566 max_fd_ = registered_fd;
567 }
568 }
569 }
570 5032x }
571
572 inline void
573 8318x select_scheduler::work_started() noexcept
574 {
575 8318x outstanding_work_.fetch_add(1, std::memory_order_relaxed);
576 8318x }
577
578 inline void
579 142767x select_scheduler::work_finished() noexcept
580 {
581 285534x if (outstanding_work_.fetch_sub(1, std::memory_order_acq_rel) == 1)
582 125x stop();
583 142767x }
584
585 inline void
586 10592x select_scheduler::interrupt_reactor() const
587 {
588 10592x char byte = 1;
589 10592x [[maybe_unused]] auto r = ::write(pipe_fds_[1], &byte, 1);
590 10592x }
591
592 inline void
593 134456x select_scheduler::wake_one_thread_and_unlock(
594 std::unique_lock<std::mutex>& lock) const
595 {
596 134456x if (idle_thread_count_ > 0)
597 {
598 // Idle worker exists - wake it via condvar
599 wakeup_event_.notify_one();
600 lock.unlock();
601 }
602 134456x else if (reactor_running_ && !reactor_interrupted_)
603 {
604 // No idle workers but reactor is running - interrupt it
605 2600x reactor_interrupted_ = true;
606 2600x lock.unlock();
607 2600x interrupt_reactor();
608 }
609 else
610 {
611 // No one to wake
612 131856x lock.unlock();
613 }
614 134456x }
615
616 inline long
617 7319x select_scheduler::calculate_timeout(long requested_timeout_us) const
618 {
619 7319x if (requested_timeout_us == 0)
620 return 0;
621
622 7319x auto nearest = timer_svc_->nearest_expiry();
623 7319x if (nearest == timer_service::time_point::max())
624 46x return requested_timeout_us;
625
626 7273x auto now = std::chrono::steady_clock::now();
627 7273x if (nearest <= now)
628 217x return 0;
629
630 auto timer_timeout_us =
631 7056x std::chrono::duration_cast<std::chrono::microseconds>(nearest - now)
632 7056x .count();
633
634 // Clamp to [0, LONG_MAX] to prevent truncation on 32-bit long platforms
635 7056x constexpr auto long_max =
636 static_cast<long long>((std::numeric_limits<long>::max)());
637 auto capped_timer_us =
638 7056x (std::min)((std::max)(static_cast<long long>(timer_timeout_us),
639 7056x static_cast<long long>(0)),
640 7056x long_max);
641
642 7056x if (requested_timeout_us < 0)
643 7056x return static_cast<long>(capped_timer_us);
644
645 // requested_timeout_us is already long, so min() result fits in long
646 return static_cast<long>(
647 (std::min)(static_cast<long long>(requested_timeout_us),
648 capped_timer_us));
649 }
650
651 inline void
652 75145x select_scheduler::run_reactor(std::unique_lock<std::mutex>& lock)
653 {
654 // Calculate timeout considering timers, use 0 if interrupted
655 long effective_timeout_us =
656 75145x reactor_interrupted_ ? 0 : calculate_timeout(-1);
657
658 // Build fd_sets from registered_fds_
659 fd_set read_fds, write_fds, except_fds;
660 1277465x FD_ZERO(&read_fds);
661 1277465x FD_ZERO(&write_fds);
662 1277465x FD_ZERO(&except_fds);
663
664 // Always include the interrupt pipe
665 75145x FD_SET(pipe_fds_[0], &read_fds);
666 75145x int nfds = pipe_fds_[0];
667
668 // Add registered fds
669 86967x for (auto& [fd, state] : registered_fds_)
670 {
671 11822x if (state.read_op)
672 9421x FD_SET(fd, &read_fds);
673 11822x if (state.write_op)
674 {
675 2401x FD_SET(fd, &write_fds);
676 // Also monitor for errors on connect operations
677 2401x FD_SET(fd, &except_fds);
678 }
679 11822x if (fd > nfds)
680 9426x nfds = fd;
681 }
682
683 // Convert timeout to timeval
684 struct timeval tv;
685 75145x struct timeval* tv_ptr = nullptr;
686 75145x if (effective_timeout_us >= 0)
687 {
688 75099x tv.tv_sec = effective_timeout_us / 1000000;
689 75099x tv.tv_usec = effective_timeout_us % 1000000;
690 75099x tv_ptr = &tv;
691 }
692
693 75145x lock.unlock();
694
695 75145x int ready = ::select(nfds + 1, &read_fds, &write_fds, &except_fds, tv_ptr);
696 75145x int saved_errno = errno;
697
698 // Process timers outside the lock
699 75145x timer_svc_->process_expired();
700
701 75145x if (ready < 0 && saved_errno != EINTR)
702 detail::throw_system_error(make_err(saved_errno), "select");
703
704 // Re-acquire lock before modifying completed_ops_
705 75145x lock.lock();
706
707 // Drain the interrupt pipe if readable
708 75145x if (ready > 0 && FD_ISSET(pipe_fds_[0], &read_fds))
709 {
710 char buf[256];
711 15366x while (::read(pipe_fds_[0], buf, sizeof(buf)) > 0)
712 {
713 }
714 }
715
716 // Process I/O completions
717 75145x int completions_queued = 0;
718 75145x if (ready > 0)
719 {
720 // Iterate over registered fds (copy keys to avoid iterator invalidation)
721 7683x std::vector<int> fds_to_check;
722 7683x fds_to_check.reserve(registered_fds_.size());
723 17153x for (auto& [fd, state] : registered_fds_)
724 9470x fds_to_check.push_back(fd);
725
726 17153x for (int fd : fds_to_check)
727 {
728 9470x auto it = registered_fds_.find(fd);
729 9470x if (it == registered_fds_.end())
730 continue;
731
732 9470x auto& state = it->second;
733
734 // Check for errors (especially for connect operations)
735 9470x bool has_error = FD_ISSET(fd, &except_fds);
736
737 // Process read readiness
738 9470x if (state.read_op && (FD_ISSET(fd, &read_fds) || has_error))
739 {
740 2517x auto* op = state.read_op;
741 // Claim the op by exchanging to unregistered. Both registering and
742 // registered states mean the op is ours to complete.
743 2517x auto prev = op->registered.exchange(
744 select_registration_state::unregistered,
745 std::memory_order_acq_rel);
746 2517x if (prev != select_registration_state::unregistered)
747 {
748 2517x state.read_op = nullptr;
749
750 2517x if (has_error)
751 {
752 int errn = 0;
753 socklen_t len = sizeof(errn);
754 if (::getsockopt(
755 fd, SOL_SOCKET, SO_ERROR, &errn, &len) < 0)
756 errn = errno;
757 if (errn == 0)
758 errn = EIO;
759 op->complete(errn, 0);
760 }
761 else
762 {
763 2517x op->perform_io();
764 }
765
766 2517x completed_ops_.push(op);
767 2517x ++completions_queued;
768 }
769 }
770
771 // Process write readiness
772 9470x if (state.write_op && (FD_ISSET(fd, &write_fds) || has_error))
773 {
774 2401x auto* op = state.write_op;
775 // Claim the op by exchanging to unregistered. Both registering and
776 // registered states mean the op is ours to complete.
777 2401x auto prev = op->registered.exchange(
778 select_registration_state::unregistered,
779 std::memory_order_acq_rel);
780 2401x if (prev != select_registration_state::unregistered)
781 {
782 2401x state.write_op = nullptr;
783
784 2401x if (has_error)
785 {
786 int errn = 0;
787 socklen_t len = sizeof(errn);
788 if (::getsockopt(
789 fd, SOL_SOCKET, SO_ERROR, &errn, &len) < 0)
790 errn = errno;
791 if (errn == 0)
792 errn = EIO;
793 op->complete(errn, 0);
794 }
795 else
796 {
797 2401x op->perform_io();
798 }
799
800 2401x completed_ops_.push(op);
801 2401x ++completions_queued;
802 }
803 }
804
805 // Clean up empty entries
806 9470x if (!state.read_op && !state.write_op)
807 4918x registered_fds_.erase(it);
808 }
809 7683x }
810
811 75145x if (completions_queued > 0)
812 {
813 2522x if (completions_queued == 1)
814 126x wakeup_event_.notify_one();
815 else
816 2396x wakeup_event_.notify_all();
817 }
818 75145x }
819
820 inline std::size_t
821 139491x select_scheduler::do_one(long timeout_us)
822 {
823 139491x std::unique_lock lock(mutex_);
824
825 for (;;)
826 {
827 214636x if (stopped_.load(std::memory_order_acquire))
828 122x return 0;
829
830 214514x scheduler_op* op = completed_ops_.pop();
831
832 214514x if (op == &task_op_)
833 {
834 75147x bool more_handlers = !completed_ops_.empty();
835
836 75147x if (!more_handlers)
837 {
838 14642x if (outstanding_work_.load(std::memory_order_acquire) == 0)
839 {
840 completed_ops_.push(&task_op_);
841 return 0;
842 }
843 7321x if (timeout_us == 0)
844 {
845 2x completed_ops_.push(&task_op_);
846 2x return 0;
847 }
848 }
849
850 75145x reactor_interrupted_ = more_handlers || timeout_us == 0;
851 75145x reactor_running_ = true;
852
853 75145x if (more_handlers && idle_thread_count_ > 0)
854 wakeup_event_.notify_one();
855
856 75145x run_reactor(lock);
857
858 75145x reactor_running_ = false;
859 75145x completed_ops_.push(&task_op_);
860 75145x continue;
861 75145x }
862
863 139367x if (op != nullptr)
864 {
865 139367x lock.unlock();
866 139367x select::work_guard g{this};
867 139367x (*op)();
868 139367x return 1;
869 139367x }
870
871 if (outstanding_work_.load(std::memory_order_acquire) == 0)
872 return 0;
873
874 if (timeout_us == 0)
875 return 0;
876
877 ++idle_thread_count_;
878 if (timeout_us < 0)
879 wakeup_event_.wait(lock);
880 else
881 wakeup_event_.wait_for(lock, std::chrono::microseconds(timeout_us));
882 --idle_thread_count_;
883 75145x }
884 139491x }
885
886 } // namespace boost::corosio::detail
887
888 #endif // BOOST_COROSIO_HAS_SELECT
889
890 #endif // BOOST_COROSIO_NATIVE_DETAIL_SELECT_SELECT_SCHEDULER_HPP
891