Skip to content

Commit f9c631d

Browse files
committed
fixes and formatting
1 parent e677738 commit f9c631d

18 files changed

+643
-483
lines changed

Project.toml

+1-1
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@ PrecompileTools = "aea7be01-6a6a-4083-8856-8a6e6704d82a"
1515
PrecompileMQTT = "PrecompileTools"
1616

1717
[compat]
18-
julia = "1.7"
18+
julia = "1.8"
1919

2020
[extras]
2121
Dates = "ade2ca70-3891-5945-98fb-dc099432e06a"

README.md

-1
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,6 @@
44
[![Dev](https://img.shields.io/badge/docs-dev-blue.svg)](https://JuliaMessaging.github.io/MQTTClient.jl/dev/)
55
[![Build Status](https://github.com/JuliaMessaging/MQTTClient.jl/actions/workflows/CI.yml/badge.svg?branch=main)](https://github.com/JuliaMessaging/MQTTClient.jl/actions/workflows/CI.yml?query=branch%3Amain)
66
[![Coverage](https://codecov.io/gh/JuliaMessaging/MQTTClient.jl/branch/main/graph/badge.svg)](https://codecov.io/gh/JuliaMessaging/MQTTClient.jl)
7-
[![Coverage](https://coveralls.io/repos/github/JuliaMessaging/MQTTClient.jl/badge.svg?branch=main)](https://coveralls.io/github/JuliaMessaging/MQTTClient.jl?branch=main)
87
[![Code Style: Blue](https://img.shields.io/badge/code%20style-blue-4495d1.svg)](https://github.com/invenia/BlueStyle)
98

109
MQTT Client Library for Julia

docs/make.jl

+4-9
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
push!(LOAD_PATH,"../src/")
1+
push!(LOAD_PATH, "../src/")
22

33
using MQTTClient
44
using Documenter
@@ -21,15 +21,10 @@ makedocs(;
2121
"Getting Started" => "getting-started.md",
2222
"MQTT Interface Functions" => "interfaces.md",
2323
"MQTT Client" => "client.md",
24-
"MQTT API" => [
25-
"Client" => "api/client.md",
26-
"Interfacing Functions" => "api/interface.md",
27-
],
24+
"MQTT API" =>
25+
["Client" => "api/client.md", "Interfacing Functions" => "api/interface.md"],
2826
"Utils" => "utils.md",
2927
],
3028
)
3129

32-
deploydocs(;
33-
repo="github.com/JuliaMessaging/MQTTClient.jl",
34-
devbranch="main",
35-
)
30+
deploydocs(; repo="github.com/JuliaMessaging/MQTTClient.jl", devbranch="main")

examples/basic.jl

+4-4
Original file line numberDiff line numberDiff line change
@@ -6,22 +6,22 @@ payload = "Hello World!"
66

77
# Define the callback for receiving messages.
88
function on_msg(topic, payload)
9-
println("Received message topic: [", topic, "] payload: [", String(payload), "]")
9+
return println("Received message topic: [", topic, "] payload: [", String(payload), "]")
1010
end
1111

1212
# Instantiate a client.
13-
client, connection = MakeConnection(broker,1883)
13+
client, connection = MakeConnection(broker, 1883)
1414

1515
connect(client, connection)
1616
println("connected to $client at $(connection.protocol)")
1717

1818
# Subscribe to the topic.
19-
subscribe(client, topic, on_msg, qos=QOS_2)
19+
subscribe(client, topic, on_msg; qos=QOS_2)
2020
println("subscribed to $topic")
2121

2222
sleep(0.5)
2323

24-
publish(client, topic, payload, qos=QOS_2)
24+
publish(client, topic, payload; qos=QOS_2)
2525
println("published $payload to $topic")
2626

2727
# Unsubscribe from the topic

ext/PrecompileMQTT.jl

+10-8
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@ using MQTTClient
1414
# Putting some things in `@setup_workload` instead of `@compile_workload` can reduce the size of the
1515
# precompile file and potentially make loading faster.
1616

17-
cb(t,p) = nothing
17+
cb(t, p) = nothing
1818
topic = "foo"
1919
payload = "bar"
2020

@@ -36,7 +36,11 @@ using MQTTClient
3636

3737
message = MQTTClient.Message(false, UInt8(MQTTClient.QOS_0), false, topic, payload)
3838
optional = message.qos == 0x00 ? () : (0)
39-
cmd = MQTTClient.PUBLISH | ((message.dup & 0x1) << 3) | (message.qos << 1) | message.retain
39+
cmd =
40+
MQTTClient.PUBLISH |
41+
((message.dup & 0x1) << 3) |
42+
(message.qos << 1) |
43+
message.retain
4044
packet = MQTTClient.Packet(cmd, [message.topic, optional..., message.payload])
4145
buffer = PipeBuffer()
4246
for i in packet.data
@@ -70,7 +74,6 @@ using MQTTClient
7074
MQTTClient.handle_pubrel(c, s, cmd, flags)
7175
p = take!(c.write_packets)
7276

73-
7477
# handle_suback
7578
c = MQTTClient.Client()
7679
s = IOBuffer()
@@ -96,23 +99,22 @@ using MQTTClient
9699
## Interfaces:
97100
# subscribe
98101
c = MQTTClient.Client()
99-
future = MQTTClient.subscribe_async(c, topic, cb, qos=MQTTClient.QOS_2)
102+
future = MQTTClient.subscribe_async(c, topic, cb; qos=MQTTClient.QOS_2)
100103

101104
# unsubscribe
102-
c = MQTTClient.Client()
105+
c = MQTTClient.Client()
103106
insert!(c.on_msg, topic, cb)
104107
@atomicswap c.last_id = 0x0
105108
future = unsubscribe_async(c, topic)
106109

107-
108110
## TCP Basic Run
109111
server = MQTTClient.MockMQTTBroker(ip"127.0.0.1", 1889)
110112
client, conn = MakeConnection(ip"127.0.0.1", 1889)
111113

112114
connect(client, conn)
113115

114116
subscribe(client, "foo/bar", cb)
115-
publish(client, "bar/foo", qos=QOS_2)
117+
publish(client, "bar/foo", "baz"; qos=QOS_2)
116118
unsubscribe(client, "foo/bar")
117119

118120
disconnect(client)
@@ -125,7 +127,7 @@ using MQTTClient
125127
connect(client, conn)
126128

127129
subscribe(client, "foo/bar", cb)
128-
publish(client, "bar/foo")
130+
publish(client, "bar/foo", "baz")
129131
unsubscribe(client, "foo/bar")
130132

131133
disconnect(client)

src/MQTTClient.jl

+1-3
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,6 @@ import Base: ReentrantLock, lock, unlock, convert, PipeEndpoint, fetch, show
88
import Base: @atomic, @atomicreplace, @atomicswap, Ref, RefValue, isready
99
using Base.Threads
1010

11-
1211
include("utils.jl")
1312
include("internals.jl")
1413
include("topic.jl")
@@ -17,8 +16,7 @@ include("connection.jl")
1716
include("handlers.jl")
1817
include("interface.jl")
1918

20-
export
21-
MakeConnection,
19+
export MakeConnection,
2220
Configuration,
2321
Client,
2422
Connection,

src/client.jl

+49-46
Original file line numberDiff line numberDiff line change
@@ -1,28 +1,30 @@
11
"""
22
Client
33
4-
The MQTT client in Julia facilitates communication between a device and an MQTT broker over a network.
5-
It manages connections, message handling, and maintains the state of communication.
6-
The client operates through three main loops: the read loop listens for incoming messages from the broker and processes them using designated handlers;
7-
the write loop sends packets to the broker from a queue, ensuring thread safety with a socket lock;
8-
and the keep-alive loop periodically sends ping requests to the broker to maintain the connection and detect disconnections.
4+
The MQTT client in Julia facilitates communication between a device and an MQTT broker over a network.
5+
It manages connections, message handling, and maintains the state of communication.
6+
The client operates through three main loops: the read loop listens for incoming messages from the broker and processes them using designated handlers;
7+
the write loop sends packets to the broker from a queue, ensuring thread safety with a socket lock;
8+
and the keep-alive loop periodically sends ping requests to the broker to maintain the connection and detect disconnections.
99
This client uses atomic operations to ensure thread safety for shared variables and supports asynchronous task management for efficient, non-blocking operations.
1010
1111
# Fields
12-
- `state::UInt8`: client state.
13-
- `on_msg::TrieNode`: A trie mapping topics to callback functions.
14-
- `keep_alive::UInt16`: The keep-alive time in seconds.
15-
- `last_id::UInt16`: The last packet identifier used.
16-
- `in_flight::Dict{UInt16, Future}`: A dictionary mapping packet identifiers to futures.
17-
- `write_packets::AbstractChannel`: A channel for writing packets.
18-
- `socket`: The socket used for communication with the broker.
19-
- `socket_lock`: A lock for synchronizing access to the socket.
20-
- `ping_timeout::UInt64`: The ping timeout in seconds.
21-
- `ping_outstanding::Atomic{UInt8}`: An atomic counter for the number of outstanding ping requests.
22-
- `last_sent::Atomic{Float64}`: An atomic float representing the timestamp of the last sent packet.
23-
- `last_received::Atomic{Float64}`: An atomic float representing the timestamp of the last received packet.
12+
13+
- `state::UInt8`: client state.
14+
- `on_msg::TrieNode`: A trie mapping topics to callback functions.
15+
- `keep_alive::UInt16`: The keep-alive time in seconds.
16+
- `last_id::UInt16`: The last packet identifier used.
17+
- `in_flight::Dict{UInt16, Future}`: A dictionary mapping packet identifiers to futures.
18+
- `write_packets::AbstractChannel`: A channel for writing packets.
19+
- `socket`: The socket used for communication with the broker.
20+
- `socket_lock`: A lock for synchronizing access to the socket.
21+
- `ping_timeout::UInt64`: The ping timeout in seconds.
22+
- `ping_outstanding::Atomic{UInt8}`: An atomic counter for the number of outstanding ping requests.
23+
- `last_sent::Atomic{Float64}`: An atomic float representing the timestamp of the last sent packet.
24+
- `last_received::Atomic{Float64}`: An atomic float representing the timestamp of the last received packet.
2425
2526
# Constructor
27+
2628
`Client(ping_timeout::UInt64=UInt64(60))` constructs a new `Client` object with the specified ping timeout (default: 60 seconds).
2729
"""
2830
mutable struct Client <: AbstractConfigElement
@@ -50,26 +52,27 @@ mutable struct Client <: AbstractConfigElement
5052
read_task::Task
5153
keep_alive_task::Task
5254

53-
Client(ping_timeout::UInt64 = UInt64(60)) = new(
54-
0x00,
55-
TrieNode(),
56-
0x0000,
57-
0x0000,
58-
Dict{UInt16,Future}(),
59-
Channel{Packet}(typemax(Int64)),
60-
IOBuffer(),
61-
ReentrantLock(),
62-
ping_timeout,
63-
0x00,
64-
0.0,
65-
0.0,
66-
Task(nothing),
67-
Task(nothing),
68-
Task(nothing),
69-
)
55+
function Client(ping_timeout::UInt64=UInt64(60))
56+
return new(
57+
0x00,
58+
TrieNode(),
59+
0x0000,
60+
0x0000,
61+
Dict{UInt16,Future}(),
62+
Channel{Packet}(typemax(Int64)),
63+
IOBuffer(),
64+
ReentrantLock(),
65+
ping_timeout,
66+
0x00,
67+
0.0,
68+
0.0,
69+
Task(nothing),
70+
Task(nothing),
71+
Task(nothing),
72+
)
73+
end
7074
end
7175

72-
7376
function write_loop(client::Client)::UInt8
7477
try
7578
while !isclosed(client)
@@ -115,7 +118,6 @@ function write_loop(client::Client)::UInt8
115118
end
116119
end
117120

118-
119121
function read_loop(client::Client)::UInt8
120122
try
121123
while !isclosed(client)
@@ -157,18 +159,17 @@ function read_loop(client::Client)::UInt8
157159
end
158160
end
159161

160-
161162
function keep_alive_loop(client::Client)::UInt8
162163
ping_sent = time()
163164

164165
# TODO: improve, this causes reconnect to take ~1 second. is there a way to interupt?
165166
check_interval = 1
166-
167-
timer = Timer(0, interval = check_interval)
167+
168+
timer = Timer(0; interval=check_interval)
168169

169170
while !isclosed(client)
170171
if time() - @atomic(client.last_sent) >= client.keep_alive ||
171-
time() - @atomic(client.last_received) >= client.keep_alive
172+
time() - @atomic(client.last_received) >= client.keep_alive
172173
if @atomic(client.ping_outstanding) == 0x0
173174
@atomicswap client.ping_outstanding = 0x1
174175
try
@@ -191,7 +192,7 @@ function keep_alive_loop(client::Client)::UInt8
191192
end
192193

193194
if @atomic(client.ping_outstanding) == 1 &&
194-
time() - ping_sent >= client.ping_timeout
195+
time() - ping_sent >= client.ping_timeout
195196
try # No pingresp received
196197
disconnect(client)
197198
break
@@ -222,18 +223,20 @@ end
222223

223224
# write packet to mqtt broker
224225
function write_packet(client::Client, cmd::UInt8, data...)
225-
put!(client.write_packets, Packet(cmd, data))
226+
return put!(client.write_packets, Packet(cmd, data))
226227
end
227228

228229
isready(client::Client)::Bool = client.state == 0x00
229230
isconnected(client::Client)::Bool = client.state == 0x01
230231
isclosed(client::Client)::Bool = client.state >= 0x02
231232
iserror(client::Client)::Bool = client.state == 0x03
232233

233-
show(io::IO, client::Client) = print(
234-
io,
235-
"MQTTClient[state: $(get(CLIENT_STATE, client.state, :unknown)), read_loop: $(taskstatus(client.read_task)), write_loop: $(taskstatus(client.write_task)), keep_alive: $(taskstatus(client.keep_alive_task))]\n$(show_tree(client.on_msg))",
236-
)
234+
function show(io::IO, client::Client)
235+
return print(
236+
io,
237+
"MQTTClient[state: $(get(CLIENT_STATE, client.state, :unknown)), read_loop: $(taskstatus(client.read_task)), write_loop: $(taskstatus(client.write_task)), keep_alive: $(taskstatus(client.keep_alive_task))]\n$(show_tree(client.on_msg))",
238+
)
239+
end
237240

238241
fetch(client::Client)::Tuple{UInt8,UInt8,UInt8} = begin
239242
try

0 commit comments

Comments
 (0)