diff --git a/Sources/Pulsar/Documentation.docc/HowToUse.md b/Sources/Pulsar/Documentation.docc/HowToUse.md index 351173a..957252b 100644 --- a/Sources/Pulsar/Documentation.docc/HowToUse.md +++ b/Sources/Pulsar/Documentation.docc/HowToUse.md @@ -42,7 +42,7 @@ struct PulsarExample { } // Set up a consumer - let consumer = try await client.consumer( + let consumer: PulsarProducer = try await client.consumer( topic: "persistent://public/default/my-topic", subscription: "test", subscriptionType: .shared @@ -108,13 +108,13 @@ struct PulsarExample { } // Set up a producer - let producer = try await client.producer( + let producer: PulsarProducer = try await client.producer( topic: "persistent://public/default/my-topic1", accessMode: .shared, schema: .string ) { _ in print("Producer closed") - } as PulsarProducer + } // Send messages in a loop Task { diff --git a/Sources/Pulsar/PulsarClient/PulsarClient.swift b/Sources/Pulsar/PulsarClient/PulsarClient.swift index 4b3107b..7285e3d 100644 --- a/Sources/Pulsar/PulsarClient/PulsarClient.swift +++ b/Sources/Pulsar/PulsarClient/PulsarClient.swift @@ -17,14 +17,42 @@ import NIO import NIOSSL @_exported import SchemaTypes -/// The core Pulsar Client used to connect to the server. +/// The core Pulsar Client used to establish and manage connections to an Apache Pulsar server. /// -/// This actor manages the connection to a Pulsar server and provides functionality -/// for creating and managing producers and consumers. It also handles configuration -/// of connection parameters and retry mechanisms. +/// This actor is responsible for handling communication with the Pulsar server, including +/// establishing and managing connections, handling authentication, and providing an interface +/// for creating producers and consumers. It also implements reconnection logic, secure TLS handling, +/// and resource management for active connections. /// -/// All interactions with the Pulsar messaging system, such as sending or receiving messages, -/// are controlled through this client. +/// ## Features +/// - Supports secure (TLS) and non-secure connections. +/// - Manages a pool of active connections. +/// - Handles automatic reconnections in case of network failures. +/// - Supports configuration of connection parameters including hostname, port, and reconnection limits. +/// - Provides an event-driven interface for message producers and consumers. +/// - Closes all active channels gracefully when the client shuts down. +/// +/// ## Usage +/// ```swift +/// let config = PulsarClientConfiguration(host: "pulsar.example.com", port: 6650) +/// let client = try await PulsarClient(configuration: config) { error in +/// print("Client closed: \(error)") +/// } +/// ``` +/// +/// Once initialized, the `PulsarClient` can be used to create producers and consumers to send and receive messages. +/// +/// ## Connection Management +/// - The client maintains a `connectionPool` to track open connections. +/// - If the connection is lost, it attempts to reconnect based on the `reconnectLimit` configuration. +/// - TLS settings can be specified through `PulsarClientConfiguration` to establish a secure connection. +/// +/// ## Closing the Client +/// When the client is no longer needed, it should be closed using: +/// ```swift +/// try await client.close() +/// ``` +/// This ensures that all resources are released and all connections are closed cleanly. public final actor PulsarClient { let logger = Logger(label: "PulsarClient") let group: EventLoopGroup diff --git a/Sources/Pulsar/PulsarConsumer.swift b/Sources/Pulsar/PulsarConsumer.swift index 5ef7210..bc40255 100644 --- a/Sources/Pulsar/PulsarConsumer.swift +++ b/Sources/Pulsar/PulsarConsumer.swift @@ -14,9 +14,54 @@ /// A Pulsar consumer used to asynchronously consume messages from a specific topic. /// -/// This class provides support for consuming messages from a Pulsar topic using various subscription types. -/// It conforms to `AsyncSequence`, enabling iteration over received messages in an asynchronous context. -/// Generic `T` represents the type of payload for the messages, conforming to `PulsarPayload`. +/// This class provides functionality for consuming messages from an Apache Pulsar topic. +/// It supports different subscription types and conforms to `AsyncSequence`, allowing +/// messages to be iterated over in an asynchronous context using Swift's `for await` syntax. +/// +/// ## Features: +/// - Conforms to `AsyncSequence`, enabling structured and idiomatic message consumption. +/// - Handles message acknowledgment automatically (if `autoAcknowledge` is enabled). +/// - Supports schema-based payload deserialization. +/// - Provides explicit error handling mechanisms. +/// +/// ## Usage Example: +/// ```swift +/// let consumer = PulsarConsumer( +/// autoAck: true, +/// handler: myHandler, +/// consumerID: 67890, +/// topic: "persistent://public/default/my-topic", +/// subscriptionName: "my-subscription", +/// subscriptionType: .shared, +/// subscriptionMode: .durable, +/// schema: mySchema +/// ) +/// +/// for await message in consumer { +/// print("Received message: \(message.payload)") +/// } +/// +/// try await consumer.close() // Close the consumer when done +/// ``` +/// +/// ## Lifecycle: +/// - The consumer is initialized with a handler, topic, subscription details, and schema. +/// - Messages are received and decoded using the specified schema. +/// - The consumer continuously yields messages via `AsyncThrowingStream, Error>`. +/// - The consumer can be explicitly closed using `close()`, ensuring proper resource cleanup. +/// +/// ## Error Handling: +/// - If message deserialization fails, the consumer will call `fail(error:)`, terminating the stream. +/// - If an error occurs while handling messages, the stream finishes with the provided error. +/// - Closing the consumer ensures proper detachment from the Pulsar client. +/// +/// - Note: This class is designed to be `Sendable`, meaning it can be safely used in concurrent contexts. +/// +/// - Parameters: +/// - T: A type conforming to ``PulsarPayload``, representing the message payload. +/// +/// - SeeAlso: ``PulsarProducer`` for message publishing. +/// public final class PulsarConsumer: AsyncSequence, Sendable, AnyConsumer { public let consumerID: UInt64 let autoAcknowledge: Bool diff --git a/Sources/Pulsar/PulsarProducer.swift b/Sources/Pulsar/PulsarProducer.swift index f504c78..9757b2b 100644 --- a/Sources/Pulsar/PulsarProducer.swift +++ b/Sources/Pulsar/PulsarProducer.swift @@ -14,9 +14,51 @@ /// A Pulsar producer used to publish messages to a specific topic. /// -/// This component enables sending messages to a Pulsar topic. It supports configuration -/// for schema, and other publishing parameters to ensure efficient and reliable -/// message delivery. +/// This class provides a mechanism for sending messages to an Apache Pulsar topic. +/// It supports both synchronous and asynchronous message publishing, allowing +/// developers to choose between guaranteed delivery with broker acknowledgment +/// (`syncSend`) and a fire-and-forget approach (`asyncSend`). The producer +/// is designed to handle different schema types and manage its lifecycle efficiently. +/// +/// ## Features: +/// - Publishes messages to a specified Pulsar topic. +/// - Supports synchronous (`syncSend`) and asynchronous (`asyncSend`) message delivery. +/// - Manages producer state via `ProducerStateManager`. +/// - Configurable access mode and schema. +/// - Provides a closure (`onClosed`) to handle producer shutdown events. +/// +/// ## Usage Example: +/// ```swift +/// let producer = PulsarProducer( +/// handler: myHandler, +/// producerAccessMode: .exclusive, +/// producerID: 12345, +/// schema: mySchema, +/// topic: "persistent://public/default/my-topic" +/// ) +/// +/// try await producer.syncSend(message: myMessage) // Waits for broker response +/// try await producer.asyncSend(message: myMessage) // Fire-and-forget +/// try await producer.close() // Closes the producer +/// ``` +/// +/// ## Lifecycle: +/// - The producer is initialized with a handler, schema, topic, and other configurations. +/// - Messages can be sent using `syncSend` (awaits broker acknowledgment) or `asyncSend` (does not wait). +/// - The producer can be explicitly closed using `close()`, triggering the `onClosed` handler if provided. +/// +/// ## Error Handling: +/// - `syncSend` throws an error if a broker acknowledgment is not received within a timeout. +/// - `asyncSend` does not throw errors for timeout issues but will throw for major failures. +/// - `close()` ensures a graceful shutdown of the producer. +/// +/// - Note: This class is designed to be `Sendable`, meaning it can be used safely in concurrent contexts. +/// +/// - Parameters: +/// - T: A type conforming to ``PulsarPayload``, representing the payload schema. +/// +/// - SeeAlso: ``PulsarConsumer`` for message consumtion. +/// public final class PulsarProducer: Sendable, AnyProducer { public let producerID: UInt64 let topic: String diff --git a/Sources/PulsarExample/PulsarExample.swift b/Sources/PulsarExample/PulsarExample.swift index dabfe20..0f93865 100644 --- a/Sources/PulsarExample/PulsarExample.swift +++ b/Sources/PulsarExample/PulsarExample.swift @@ -65,13 +65,13 @@ struct PulsarExample { let client = try await PulsarClient(configuration: config) { error in print("Error: \(error)") } - let consumer = + let consumer: PulsarConsumer = try await client.consumer( topic: "persistent://public/default/my-topic2", subscription: "test", subscriptionType: .shared, schema: .string - ) as PulsarConsumer + ) Task { do { for try await message in consumer { @@ -91,10 +91,10 @@ struct PulsarExample { } } - let producer = + let producer: PulsarProducer = try await client.producer(topic: "persistent://public/default/my-topic1", accessMode: .shared, schema: .string) { _ in print("Produer closed") - } as PulsarProducer + } Task { while true { do {