Skip to content

Commit

Permalink
Allow interruption of IO waits.
Browse files Browse the repository at this point in the history
  • Loading branch information
ioquatix committed Mar 2, 2025
1 parent 0aa989c commit a887d85
Show file tree
Hide file tree
Showing 12 changed files with 588 additions and 29 deletions.
1 change: 1 addition & 0 deletions ext/extconf.rb
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@

have_func("rb_ext_ractor_safe")
have_func("&rb_fiber_transfer")
have_func("rb_io_interruptible_operation")

if have_library("uring") and have_header("liburing.h")
# We might want to consider using this in the future:
Expand Down
6 changes: 6 additions & 0 deletions ext/io/event/event.c
Original file line number Diff line number Diff line change
Expand Up @@ -29,4 +29,10 @@ void Init_IO_Event(void)
#ifdef IO_EVENT_SELECTOR_KQUEUE
Init_IO_Event_Selector_KQueue(IO_Event_Selector);
#endif

#ifdef HAVE_RB_IO_INTERRUPTABLE_OPERATION
rb_define_const(IO_Event, "INTERRUPTABLE", Qtrue);
#else
rb_define_const(IO_Event, "INTERRUPTABLE", Qfalse);
#endif
}
7 changes: 5 additions & 2 deletions ext/io/event/selector/epoll.c
Original file line number Diff line number Diff line change
Expand Up @@ -420,7 +420,7 @@ VALUE IO_Event_Selector_EPoll_push(VALUE self, VALUE fiber)

IO_Event_Selector_ready_push(&selector->backend, fiber);

return Qnil;
return fiber;
}

VALUE IO_Event_Selector_EPoll_raise(int argc, VALUE *argv, VALUE self)
Expand Down Expand Up @@ -523,6 +523,8 @@ VALUE IO_Event_Selector_EPoll_process_wait(VALUE self, VALUE fiber, VALUE _pid,
struct io_wait_arguments {
struct IO_Event_Selector_EPoll *selector;
struct IO_Event_Selector_EPoll_Waiting *waiting;

VALUE io;
};

static
Expand All @@ -538,7 +540,7 @@ static
VALUE io_wait_transfer(VALUE _arguments) {
struct io_wait_arguments *arguments = (struct io_wait_arguments *)_arguments;

IO_Event_Selector_loop_yield(&arguments->selector->backend);
IO_Event_Selector_loop_yield_io(&arguments->selector->backend, arguments->io);

if (arguments->waiting->ready) {
return RB_INT2NUM(arguments->waiting->ready);
Expand Down Expand Up @@ -578,6 +580,7 @@ VALUE IO_Event_Selector_EPoll_io_wait(VALUE self, VALUE fiber, VALUE io, VALUE e
struct io_wait_arguments io_wait_arguments = {
.selector = selector,
.waiting = &waiting,
.io = io,
};

return rb_ensure(io_wait_transfer, (VALUE)&io_wait_arguments, io_wait_ensure, (VALUE)&io_wait_arguments);
Expand Down
10 changes: 7 additions & 3 deletions ext/io/event/selector/kqueue.c
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,8 @@ enum {
DEBUG = 0,
DEBUG_IO_READ = 0,
DEBUG_IO_WRITE = 0,
DEBUG_IO_WAIT = 0
DEBUG_IO_WAIT = 0,
DEBUG_IO_INTERRUPT = 1
};

#ifndef EVFILT_USER
Expand Down Expand Up @@ -407,7 +408,7 @@ VALUE IO_Event_Selector_KQueue_push(VALUE self, VALUE fiber)

IO_Event_Selector_ready_push(&selector->backend, fiber);

return Qnil;
return fiber;
}

VALUE IO_Event_Selector_KQueue_raise(int argc, VALUE *argv, VALUE self)
Expand Down Expand Up @@ -516,6 +517,8 @@ VALUE IO_Event_Selector_KQueue_process_wait(VALUE self, VALUE fiber, VALUE _pid,
struct io_wait_arguments {
struct IO_Event_Selector_KQueue *selector;
struct IO_Event_Selector_KQueue_Waiting *waiting;

VALUE io;
};

static
Expand All @@ -531,7 +534,7 @@ static
VALUE io_wait_transfer(VALUE _arguments) {
struct io_wait_arguments *arguments = (struct io_wait_arguments *)_arguments;

IO_Event_Selector_loop_yield(&arguments->selector->backend);
IO_Event_Selector_loop_yield_io(&arguments->selector->backend, arguments->io);

if (arguments->waiting->ready) {
return RB_INT2NUM(arguments->waiting->ready);
Expand Down Expand Up @@ -564,6 +567,7 @@ VALUE IO_Event_Selector_KQueue_io_wait(VALUE self, VALUE fiber, VALUE io, VALUE
struct io_wait_arguments io_wait_arguments = {
.selector = selector,
.waiting = &waiting,
.io = io,
};

if (DEBUG_IO_WAIT) fprintf(stderr, "IO_Event_Selector_KQueue_io_wait descriptor=%d\n", descriptor);
Expand Down
15 changes: 15 additions & 0 deletions ext/io/event/selector/selector.c
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,21 @@ VALUE IO_Event_Selector_loop_yield(struct IO_Event_Selector *backend)
return IO_Event_Fiber_transfer(backend->loop, 0, NULL);
}

#ifndef HAVE_RB_IO_INTERRUPTABLE_OPERATION
static inline VALUE
rb_io_interruptible_operation(VALUE self, VALUE(*function)(VALUE), VALUE argument) {
return function(argument);
}
#endif

static VALUE IO_Event_Selector_loop_yield_io_interruptible(VALUE fiber) {
return IO_Event_Fiber_transfer(fiber, 0, NULL);
}

VALUE IO_Event_Selector_loop_yield_io(struct IO_Event_Selector *backend, VALUE io) {
return rb_io_interruptible_operation(io, IO_Event_Selector_loop_yield_io_interruptible, backend->loop);
}

struct wait_and_transfer_arguments {
int argc;
VALUE *argv;
Expand Down
3 changes: 3 additions & 0 deletions ext/io/event/selector/selector.h
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,9 @@ VALUE IO_Event_Selector_loop_resume(struct IO_Event_Selector *backend, VALUE fib
// Strictly speaking, it's not a scheduling operation (does not schedule the current fiber).
VALUE IO_Event_Selector_loop_yield(struct IO_Event_Selector *backend);

// Similar to `IO_Event_Selector_loop_yield` but allows the caller to specify an IO which may be interrupted.
VALUE IO_Event_Selector_loop_yield_io(struct IO_Event_Selector *backend, VALUE io);

// Resume a specific fiber. This is a scheduling operation.
// The first argument is the fiber, the rest are the arguments to the resume.
//
Expand Down
28 changes: 17 additions & 11 deletions ext/io/event/selector/uring.c
Original file line number Diff line number Diff line change
Expand Up @@ -303,7 +303,7 @@ VALUE IO_Event_Selector_URing_push(VALUE self, VALUE fiber)

IO_Event_Selector_ready_push(&selector->backend, fiber);

return Qnil;
return fiber;
}

VALUE IO_Event_Selector_URing_raise(int argc, VALUE *argv, VALUE self)
Expand Down Expand Up @@ -522,6 +522,7 @@ int events_from_poll_flags(short flags) {
struct io_wait_arguments {
struct IO_Event_Selector_URing *selector;
struct IO_Event_Selector_URing_Waiting *waiting;
VALUE io;
short flags;
};

Expand All @@ -548,7 +549,7 @@ VALUE io_wait_transfer(VALUE _arguments) {
struct io_wait_arguments *arguments = (struct io_wait_arguments *)_arguments;
struct IO_Event_Selector_URing *selector = arguments->selector;

IO_Event_Selector_loop_yield(&selector->backend);
IO_Event_Selector_loop_yield_io(&selector->backend, arguments->io);

if (DEBUG) fprintf(stderr, "io_wait_transfer:waiting=%p, result=%d\n", (void*)arguments->waiting, arguments->waiting->result);

Expand Down Expand Up @@ -588,7 +589,8 @@ VALUE IO_Event_Selector_URing_io_wait(VALUE self, VALUE fiber, VALUE io, VALUE e
struct io_wait_arguments io_wait_arguments = {
.selector = selector,
.waiting = &waiting,
.flags = flags
.io = io,
.flags = flags,
};

return rb_ensure(io_wait_transfer, (VALUE)&io_wait_arguments, io_wait_ensure, (VALUE)&io_wait_arguments);
Expand Down Expand Up @@ -619,6 +621,7 @@ static inline off_t io_seekable(int descriptor)
struct io_read_arguments {
struct IO_Event_Selector_URing *selector;
struct IO_Event_Selector_URing_Waiting *waiting;
VALUE io;
int descriptor;
off_t offset;
char *buffer;
Expand All @@ -638,7 +641,7 @@ io_read_submit(VALUE _arguments)
io_uring_sqe_set_data(sqe, arguments->waiting->completion);
io_uring_submit_now(selector);

IO_Event_Selector_loop_yield(&selector->backend);
IO_Event_Selector_loop_yield_io(&selector->backend, arguments->io);

return RB_INT2NUM(arguments->waiting->result);
}
Expand All @@ -664,7 +667,7 @@ io_read_ensure(VALUE _arguments)
}

static int
io_read(struct IO_Event_Selector_URing *selector, VALUE fiber, int descriptor, char *buffer, size_t length, off_t offset)
io_read(struct IO_Event_Selector_URing *selector, VALUE fiber, VALUE io, int descriptor, char *buffer, size_t length, off_t offset)
{
struct IO_Event_Selector_URing_Waiting waiting = {
.fiber = fiber,
Expand All @@ -677,6 +680,7 @@ io_read(struct IO_Event_Selector_URing *selector, VALUE fiber, int descriptor, c
struct io_read_arguments io_read_arguments = {
.selector = selector,
.waiting = &waiting,
.io = io,
.descriptor = descriptor,
.offset = offset,
.buffer = buffer,
Expand Down Expand Up @@ -705,7 +709,7 @@ VALUE IO_Event_Selector_URing_io_read(VALUE self, VALUE fiber, VALUE io, VALUE b

size_t maximum_size = size - offset;
while (maximum_size) {
int result = io_read(selector, fiber, descriptor, (char*)base+offset, maximum_size, from);
int result = io_read(selector, fiber, io, descriptor, (char*)base+offset, maximum_size, from);

if (result > 0) {
total += result;
Expand Down Expand Up @@ -756,7 +760,7 @@ VALUE IO_Event_Selector_URing_io_pread(VALUE self, VALUE fiber, VALUE io, VALUE

size_t maximum_size = size - offset;
while (maximum_size) {
int result = io_read(selector, fiber, descriptor, (char*)base+offset, maximum_size, from);
int result = io_read(selector, fiber, io, descriptor, (char*)base+offset, maximum_size, from);

if (result > 0) {
total += result;
Expand All @@ -783,6 +787,7 @@ VALUE IO_Event_Selector_URing_io_pread(VALUE self, VALUE fiber, VALUE io, VALUE
struct io_write_arguments {
struct IO_Event_Selector_URing *selector;
struct IO_Event_Selector_URing_Waiting *waiting;
VALUE io;
int descriptor;
off_t offset;
char *buffer;
Expand All @@ -802,7 +807,7 @@ io_write_submit(VALUE _argument)
io_uring_sqe_set_data(sqe, arguments->waiting->completion);
io_uring_submit_pending(selector);

IO_Event_Selector_loop_yield(&selector->backend);
IO_Event_Selector_loop_yield_io(&selector->backend, arguments->io);

return RB_INT2NUM(arguments->waiting->result);
}
Expand All @@ -828,7 +833,7 @@ io_write_ensure(VALUE _argument)
}

static int
io_write(struct IO_Event_Selector_URing *selector, VALUE fiber, int descriptor, char *buffer, size_t length, off_t offset)
io_write(struct IO_Event_Selector_URing *selector, VALUE fiber, VALUE io, int descriptor, char *buffer, size_t length, off_t offset)
{
struct IO_Event_Selector_URing_Waiting waiting = {
.fiber = fiber,
Expand All @@ -841,6 +846,7 @@ io_write(struct IO_Event_Selector_URing *selector, VALUE fiber, int descriptor,
struct io_write_arguments arguments = {
.selector = selector,
.waiting = &waiting,
.io = io,
.descriptor = descriptor,
.offset = offset,
.buffer = buffer,
Expand Down Expand Up @@ -873,7 +879,7 @@ VALUE IO_Event_Selector_URing_io_write(VALUE self, VALUE fiber, VALUE io, VALUE

size_t maximum_size = size - offset;
while (maximum_size) {
int result = io_write(selector, fiber, descriptor, (char*)base+offset, maximum_size, from);
int result = io_write(selector, fiber, io, descriptor, (char*)base+offset, maximum_size, from);

if (result > 0) {
total += result;
Expand Down Expand Up @@ -928,7 +934,7 @@ VALUE IO_Event_Selector_URing_io_pwrite(VALUE self, VALUE fiber, VALUE io, VALUE

size_t maximum_size = size - offset;
while (maximum_size) {
int result = io_write(selector, fiber, descriptor, (char*)base+offset, maximum_size, from);
int result = io_write(selector, fiber, io, descriptor, (char*)base+offset, maximum_size, from);

if (result > 0) {
total += result;
Expand Down
Loading

0 comments on commit a887d85

Please sign in to comment.