Skip to content

Commit d23a669

Browse files
committed
Support nonblocking socket read before select for MRI 1.9.3+ only
1 parent b6cda35 commit d23a669

File tree

2 files changed

+24
-8
lines changed

2 files changed

+24
-8
lines changed

lib/mqtt/client.rb

+21-3
Original file line numberDiff line numberDiff line change
@@ -167,6 +167,14 @@ def initialize(*args)
167167
@read_thread = nil
168168
@write_semaphore = Mutex.new
169169
@pubacks_semaphore = Mutex.new
170+
171+
@wait_for_read_func = method(
172+
if OpenSSL::SSL::SSLSocket.method_defined?(:read_nonblock)
173+
:nonblocking_read_before_select
174+
else
175+
:select_without_nonblocking_read
176+
end
177+
)
170178
end
171179

172180
# Get the OpenSSL context, that is used if SSL/TLS is enabled
@@ -438,9 +446,12 @@ def unsubscribe(*topics)
438446

439447
private
440448

441-
# Try to read a packet from the server
442-
# Also sends keep-alive ping packets.
443-
def receive_packet
449+
def select_without_nonblocking_read
450+
# Poll socket - is there data waiting?
451+
[nil, !IO.select([@socket], [], [], SELECT_TIMEOUT).nil?]
452+
end
453+
454+
def nonblocking_read_before_select
444455
first_byte_in_packet = nil
445456
data_available_to_read = false
446457
begin
@@ -456,6 +467,13 @@ def receive_packet
456467
[@socket], [], [], SELECT_TIMEOUT
457468
).nil?
458469
end
470+
[first_byte_in_packet, data_available_to_read]
471+
end
472+
473+
# Try to read a packet from the server
474+
# Also sends keep-alive ping packets.
475+
def receive_packet
476+
first_byte_in_packet, data_available_to_read = @wait_for_read_func.call
459477
if data_available_to_read
460478
# Yes - read in the packet
461479
packet = MQTT::Packet.read(@socket, first_byte_in_packet)

spec/mqtt_client_spec.rb

+3-5
Original file line numberDiff line numberDiff line change
@@ -845,11 +845,9 @@
845845
expect(@read_queue.size).to eq(1)
846846
end
847847

848-
it "should put PUBLISH messages on to the read queue following a wait readable exception" do
849-
wait_readable_exception = Class.new(StandardError) do
850-
include IO::WaitReadable
851-
end
852-
allow(socket).to receive(:read_nonblock).and_raise(wait_readable_exception)
848+
it "should put PUBLISH messages on to the read queue following an IO::WaitReadable exception",
849+
:if => OpenSSL::SSL::SSLSocket.respond_to?(:read_nonblock) do
850+
allow(socket).to receive(:read_nonblock).and_raise(IO::WaitReadable)
853851
socket.write("\x30\x0e\x00\x05topicpayload")
854852
socket.rewind
855853
client.send(:receive_packet)

0 commit comments

Comments
 (0)