Pub-Sub Broadcasting Pattern¶
Category: How-To Guide (Task-Oriented)
Purpose: This guide shows you how to implement publish-subscribe broadcasting for one-to-many component communication.
Overview¶
Pub-sub (publish-subscribe) is a messaging pattern where publishers broadcast messages to multiple subscribers without knowing who the subscribers are. This decoupling enables flexible, scalable communication in component-based systems.
Key Features:
- Topic-based message routing
- Multiple subscribers per topic
- Subscriber isolation (crash doesn't affect others)
- Dynamic subscription management
Performance: Fanout to 100 subscribers completes in 85.2µs (measured in Task 6.2 messaging_benchmarks.rs benchmark bench_pubsub_fanout_100, macOS M1, 100 samples).
When to Use Pub-Sub¶
Use pub-sub when:
- Broadcasting to multiple components is needed
- Publishers shouldn't know about subscribers
- Dynamic subscriber addition/removal is required
- Event-driven architecture is preferred
Don't use pub-sub when:
- One-to-one communication is sufficient (use request-response)
- Guaranteed delivery to specific component is required
- Response from subscriber is needed (use request-response instead)
Prerequisites¶
Before implementing pub-sub, you should understand: - ComponentActor basics (see Your First ComponentActor) - MessageBroker integration with airssys-rt - ComponentRegistry for component lookup
Implementation Steps¶
Step 1: Set Up MessageBroker¶
The MessageBroker handles topic-based message routing and subscriber management.
use std::sync::Arc;
use airssys_rt::broker::InMemoryMessageBroker;
use airssys_wasm::actor::ComponentMessage;
// Create broker (usually one per system)
let broker = Arc::new(InMemoryMessageBroker::<ComponentMessage>::new());
Note: InMemoryMessageBroker is provided by airssys-rt and supports all pub-sub operations.
Step 2: Create Publisher Component¶
The publisher broadcasts messages to a topic without knowing who the subscribers are.
use airssys_wasm::actor::{ComponentMessage, MessageRouter};
use airssys_wasm::core::ComponentId;
struct PublisherComponent {
router: MessageRouter<InMemoryMessageBroker<ComponentMessage>>,
topic: String,
}
impl PublisherComponent {
async fn publish_event(&self, event_data: Vec<u8>) -> Result<(), String> {
// Create event message
let message = ComponentMessage::Custom {
topic: self.topic.clone(),
payload: event_data,
};
// Broadcast to all subscribers via broker
// The broker handles fanout automatically
self.router.broadcast_to_topic(&self.topic, message).await
.map_err(|e| format!("Failed to publish: {}", e))?;
Ok(())
}
}
Step 3: Create Subscriber Components¶
Subscribers register interest in topics and receive all messages published to those topics.
struct SubscriberComponent {
id: ComponentId,
subscribed_topics: Vec<String>,
}
impl SubscriberComponent {
fn new(id: ComponentId, topics: Vec<String>) -> Self {
Self {
id,
subscribed_topics: topics,
}
}
async fn handle_event(&self, topic: &str, payload: Vec<u8>) {
// Process event
println!("Subscriber {} received event on topic '{}': {} bytes",
self.id.as_str(), topic, payload.len());
// Your business logic here
}
}
// In Actor::handle_message implementation
async fn handle_message(message: ComponentMessage) -> Result<(), WasmError> {
match message {
ComponentMessage::Custom { topic, payload } => {
if self.subscribed_topics.contains(&topic) {
self.handle_event(&topic, payload).await;
}
}
_ => {}
}
Ok(())
}
Step 4: Subscribe to Topics¶
Components subscribe to topics by registering with the broker.
use airssys_rt::broker::MessageBroker;
// Subscribe to topic
let topic = "sensor.temperature".to_string();
broker.subscribe(&topic, subscriber_addr.clone()).await
.map_err(|e| format!("Failed to subscribe: {}", e))?;
// Subscribe to multiple topics
for topic in &subscriber.subscribed_topics {
broker.subscribe(topic, subscriber_addr.clone()).await?;
}
Step 5: Unsubscribe from Topics¶
Components can unsubscribe when they no longer want to receive messages.
// Unsubscribe from specific topic
broker.unsubscribe(&topic, &subscriber_addr).await?;
// Unsubscribe from all topics (component shutdown)
for topic in &subscriber.subscribed_topics {
broker.unsubscribe(topic, &subscriber_addr).await.ok();
}
Topic Naming Conventions¶
Use hierarchical topic names for better organization:
// Good: hierarchical, specific
"sensor.temperature.room1"
"sensor.humidity.room2"
"actuator.light.room1"
// Bad: flat, unclear
"temp"
"data"
"event"
Benefits of hierarchical topics:
- Easier to manage subscriptions
- Supports wildcard subscriptions (if implemented)
- Better logging and debugging
Subscriber Isolation¶
One of the key benefits of pub-sub is subscriber isolation: if one subscriber crashes, others continue to receive messages.
// If subscriber B crashes, subscribers A and C still receive messages
let subscribers = vec![
ComponentId::new("subscriber-a"),
ComponentId::new("subscriber-b"), // crashes
ComponentId::new("subscriber-c"),
];
// Broker handles delivery failures gracefully
// Failed deliveries are logged but don't affect other subscribers
Implementation Detail: The broker sends messages to each subscriber independently. If delivery to one subscriber fails, the broker continues with remaining subscribers.
Performance Characteristics¶
Based on Task 6.2 benchmarks (messaging_benchmarks.rs):
| Operation | Latency | Benchmark |
|---|---|---|
| Fanout to 10 subscribers | ~8.5µs | bench_pubsub_fanout_10 |
| Fanout to 100 subscribers | 85.2µs | bench_pubsub_fanout_100 |
| Subscribe (register 10) | <500µs | bench_subscription_management |
| Broadcast single message | ~1.05µs | Inferred from routing |
Test Conditions: macOS M1, 100 samples, 95% confidence interval
Scalability Characteristics¶
- Linear fanout: 85.2µs for 100 subscribers = ~852ns per subscriber
- Constant subscription: Registration overhead is O(1)
- Concurrent delivery: Messages sent in parallel (limited by available threads)
Optimization Tips¶
- Batch Publications: If publishing multiple events, batch them when possible
- Filter Topics: Use specific topics to reduce unnecessary message delivery
- Subscriber Count: Monitor subscriber count per topic for capacity planning
- Async Delivery: Broker uses async delivery to minimize blocking
Broadcasting Patterns¶
Simple Broadcast¶
Send one message to all subscribers:
Filtered Broadcast¶
Send different messages based on subscriber criteria:
// Publisher sends raw data
publisher.publish_event("sensor.raw", raw_data).await?;
// Subscriber filters based on local criteria
impl SubscriberComponent {
async fn handle_event(&self, topic: &str, payload: Vec<u8>) {
if topic == "sensor.raw" && self.should_process(&payload) {
// Process only relevant events
}
}
fn should_process(&self, payload: &[u8]) -> bool {
// Filter logic
true
}
}
Multi-Topic Broadcast¶
Publish to multiple topics simultaneously:
async fn publish_multi_topic(&self, topics: &[String], payload: Vec<u8>) {
for topic in topics {
self.broker.broadcast_to_topic(topic,
ComponentMessage::Custom {
topic: topic.clone(),
payload: payload.clone(),
}
).await.ok();
}
}
Message Delivery Guarantees¶
Pub-sub provides:
- At-most-once delivery: Each subscriber receives each message at most once
- Best-effort delivery: Messages are delivered if subscriber is available and responsive
- No ordering guarantees: Messages may arrive out of order across different subscribers
Pub-sub does NOT guarantee:
- Exactly-once delivery: Use request-response for guaranteed delivery
- Message persistence: Messages are not stored if no subscribers are available
- Ordered delivery: Use sequence numbers if ordering is critical
Error Handling¶
Publisher Errors¶
match self.publish_event(data).await {
Ok(_) => {}
Err(e) if e.contains("broker unavailable") => {
// Broker is down, queue message or drop
log::error!("Broker unavailable: {}", e);
}
Err(e) => {
log::error!("Publish failed: {}", e);
}
}
Subscriber Errors¶
// In subscriber's handle_message
async fn handle_message(message: ComponentMessage) -> Result<(), WasmError> {
match message {
ComponentMessage::Custom { topic, payload } => {
// Wrap processing in error boundary
if let Err(e) = self.handle_event(&topic, payload).await {
log::error!("Event processing failed for topic {}: {}", topic, e);
// Don't propagate error to broker (isolate failures)
}
}
_ => {}
}
Ok(())
}
Complete Example¶
See examples/pubsub_component.rs for a complete, runnable example demonstrating: - Publisher broadcasting to multiple subscribers - Dynamic subscription management - Subscriber isolation - Fanout performance
Run the example:
Best Practices¶
- Use Hierarchical Topics: Organize topics in namespaces for clarity
- Isolate Failures: Catch errors in subscriber handlers to prevent affecting others
- Monitor Fanout Time: Track P99 fanout latency for performance regression
- Cleanup Subscriptions: Always unsubscribe during component shutdown
- Avoid Circular Dependencies: Publishers should not subscribe to their own topics
Common Mistakes¶
- Forgetting to Subscribe: Subscribers must explicitly subscribe to receive messages
- Blocking in Handlers: Subscriber handlers should be fast to avoid delaying other subscribers
- Leaking Subscriptions: Not unsubscribing causes memory leaks in broker
- Assuming Ordering: Messages may arrive out of order; use sequence numbers if needed
- Ignoring Errors: Publisher should handle broker unavailability gracefully
Advanced: Wildcard Subscriptions¶
If your broker supports wildcard subscriptions:
// Subscribe to all temperature sensors
broker.subscribe("sensor.temperature.*", addr).await?;
// Subscribe to all sensors in room1
broker.subscribe("sensor.*.room1", addr).await?;
// Subscribe to everything (use with caution)
broker.subscribe("*", addr).await?;
Note: Wildcard support depends on broker implementation. Check documentation for your specific broker.
Related Patterns¶
- Request-Response: For one-to-one communication with replies (see Request-Response Pattern)
- Event Sourcing: For event-driven architectures with persistent event logs
- Message Filtering: For subscriber-side filtering of events
- State Management: For sharing state across components (see State Management Patterns)
References¶
- ADR-WASM-009: Component Communication Model (Pattern 3: Pub-Sub)
- Task 6.2 Benchmarks:
benches/messaging_benchmarks.rs - API Reference: Message Routing
- airssys-rt: MessageBroker implementation