Skip to content

Commit 7cba89f

Browse files
authored
Merge pull request #12 from flexlixrup/feature/docs
2 parents 46800e9 + d3ee53a commit 7cba89f

File tree

5 files changed

+134
-19
lines changed

5 files changed

+134
-19
lines changed

Sources/Pulsar/Documentation.docc/HowToUse.md

+3-3
Original file line numberDiff line numberDiff line change
@@ -42,7 +42,7 @@ struct PulsarExample {
4242
}
4343

4444
// Set up a consumer
45-
let consumer = try await client.consumer(
45+
let consumer: PulsarProducer<String> = try await client.consumer(
4646
topic: "persistent://public/default/my-topic",
4747
subscription: "test",
4848
subscriptionType: .shared
@@ -108,13 +108,13 @@ struct PulsarExample {
108108
}
109109

110110
// Set up a producer
111-
let producer = try await client.producer(
111+
let producer: PulsarProducer<String> = try await client.producer(
112112
topic: "persistent://public/default/my-topic1",
113113
accessMode: .shared,
114114
schema: .string
115115
) { _ in
116116
print("Producer closed")
117-
} as PulsarProducer<String>
117+
}
118118

119119
// Send messages in a loop
120120
Task {

Sources/Pulsar/PulsarClient/PulsarClient.swift

+34-6
Original file line numberDiff line numberDiff line change
@@ -17,14 +17,42 @@ import NIO
1717
import NIOSSL
1818
@_exported import SchemaTypes
1919

20-
/// The core Pulsar Client used to connect to the server.
20+
/// The core Pulsar Client used to establish and manage connections to an Apache Pulsar server.
2121
///
22-
/// This actor manages the connection to a Pulsar server and provides functionality
23-
/// for creating and managing producers and consumers. It also handles configuration
24-
/// of connection parameters and retry mechanisms.
22+
/// This actor is responsible for handling communication with the Pulsar server, including
23+
/// establishing and managing connections, handling authentication, and providing an interface
24+
/// for creating producers and consumers. It also implements reconnection logic, secure TLS handling,
25+
/// and resource management for active connections.
2526
///
26-
/// All interactions with the Pulsar messaging system, such as sending or receiving messages,
27-
/// are controlled through this client.
27+
/// ## Features
28+
/// - Supports secure (TLS) and non-secure connections.
29+
/// - Manages a pool of active connections.
30+
/// - Handles automatic reconnections in case of network failures.
31+
/// - Supports configuration of connection parameters including hostname, port, and reconnection limits.
32+
/// - Provides an event-driven interface for message producers and consumers.
33+
/// - Closes all active channels gracefully when the client shuts down.
34+
///
35+
/// ## Usage
36+
/// ```swift
37+
/// let config = PulsarClientConfiguration(host: "pulsar.example.com", port: 6650)
38+
/// let client = try await PulsarClient(configuration: config) { error in
39+
/// print("Client closed: \(error)")
40+
/// }
41+
/// ```
42+
///
43+
/// Once initialized, the `PulsarClient` can be used to create producers and consumers to send and receive messages.
44+
///
45+
/// ## Connection Management
46+
/// - The client maintains a `connectionPool` to track open connections.
47+
/// - If the connection is lost, it attempts to reconnect based on the `reconnectLimit` configuration.
48+
/// - TLS settings can be specified through `PulsarClientConfiguration` to establish a secure connection.
49+
///
50+
/// ## Closing the Client
51+
/// When the client is no longer needed, it should be closed using:
52+
/// ```swift
53+
/// try await client.close()
54+
/// ```
55+
/// This ensures that all resources are released and all connections are closed cleanly.
2856
public final actor PulsarClient {
2957
let logger = Logger(label: "PulsarClient")
3058
let group: EventLoopGroup

Sources/Pulsar/PulsarConsumer.swift

+48-3
Original file line numberDiff line numberDiff line change
@@ -14,9 +14,54 @@
1414

1515
/// A Pulsar consumer used to asynchronously consume messages from a specific topic.
1616
///
17-
/// This class provides support for consuming messages from a Pulsar topic using various subscription types.
18-
/// It conforms to `AsyncSequence`, enabling iteration over received messages in an asynchronous context.
19-
/// Generic `T` represents the type of payload for the messages, conforming to `PulsarPayload`.
17+
/// This class provides functionality for consuming messages from an Apache Pulsar topic.
18+
/// It supports different subscription types and conforms to `AsyncSequence`, allowing
19+
/// messages to be iterated over in an asynchronous context using Swift's `for await` syntax.
20+
///
21+
/// ## Features:
22+
/// - Conforms to `AsyncSequence`, enabling structured and idiomatic message consumption.
23+
/// - Handles message acknowledgment automatically (if `autoAcknowledge` is enabled).
24+
/// - Supports schema-based payload deserialization.
25+
/// - Provides explicit error handling mechanisms.
26+
///
27+
/// ## Usage Example:
28+
/// ```swift
29+
/// let consumer = PulsarConsumer<MyPayload>(
30+
/// autoAck: true,
31+
/// handler: myHandler,
32+
/// consumerID: 67890,
33+
/// topic: "persistent://public/default/my-topic",
34+
/// subscriptionName: "my-subscription",
35+
/// subscriptionType: .shared,
36+
/// subscriptionMode: .durable,
37+
/// schema: mySchema
38+
/// )
39+
///
40+
/// for await message in consumer {
41+
/// print("Received message: \(message.payload)")
42+
/// }
43+
///
44+
/// try await consumer.close() // Close the consumer when done
45+
/// ```
46+
///
47+
/// ## Lifecycle:
48+
/// - The consumer is initialized with a handler, topic, subscription details, and schema.
49+
/// - Messages are received and decoded using the specified schema.
50+
/// - The consumer continuously yields messages via `AsyncThrowingStream<Message<T>, Error>`.
51+
/// - The consumer can be explicitly closed using `close()`, ensuring proper resource cleanup.
52+
///
53+
/// ## Error Handling:
54+
/// - If message deserialization fails, the consumer will call `fail(error:)`, terminating the stream.
55+
/// - If an error occurs while handling messages, the stream finishes with the provided error.
56+
/// - Closing the consumer ensures proper detachment from the Pulsar client.
57+
///
58+
/// - Note: This class is designed to be `Sendable`, meaning it can be safely used in concurrent contexts.
59+
///
60+
/// - Parameters:
61+
/// - T: A type conforming to ``PulsarPayload``, representing the message payload.
62+
///
63+
/// - SeeAlso: ``PulsarProducer`` for message publishing.
64+
///
2065
public final class PulsarConsumer<T: PulsarPayload>: AsyncSequence, Sendable, AnyConsumer {
2166
public let consumerID: UInt64
2267
let autoAcknowledge: Bool

Sources/Pulsar/PulsarProducer.swift

+45-3
Original file line numberDiff line numberDiff line change
@@ -14,9 +14,51 @@
1414

1515
/// A Pulsar producer used to publish messages to a specific topic.
1616
///
17-
/// This component enables sending messages to a Pulsar topic. It supports configuration
18-
/// for schema, and other publishing parameters to ensure efficient and reliable
19-
/// message delivery.
17+
/// This class provides a mechanism for sending messages to an Apache Pulsar topic.
18+
/// It supports both synchronous and asynchronous message publishing, allowing
19+
/// developers to choose between guaranteed delivery with broker acknowledgment
20+
/// (`syncSend`) and a fire-and-forget approach (`asyncSend`). The producer
21+
/// is designed to handle different schema types and manage its lifecycle efficiently.
22+
///
23+
/// ## Features:
24+
/// - Publishes messages to a specified Pulsar topic.
25+
/// - Supports synchronous (`syncSend`) and asynchronous (`asyncSend`) message delivery.
26+
/// - Manages producer state via `ProducerStateManager`.
27+
/// - Configurable access mode and schema.
28+
/// - Provides a closure (`onClosed`) to handle producer shutdown events.
29+
///
30+
/// ## Usage Example:
31+
/// ```swift
32+
/// let producer = PulsarProducer<MyPayload>(
33+
/// handler: myHandler,
34+
/// producerAccessMode: .exclusive,
35+
/// producerID: 12345,
36+
/// schema: mySchema,
37+
/// topic: "persistent://public/default/my-topic"
38+
/// )
39+
///
40+
/// try await producer.syncSend(message: myMessage) // Waits for broker response
41+
/// try await producer.asyncSend(message: myMessage) // Fire-and-forget
42+
/// try await producer.close() // Closes the producer
43+
/// ```
44+
///
45+
/// ## Lifecycle:
46+
/// - The producer is initialized with a handler, schema, topic, and other configurations.
47+
/// - Messages can be sent using `syncSend` (awaits broker acknowledgment) or `asyncSend` (does not wait).
48+
/// - The producer can be explicitly closed using `close()`, triggering the `onClosed` handler if provided.
49+
///
50+
/// ## Error Handling:
51+
/// - `syncSend` throws an error if a broker acknowledgment is not received within a timeout.
52+
/// - `asyncSend` does not throw errors for timeout issues but will throw for major failures.
53+
/// - `close()` ensures a graceful shutdown of the producer.
54+
///
55+
/// - Note: This class is designed to be `Sendable`, meaning it can be used safely in concurrent contexts.
56+
///
57+
/// - Parameters:
58+
/// - T: A type conforming to ``PulsarPayload``, representing the payload schema.
59+
///
60+
/// - SeeAlso: ``PulsarConsumer`` for message consumtion.
61+
///
2062
public final class PulsarProducer<T: PulsarPayload>: Sendable, AnyProducer {
2163
public let producerID: UInt64
2264
let topic: String

Sources/PulsarExample/PulsarExample.swift

+4-4
Original file line numberDiff line numberDiff line change
@@ -65,13 +65,13 @@ struct PulsarExample {
6565
let client = try await PulsarClient(configuration: config) { error in
6666
print("Error: \(error)")
6767
}
68-
let consumer =
68+
let consumer: PulsarConsumer<String> =
6969
try await client.consumer(
7070
topic: "persistent://public/default/my-topic2",
7171
subscription: "test",
7272
subscriptionType: .shared,
7373
schema: .string
74-
) as PulsarConsumer<String>
74+
)
7575
Task {
7676
do {
7777
for try await message in consumer {
@@ -91,10 +91,10 @@ struct PulsarExample {
9191
}
9292
}
9393

94-
let producer =
94+
let producer: PulsarProducer<String> =
9595
try await client.producer(topic: "persistent://public/default/my-topic1", accessMode: .shared, schema: .string) { _ in
9696
print("Produer closed")
97-
} as PulsarProducer<String>
97+
}
9898
Task {
9999
while true {
100100
do {

0 commit comments

Comments
 (0)