Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

docs: improve docs for various types #12

Merged
merged 1 commit into from
Feb 15, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions Sources/Pulsar/Documentation.docc/HowToUse.md
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ struct PulsarExample {
}

// Set up a consumer
let consumer = try await client.consumer(
let consumer: PulsarProducer<String> = try await client.consumer(
topic: "persistent://public/default/my-topic",
subscription: "test",
subscriptionType: .shared
Expand Down Expand Up @@ -108,13 +108,13 @@ struct PulsarExample {
}

// Set up a producer
let producer = try await client.producer(
let producer: PulsarProducer<String> = try await client.producer(
topic: "persistent://public/default/my-topic1",
accessMode: .shared,
schema: .string
) { _ in
print("Producer closed")
} as PulsarProducer<String>
}

// Send messages in a loop
Task {
Expand Down
40 changes: 34 additions & 6 deletions Sources/Pulsar/PulsarClient/PulsarClient.swift
Original file line number Diff line number Diff line change
Expand Up @@ -17,14 +17,42 @@ import NIO
import NIOSSL
@_exported import SchemaTypes

/// The core Pulsar Client used to connect to the server.
/// The core Pulsar Client used to establish and manage connections to an Apache Pulsar server.
///
/// This actor manages the connection to a Pulsar server and provides functionality
/// for creating and managing producers and consumers. It also handles configuration
/// of connection parameters and retry mechanisms.
/// This actor is responsible for handling communication with the Pulsar server, including
/// establishing and managing connections, handling authentication, and providing an interface
/// for creating producers and consumers. It also implements reconnection logic, secure TLS handling,
/// and resource management for active connections.
///
/// All interactions with the Pulsar messaging system, such as sending or receiving messages,
/// are controlled through this client.
/// ## Features
/// - Supports secure (TLS) and non-secure connections.
/// - Manages a pool of active connections.
/// - Handles automatic reconnections in case of network failures.
/// - Supports configuration of connection parameters including hostname, port, and reconnection limits.
/// - Provides an event-driven interface for message producers and consumers.
/// - Closes all active channels gracefully when the client shuts down.
///
/// ## Usage
/// ```swift
/// let config = PulsarClientConfiguration(host: "pulsar.example.com", port: 6650)
/// let client = try await PulsarClient(configuration: config) { error in
/// print("Client closed: \(error)")
/// }
/// ```
///
/// Once initialized, the `PulsarClient` can be used to create producers and consumers to send and receive messages.
///
/// ## Connection Management
/// - The client maintains a `connectionPool` to track open connections.
/// - If the connection is lost, it attempts to reconnect based on the `reconnectLimit` configuration.
/// - TLS settings can be specified through `PulsarClientConfiguration` to establish a secure connection.
///
/// ## Closing the Client
/// When the client is no longer needed, it should be closed using:
/// ```swift
/// try await client.close()
/// ```
/// This ensures that all resources are released and all connections are closed cleanly.
public final actor PulsarClient {
let logger = Logger(label: "PulsarClient")
let group: EventLoopGroup
Expand Down
51 changes: 48 additions & 3 deletions Sources/Pulsar/PulsarConsumer.swift
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,54 @@

/// A Pulsar consumer used to asynchronously consume messages from a specific topic.
///
/// This class provides support for consuming messages from a Pulsar topic using various subscription types.
/// It conforms to `AsyncSequence`, enabling iteration over received messages in an asynchronous context.
/// Generic `T` represents the type of payload for the messages, conforming to `PulsarPayload`.
/// This class provides functionality for consuming messages from an Apache Pulsar topic.
/// It supports different subscription types and conforms to `AsyncSequence`, allowing
/// messages to be iterated over in an asynchronous context using Swift's `for await` syntax.
///
/// ## Features:
/// - Conforms to `AsyncSequence`, enabling structured and idiomatic message consumption.
/// - Handles message acknowledgment automatically (if `autoAcknowledge` is enabled).
/// - Supports schema-based payload deserialization.
/// - Provides explicit error handling mechanisms.
///
/// ## Usage Example:
/// ```swift
/// let consumer = PulsarConsumer<MyPayload>(
/// autoAck: true,
/// handler: myHandler,
/// consumerID: 67890,
/// topic: "persistent://public/default/my-topic",
/// subscriptionName: "my-subscription",
/// subscriptionType: .shared,
/// subscriptionMode: .durable,
/// schema: mySchema
/// )
///
/// for await message in consumer {
/// print("Received message: \(message.payload)")
/// }
///
/// try await consumer.close() // Close the consumer when done
/// ```
///
/// ## Lifecycle:
/// - The consumer is initialized with a handler, topic, subscription details, and schema.
/// - Messages are received and decoded using the specified schema.
/// - The consumer continuously yields messages via `AsyncThrowingStream<Message<T>, Error>`.
/// - The consumer can be explicitly closed using `close()`, ensuring proper resource cleanup.
///
/// ## Error Handling:
/// - If message deserialization fails, the consumer will call `fail(error:)`, terminating the stream.
/// - If an error occurs while handling messages, the stream finishes with the provided error.
/// - Closing the consumer ensures proper detachment from the Pulsar client.
///
/// - Note: This class is designed to be `Sendable`, meaning it can be safely used in concurrent contexts.
///
/// - Parameters:
/// - T: A type conforming to ``PulsarPayload``, representing the message payload.
///
/// - SeeAlso: ``PulsarProducer`` for message publishing.
///
public final class PulsarConsumer<T: PulsarPayload>: AsyncSequence, Sendable, AnyConsumer {
public let consumerID: UInt64
let autoAcknowledge: Bool
Expand Down
48 changes: 45 additions & 3 deletions Sources/Pulsar/PulsarProducer.swift
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,51 @@

/// A Pulsar producer used to publish messages to a specific topic.
///
/// This component enables sending messages to a Pulsar topic. It supports configuration
/// for schema, and other publishing parameters to ensure efficient and reliable
/// message delivery.
/// This class provides a mechanism for sending messages to an Apache Pulsar topic.
/// It supports both synchronous and asynchronous message publishing, allowing
/// developers to choose between guaranteed delivery with broker acknowledgment
/// (`syncSend`) and a fire-and-forget approach (`asyncSend`). The producer
/// is designed to handle different schema types and manage its lifecycle efficiently.
///
/// ## Features:
/// - Publishes messages to a specified Pulsar topic.
/// - Supports synchronous (`syncSend`) and asynchronous (`asyncSend`) message delivery.
/// - Manages producer state via `ProducerStateManager`.
/// - Configurable access mode and schema.
/// - Provides a closure (`onClosed`) to handle producer shutdown events.
///
/// ## Usage Example:
/// ```swift
/// let producer = PulsarProducer<MyPayload>(
/// handler: myHandler,
/// producerAccessMode: .exclusive,
/// producerID: 12345,
/// schema: mySchema,
/// topic: "persistent://public/default/my-topic"
/// )
///
/// try await producer.syncSend(message: myMessage) // Waits for broker response
/// try await producer.asyncSend(message: myMessage) // Fire-and-forget
/// try await producer.close() // Closes the producer
/// ```
///
/// ## Lifecycle:
/// - The producer is initialized with a handler, schema, topic, and other configurations.
/// - Messages can be sent using `syncSend` (awaits broker acknowledgment) or `asyncSend` (does not wait).
/// - The producer can be explicitly closed using `close()`, triggering the `onClosed` handler if provided.
///
/// ## Error Handling:
/// - `syncSend` throws an error if a broker acknowledgment is not received within a timeout.
/// - `asyncSend` does not throw errors for timeout issues but will throw for major failures.
/// - `close()` ensures a graceful shutdown of the producer.
///
/// - Note: This class is designed to be `Sendable`, meaning it can be used safely in concurrent contexts.
///
/// - Parameters:
/// - T: A type conforming to ``PulsarPayload``, representing the payload schema.
///
/// - SeeAlso: ``PulsarConsumer`` for message consumtion.
///
public final class PulsarProducer<T: PulsarPayload>: Sendable, AnyProducer {
public let producerID: UInt64
let topic: String
Expand Down
8 changes: 4 additions & 4 deletions Sources/PulsarExample/PulsarExample.swift
Original file line number Diff line number Diff line change
Expand Up @@ -65,13 +65,13 @@ struct PulsarExample {
let client = try await PulsarClient(configuration: config) { error in
print("Error: \(error)")
}
let consumer =
let consumer: PulsarConsumer<String> =
try await client.consumer(
topic: "persistent://public/default/my-topic2",
subscription: "test",
subscriptionType: .shared,
schema: .string
) as PulsarConsumer<String>
)
Task {
do {
for try await message in consumer {
Expand All @@ -91,10 +91,10 @@ struct PulsarExample {
}
}

let producer =
let producer: PulsarProducer<String> =
try await client.producer(topic: "persistent://public/default/my-topic1", accessMode: .shared, schema: .string) { _ in
print("Produer closed")
} as PulsarProducer<String>
}
Task {
while true {
do {
Expand Down
Loading