Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Do nonblocking socket read before select for packet receive #104

Open
wants to merge 2 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
39 changes: 35 additions & 4 deletions lib/mqtt/client.rb
Original file line number Diff line number Diff line change
Expand Up @@ -167,6 +167,14 @@ def initialize(*args)
@read_thread = nil
@write_semaphore = Mutex.new
@pubacks_semaphore = Mutex.new

@wait_for_read_func = method(
if OpenSSL::SSL::SSLSocket.method_defined?(:read_nonblock)
:nonblocking_read_before_select
else
:select_without_nonblocking_read
end
)
end

# Get the OpenSSL context, that is used if SSL/TLS is enabled
Expand Down Expand Up @@ -438,14 +446,37 @@ def unsubscribe(*topics)

private

def select_without_nonblocking_read
# Poll socket - is there data waiting?
[nil, !IO.select([@socket], [], [], SELECT_TIMEOUT).nil?]
end

def nonblocking_read_before_select
first_byte_in_packet = nil
data_available_to_read = false
begin
# Poll socket - is there data waiting?
result = @socket.read_nonblock(1)
if result && result.length == 1
first_byte_in_packet = result.unpack('C').first
data_available_to_read = true
end
rescue IO::WaitReadable
# Wait for data to be available
data_available_to_read = !IO.select(
[@socket], [], [], SELECT_TIMEOUT
).nil?
end
[first_byte_in_packet, data_available_to_read]
end

# Try to read a packet from the server
# Also sends keep-alive ping packets.
def receive_packet
# Poll socket - is there data waiting?
result = IO.select([@socket], [], [], SELECT_TIMEOUT)
unless result.nil?
first_byte_in_packet, data_available_to_read = @wait_for_read_func.call
if data_available_to_read
# Yes - read in the packet
packet = MQTT::Packet.read(@socket)
packet = MQTT::Packet.read(@socket, first_byte_in_packet)
handle_packet packet
end
keep_alive!
Expand Down
4 changes: 2 additions & 2 deletions lib/mqtt/packet.rb
Original file line number Diff line number Diff line change
Expand Up @@ -24,10 +24,10 @@ class Packet
}

# Read in a packet from a socket
def self.read(socket)
def self.read(socket, first_byte_in_packet = nil)
# Read in the packet header and create a new packet object
packet = create_from_header(
read_byte(socket)
first_byte_in_packet || read_byte(socket)
)
packet.validate_flags

Expand Down
17 changes: 16 additions & 1 deletion spec/mqtt_client_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -838,7 +838,16 @@
allow(@parent_thread).to receive(:raise)
end

it "should put PUBLISH messages on to the read queue" do
it "should put PUBLISH messages on to the read queue when data can be immediately read" do
socket.write("\x30\x0e\x00\x05topicpayload")
socket.rewind
client.send(:receive_packet)
expect(@read_queue.size).to eq(1)
end

it "should put PUBLISH messages on to the read queue following an IO::WaitReadable exception",
:if => OpenSSL::SSL::SSLSocket.respond_to?(:read_nonblock) do
allow(socket).to receive(:read_nonblock).and_raise(IO::WaitReadable)
socket.write("\x30\x0e\x00\x05topicpayload")
socket.rewind
client.send(:receive_packet)
Expand All @@ -853,18 +862,24 @@
end

it "should close the socket if there is an exception" do
socket.write("\x20")
socket.rewind
expect(socket).to receive(:close).once
allow(MQTT::Packet).to receive(:read).and_raise(MQTT::Exception)
client.send(:receive_packet)
end

it "should pass exceptions up to parent thread" do
socket.write("\x20")
socket.rewind
expect(@parent_thread).to receive(:raise).once
allow(MQTT::Packet).to receive(:read).and_raise(MQTT::Exception)
client.send(:receive_packet)
end

it "should update last_ping_response when receiving a Pingresp" do
socket.write("\x20")
socket.rewind
allow(MQTT::Packet).to receive(:read).and_return MQTT::Packet::Pingresp.new
client.instance_variable_set '@last_ping_response', Time.at(0)
client.send :receive_packet
Expand Down
14 changes: 14 additions & 0 deletions spec/mqtt_packet_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -434,6 +434,20 @@
end
end

describe "reading a packet from a socket with a separate first byte parameter" do
let(:socket) { StringIO.new("\x11\x00\x04testhello world") }
let(:packet) { MQTT::Packet.read(socket, 48) }

it "should correctly create the right type of packet object" do
expect(packet.class).to eq(MQTT::Packet::Publish)
end

it "should set the payload correctly" do
expect(packet.payload).to eq('hello world')
expect(packet.payload.encoding.to_s).to eq('ASCII-8BIT')
end
end

describe "when calling the inspect method" do
it "should output the payload, if it is less than 16 bytes" do
packet = MQTT::Packet::Publish.new( :topic => "topic", :payload => "payload" )
Expand Down