diff --git a/lib/mqtt/client.rb b/lib/mqtt/client.rb index e0833fb..daef851 100644 --- a/lib/mqtt/client.rb +++ b/lib/mqtt/client.rb @@ -167,6 +167,14 @@ def initialize(*args) @read_thread = nil @write_semaphore = Mutex.new @pubacks_semaphore = Mutex.new + + @wait_for_read_func = method( + if OpenSSL::SSL::SSLSocket.method_defined?(:read_nonblock) + :nonblocking_read_before_select + else + :select_without_nonblocking_read + end + ) end # Get the OpenSSL context, that is used if SSL/TLS is enabled @@ -438,14 +446,37 @@ def unsubscribe(*topics) private + def select_without_nonblocking_read + # Poll socket - is there data waiting? + [nil, !IO.select([@socket], [], [], SELECT_TIMEOUT).nil?] + end + + def nonblocking_read_before_select + first_byte_in_packet = nil + data_available_to_read = false + begin + # Poll socket - is there data waiting? + result = @socket.read_nonblock(1) + if result && result.length == 1 + first_byte_in_packet = result.unpack('C').first + data_available_to_read = true + end + rescue IO::WaitReadable + # Wait for data to be available + data_available_to_read = !IO.select( + [@socket], [], [], SELECT_TIMEOUT + ).nil? + end + [first_byte_in_packet, data_available_to_read] + end + # Try to read a packet from the server # Also sends keep-alive ping packets. def receive_packet - # Poll socket - is there data waiting? - result = IO.select([@socket], [], [], SELECT_TIMEOUT) - unless result.nil? + first_byte_in_packet, data_available_to_read = @wait_for_read_func.call + if data_available_to_read # Yes - read in the packet - packet = MQTT::Packet.read(@socket) + packet = MQTT::Packet.read(@socket, first_byte_in_packet) handle_packet packet end keep_alive! diff --git a/lib/mqtt/packet.rb b/lib/mqtt/packet.rb index bfd4354..efb8538 100644 --- a/lib/mqtt/packet.rb +++ b/lib/mqtt/packet.rb @@ -24,10 +24,10 @@ class Packet } # Read in a packet from a socket - def self.read(socket) + def self.read(socket, first_byte_in_packet = nil) # Read in the packet header and create a new packet object packet = create_from_header( - read_byte(socket) + first_byte_in_packet || read_byte(socket) ) packet.validate_flags diff --git a/spec/mqtt_client_spec.rb b/spec/mqtt_client_spec.rb index 2f8e929..240cda8 100644 --- a/spec/mqtt_client_spec.rb +++ b/spec/mqtt_client_spec.rb @@ -838,7 +838,16 @@ allow(@parent_thread).to receive(:raise) end - it "should put PUBLISH messages on to the read queue" do + it "should put PUBLISH messages on to the read queue when data can be immediately read" do + socket.write("\x30\x0e\x00\x05topicpayload") + socket.rewind + client.send(:receive_packet) + expect(@read_queue.size).to eq(1) + end + + it "should put PUBLISH messages on to the read queue following an IO::WaitReadable exception", + :if => OpenSSL::SSL::SSLSocket.respond_to?(:read_nonblock) do + allow(socket).to receive(:read_nonblock).and_raise(IO::WaitReadable) socket.write("\x30\x0e\x00\x05topicpayload") socket.rewind client.send(:receive_packet) @@ -853,18 +862,24 @@ end it "should close the socket if there is an exception" do + socket.write("\x20") + socket.rewind expect(socket).to receive(:close).once allow(MQTT::Packet).to receive(:read).and_raise(MQTT::Exception) client.send(:receive_packet) end it "should pass exceptions up to parent thread" do + socket.write("\x20") + socket.rewind expect(@parent_thread).to receive(:raise).once allow(MQTT::Packet).to receive(:read).and_raise(MQTT::Exception) client.send(:receive_packet) end it "should update last_ping_response when receiving a Pingresp" do + socket.write("\x20") + socket.rewind allow(MQTT::Packet).to receive(:read).and_return MQTT::Packet::Pingresp.new client.instance_variable_set '@last_ping_response', Time.at(0) client.send :receive_packet diff --git a/spec/mqtt_packet_spec.rb b/spec/mqtt_packet_spec.rb index f787eae..40bce39 100644 --- a/spec/mqtt_packet_spec.rb +++ b/spec/mqtt_packet_spec.rb @@ -434,6 +434,20 @@ end end + describe "reading a packet from a socket with a separate first byte parameter" do + let(:socket) { StringIO.new("\x11\x00\x04testhello world") } + let(:packet) { MQTT::Packet.read(socket, 48) } + + it "should correctly create the right type of packet object" do + expect(packet.class).to eq(MQTT::Packet::Publish) + end + + it "should set the payload correctly" do + expect(packet.payload).to eq('hello world') + expect(packet.payload.encoding.to_s).to eq('ASCII-8BIT') + end + end + describe "when calling the inspect method" do it "should output the payload, if it is less than 16 bytes" do packet = MQTT::Packet::Publish.new( :topic => "topic", :payload => "payload" )