Skip to content

Commit

Permalink
Fix multishot accept
Browse files Browse the repository at this point in the history
  • Loading branch information
noteflakes committed Jul 16, 2023
1 parent dc05a79 commit 2c1c007
Show file tree
Hide file tree
Showing 4 changed files with 48 additions and 60 deletions.
87 changes: 37 additions & 50 deletions ext/polyphony/backend_io_uring.c
Original file line number Diff line number Diff line change
Expand Up @@ -152,6 +152,8 @@ static inline bool cq_ring_needs_flush(struct io_uring *ring) {
return IO_URING_READ_ONCE(*ring->sq.kflags) & IORING_SQ_CQ_OVERFLOW;
}

#define MULTISHOT_ACCEPT_QUEUE(socket) (rb_ivar_get(socket, ID_ivar_multishot_accept_queue))

static void handle_multishot_accept_completion(op_context_t *ctx, struct io_uring_cqe *cqe, Backend_t *backend) {
// printf("handle_multishot_accept_completion result: %d\n", ctx->result);
if (ctx->result == -ECANCELED) {
Expand All @@ -162,9 +164,9 @@ static void handle_multishot_accept_completion(op_context_t *ctx, struct io_urin
if (!(cqe->flags & IORING_CQE_F_MORE)) {
context_store_release(&backend->store, ctx);
}
VALUE queue = rb_ivar_get(ctx->resume_value, ID_ivar_multishot_accept_queue);
VALUE queue = MULTISHOT_ACCEPT_QUEUE(ctx->resume_value);
if (queue != Qnil)
Queue_push(queue, INT2NUM(ctx->result));
Queue_push(queue, INT2FIX(ctx->result));
}
}

Expand Down Expand Up @@ -972,6 +974,19 @@ VALUE Backend_sendmsg(VALUE self, VALUE io, VALUE buffer, VALUE flags, VALUE des
return INT2FIX(buffer_spec.len);
}

inline VALUE create_socket_from_fd(int fd, VALUE socket_class) {
rb_io_t *fp;

VALUE socket = rb_obj_alloc(socket_class);
MakeOpenFile(socket, fp);
rb_update_max_fd(fd);
fp->fd = fd;
fp->mode = FMODE_READWRITE | FMODE_DUPLEX;
rb_io_ascii8bit_binmode(socket);
rb_io_synchronized(fp);
return socket;
}

VALUE io_uring_backend_accept(Backend_t *backend, VALUE server_socket, VALUE socket_class, int loop) {
int server_fd;
rb_io_t *server_fptr;
Expand Down Expand Up @@ -999,19 +1014,7 @@ VALUE io_uring_backend_accept(Backend_t *backend, VALUE server_socket, VALUE soc
if (fd < 0)
rb_syserr_fail(-fd, strerror(-fd));
else {
rb_io_t *fp;

socket = rb_obj_alloc(socket_class);
MakeOpenFile(socket, fp);
rb_update_max_fd(fd);
fp->fd = fd;
fp->mode = FMODE_READWRITE | FMODE_DUPLEX;
rb_io_ascii8bit_binmode(socket);
rb_io_synchronized(fp);

// if (rsock_do_not_reverse_lookup) {
// fp->mode |= FMODE_NOREVLOOKUP;
// }
socket = create_socket_from_fd(fd, socket_class);
if (loop) {
rb_yield(socket);
socket = Qnil;
Expand All @@ -1026,24 +1029,14 @@ VALUE io_uring_backend_accept(Backend_t *backend, VALUE server_socket, VALUE soc

VALUE Backend_accept(VALUE self, VALUE server_socket, VALUE socket_class) {
#ifdef HAVE_IO_URING_PREP_MULTISHOT_ACCEPT
VALUE accept_queue = rb_ivar_get(server_socket, ID_ivar_multishot_accept_queue);
VALUE accept_queue = MULTISHOT_ACCEPT_QUEUE(server_socket);
if (accept_queue != Qnil) {
VALUE next = Queue_shift(0, 0, accept_queue);
int fd = NUM2INT(next);
if (fd < 0)
rb_syserr_fail(-fd, strerror(-fd));
else {
rb_io_t *fp;

VALUE socket = rb_obj_alloc(socket_class);
MakeOpenFile(socket, fp);
rb_update_max_fd(fd);
fp->fd = fd;
fp->mode = FMODE_READWRITE | FMODE_DUPLEX;
rb_io_ascii8bit_binmode(socket);
rb_io_synchronized(fp);
return socket;
}
else
return create_socket_from_fd(fd, socket_class);
}
#endif

Expand Down Expand Up @@ -1105,32 +1098,26 @@ VALUE Backend_multishot_accept(VALUE self, VALUE server_socket) {
);
}

static inline VALUE accept_loop_from_queue(VALUE server_socket, VALUE socket_class) {
VALUE accept_queue = MULTISHOT_ACCEPT_QUEUE(server_socket);
if (accept_queue == Qnil) return Qnil;

while (true) {
VALUE next = Queue_shift(0, 0, accept_queue);
int fd = NUM2INT(next);
if (fd < 0)
rb_syserr_fail(-fd, strerror(-fd));
else
rb_yield(create_socket_from_fd(fd, socket_class));
}
return Qtrue;
}
#endif

VALUE Backend_accept_loop(VALUE self, VALUE server_socket, VALUE socket_class) {
#ifdef HAVE_IO_URING_PREP_MULTISHOT_ACCEPT
VALUE accept_queue = rb_ivar_get(server_socket, ID_ivar_multishot_accept_queue);
if (accept_queue != Qnil) {
while (true) {
VALUE next = Queue_shift(0, 0, accept_queue);
int fd = NUM2INT(next);
if (fd < 0)
rb_syserr_fail(-fd, strerror(-fd));
else {
rb_io_t *fp;

VALUE socket = rb_obj_alloc(socket_class);
MakeOpenFile(socket, fp);
rb_update_max_fd(fd);
fp->fd = fd;
fp->mode = FMODE_READWRITE | FMODE_DUPLEX;
rb_io_ascii8bit_binmode(socket);
rb_io_synchronized(fp);
rb_yield(socket);
}
}
return self;
}
VALUE result = accept_loop_from_queue(server_socket, socket_class);
if (RTEST(result)) return self;
#endif

Backend_t *backend;
Expand Down
5 changes: 3 additions & 2 deletions ext/polyphony/extconf.rb
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,8 @@ def get_config
version, major_revision, distribution = m[1].to_i, m[2].to_i, m[4]

combined_version = version.to_i * 100 + major_revision.to_i


config[:kernel_version] = combined_version
config[:pidfd_open] = combined_version > 503
config[:multishot_recv] = combined_version >= 600
config[:multishot_recvmsg] = combined_version >= 600
Expand All @@ -31,7 +32,7 @@ def get_config
end

config = get_config
puts "Building Polyphony... (#{config.inspect})"
puts "Building Polyphony (\n#{config.map { |(k, v)| " #{k}: #{v}\n"}.join})"

require_relative 'zlib_conf'

Expand Down
2 changes: 1 addition & 1 deletion lib/polyphony/extensions/socket.rb
Original file line number Diff line number Diff line change
Expand Up @@ -464,7 +464,7 @@ def accept
Polyphony.backend_accept(@io, TCPSocket)
end

if Polyphony.instance_methods(false).include?(:backend_multishot_accept)
if Polyphony.methods(false).include?(:backend_multishot_accept)
# Starts a multishot accept operation (only available with io_uring
# backend). Example usage:
#
Expand Down
14 changes: 7 additions & 7 deletions test/test_socket.rb
Original file line number Diff line number Diff line change
Expand Up @@ -468,20 +468,20 @@ def test_multishot_accept_loop
server&.close
end

def test_multishot_accept_error
def test_multishot_accept_interrupt
port, server = start_tcp_server_on_random_port
error = nil
server_fiber = spin do
server.multishot_accept do
server.accept_loop { |s| spin_client(s) }
rescue SystemCallError => e
error = e
end
end
snooze
10.times { snooze }
server_socket = server.instance_variable_get(:@io)
assert server_socket.instance_variable_get(:@multishot_accept_queue)
server.close
snooze
server_fiber.stop
10.times { snooze }
server_fiber.await
assert_kind_of Errno::EBADF, error
assert_nil server_socket.instance_variable_get(:@multishot_accept_queue)
end
end

0 comments on commit 2c1c007

Please sign in to comment.