Messaging API Reference¶
This reference documents the message broker, mailbox system, and message delivery infrastructure.
Module: message¶
Message trait and broker types.
Trait: Message¶
Marker trait for types that can be sent as messages between actors.
Associated Types:
Result: The type returned when this message is processed
Trait Bounds:
Send: Must be safe to send across thread boundaries
'static: Must not contain non-static references
Design Rationale:
The Message trait is intentionally minimal to allow maximum flexibility. Any type that is Send + 'static can be a message by simply declaring its result type.
Example:
use airssys_rt::Message;
// Simple notification (no result)
struct Ping;
impl Message for Ping {
type Result = ();
}
// Query message (returns data)
struct GetUser {
user_id: u64,
}
impl Message for GetUser {
type Result = Option<User>;
}
// Command message (returns success/error)
struct UpdateUser {
user_id: u64,
name: String,
}
impl Message for UpdateUser {
type Result = Result<(), UpdateError>;
}
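The examples above only declare result types. As a minimal sketch of how such a trait could be consumed, the block below defines a stand-in `Message` trait with the bounds listed above and a hypothetical `Handle<M>` trait (not part of the documented API) whose return type is tied to `M::Result`. All names other than `Message` and `Result` are assumptions made for illustration.

```rust
/// Stand-in for the documented trait so the snippet compiles on its own.
pub trait Message: Send + 'static {
    /// Type produced when the message is processed.
    type Result;
}

pub struct Ping;
impl Message for Ping {
    type Result = ();
}

pub struct GetCount;
impl Message for GetCount {
    type Result = u64;
}

/// Hypothetical handler trait: one impl per message type an actor accepts.
pub trait Handle<M: Message> {
    fn handle(&mut self, msg: M) -> M::Result;
}

pub struct Counter {
    pub hits: u64,
}

impl Handle<Ping> for Counter {
    // Ping::Result is (), so the handler returns nothing.
    fn handle(&mut self, _msg: Ping) {
        self.hits += 1;
    }
}

impl Handle<GetCount> for Counter {
    // GetCount::Result is u64, so the handler must return a u64.
    fn handle(&mut self, _msg: GetCount) -> u64 {
        self.hits
    }
}
```

Because the result type is attached to the message, the compiler rejects a handler that returns the wrong type for a given message.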
Module: broker¶
Message broker implementations.
Trait: MessageBroker¶
pub trait MessageBroker: Send + Sync {
async fn send<M>(&self, actor_id: ActorId, msg: M) -> Result<M::Result, SendError>
where
M: Message;
async fn broadcast<M>(&self, msg: M) -> Vec<Result<M::Result, SendError>>
where
M: Message + Clone;
}
Trait for message routing and delivery.
Required Methods:
send(): Sends a message to a specific actor
broadcast(): Sends a message to all registered actors
Trait Bounds:
Send + Sync: Can be safely shared across threads
Implementations:
InMemoryMessageBroker: Default in-memory broker
Struct: InMemoryMessageBroker¶
In-memory message broker using channels for delivery.
Characteristics:
- Concurrent message routing using DashMap (a sharded concurrent map)
- Per-actor mailbox isolation
- Configurable backpressure strategies
- Zero-copy message passing (where possible)
Methods¶
new()¶
Creates a new in-memory message broker.
Returns:
InMemoryMessageBroker: New broker instance
Example:
register()¶
pub fn register<A>(&self, actor_id: ActorId, mailbox: Mailbox<A>) -> Result<(), BrokerError>
where
A: Actor,
Registers an actor's mailbox with the broker.
Type Parameters:
A: The actor type
Parameters:
actor_id: Unique identifier for the actor
mailbox: The actor's mailbox for receiving messages
Returns:
Ok(()): Registration successful
Err(BrokerError::AlreadyRegistered): Actor ID already in use
Example:
unregister()¶
Unregisters an actor from the broker.
Parameters:
actor_id: The actor to unregister
Returns:
Ok(()): Unregistration successful
Err(BrokerError::NotFound): Actor not registered
Behavior:
- Remaining messages in mailbox are dropped
- In-flight sends will fail with SendError::ActorNotFound
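The registration rules above can be sketched with a plain map. This is an illustration only: it uses `std::sync::Mutex<HashMap<..>>` in place of DashMap, treats `ActorId` as a `u64`, and stores an arbitrary mailbox type `T`. The `Registry` type is hypothetical and is not the broker's actual implementation.

```rust
use std::collections::HashMap;
use std::sync::Mutex;

/// Assumption: actor IDs are modelled as plain integers here.
pub type ActorId = u64;

#[derive(Debug, PartialEq)]
pub enum BrokerError {
    AlreadyRegistered,
    NotFound,
}

/// Hypothetical registry mapping actor IDs to their mailboxes.
pub struct Registry<T> {
    entries: Mutex<HashMap<ActorId, T>>,
}

impl<T> Registry<T> {
    pub fn new() -> Self {
        Self { entries: Mutex::new(HashMap::new()) }
    }

    /// Fails if the ID is already taken; the existing mailbox is left untouched.
    pub fn register(&self, id: ActorId, mailbox: T) -> Result<(), BrokerError> {
        let mut map = self.entries.lock().unwrap();
        if map.contains_key(&id) {
            return Err(BrokerError::AlreadyRegistered);
        }
        map.insert(id, mailbox);
        Ok(())
    }

    /// Removing the entry drops the mailbox and anything still queued in it.
    pub fn unregister(&self, id: ActorId) -> Result<(), BrokerError> {
        let mut map = self.entries.lock().unwrap();
        map.remove(&id).map(|_| ()).ok_or(BrokerError::NotFound)
    }
}
```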
send()¶
pub async fn send<M>(&self, actor_id: ActorId, msg: M) -> Result<M::Result, SendError>
where
M: Message,
Sends a message to an actor and waits for the result.
Type Parameters:
M: The message type
Parameters:
actor_id: Target actor identifier
msg: The message to send
Returns:
Ok(M::Result): Message processed successfully
Err(SendError): Delivery or processing failed
Performance:
- Average latency: ~737ns (including actor processing)
- Throughput: ~4.7M messages/second
Example:
use airssys_rt::Message;
struct GetStatus;
impl Message for GetStatus {
type Result = String;
}
let status = broker.send(actor_id, GetStatus).await?;
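One common way to implement "send and wait for the result" is to ship a reply channel along with the request. The sketch below shows that idea with standard-library threads and channels rather than the async broker. `Envelope`, `spawn_status_actor`, and the string-based errors are hypothetical names chosen for the illustration, and the blocking `recv()` stands in for the `.await` in the example above.

```rust
use std::sync::mpsc;
use std::thread;

/// A request paired with the channel on which the actor sends its reply.
struct Envelope {
    request: String,
    reply_to: mpsc::Sender<String>,
}

/// Spawns a toy "actor" thread that answers every request with a status line.
fn spawn_status_actor() -> mpsc::Sender<Envelope> {
    let (tx, rx) = mpsc::channel::<Envelope>();
    thread::spawn(move || {
        // The loop ends once every sender has been dropped.
        for env in rx {
            // Ignore the error if the requester stopped waiting.
            let _ = env.reply_to.send(format!("{}: ok", env.request));
        }
    });
    tx
}

/// Sends a request and blocks until the reply arrives or the actor is gone.
fn send(actor: &mpsc::Sender<Envelope>, request: &str) -> Result<String, String> {
    let (reply_tx, reply_rx) = mpsc::channel();
    actor
        .send(Envelope { request: request.to_string(), reply_to: reply_tx })
        .map_err(|_| "actor not found".to_string())?;
    reply_rx
        .recv()
        .map_err(|_| "actor dropped the request".to_string())
}
```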
broadcast()¶
pub async fn broadcast<M>(&self, msg: M) -> Vec<Result<M::Result, SendError>>
where
M: Message + Clone,
Broadcasts a message to all registered actors.
Type Parameters:
M: The message type (must implement Clone)
Parameters:
msg: The message to broadcast
Returns:
Vec<Result<M::Result, SendError>>: Results from all actors
Performance:
- Parallel delivery to all actors
- Individual failures don't affect other deliveries
Example:
struct Shutdown;
impl Message for Shutdown {
type Result = ();
}
let results = broker.broadcast(Shutdown).await;
for result in results {
if let Err(e) = result {
eprintln!("Shutdown failed: {:?}", e);
}
}
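The result shape of a broadcast, one entry per actor with failures kept separate, can be sketched without a runtime. The block below delivers sequentially rather than in parallel, models each actor as an optional closure (`None` standing in for an actor that cannot be reached), and uses a one-variant `SendError`. These are simplifications for illustration, not the broker's design.

```rust
use std::collections::BTreeMap;

#[derive(Debug, PartialEq)]
pub enum SendError {
    ActorNotFound,
}

/// Delivers a clone of `msg` to every handler, in actor-ID order.
/// A `None` slot models an actor that cannot be reached.
pub fn broadcast<M: Clone, R>(
    actors: &BTreeMap<u64, Option<Box<dyn Fn(M) -> R>>>,
    msg: M,
) -> Vec<Result<R, SendError>> {
    actors
        .values()
        .map(|slot| match slot {
            // Each delivery is independent: one failure does not stop the rest.
            Some(handler) => Ok(handler(msg.clone())),
            None => Err(SendError::ActorNotFound),
        })
        .collect()
}
```

The `Clone` bound exists because every recipient needs its own copy of the message.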
Module: mailbox¶
Mailbox implementations for actor message queues.
Enum: Mailbox<A>¶
Actor mailbox abstraction supporting bounded and unbounded queues.
Type Parameters:
A: The actor type
Variants:
Bounded(BoundedMailbox<A>): Fixed-capacity mailbox with backpressure
Unbounded(UnboundedMailbox<A>): Unlimited capacity mailbox
Choosing a Mailbox:
| Use Case | Recommended | Rationale |
|---|---|---|
| High-throughput actors | Bounded | Prevents memory exhaustion |
| Low-volume actors | Unbounded | Simpler, no backpressure |
| Critical path | Bounded | Predictable latency |
| Background tasks | Unbounded | Flexibility over control |
Constructors¶
bounded()¶
Creates a bounded mailbox with specified capacity.
Parameters:
capacity: Maximum number of messages
Default Backpressure:
- Strategy: BackpressureStrategy::Block
- Behavior: Senders block when mailbox is full
Example:
unbounded()¶
Creates an unbounded mailbox with unlimited capacity.
Returns:
Mailbox<A>: Unbounded mailbox instance
Example:
Methods¶
enqueue()¶
Adds a message to the mailbox.
Parameters:
msg: Boxed message to enqueue
Returns:
Ok(()): Message enqueued successfully
Err(MailboxError::Full): Bounded mailbox at capacity
Err(MailboxError::Closed): Mailbox has been closed
Performance:
- Bounded mailbox: ~181ns average
- Unbounded mailbox: ~150ns average
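The `Full` and `Closed` outcomes can be reproduced with the standard library's bounded channel. This sketch assumes a non-blocking enqueue and maps `std::sync::mpsc::TrySendError` onto a local `MailboxError`. The `try_enqueue` helper is hypothetical, and the real mailbox is documented as being built on tokio channels instead.

```rust
use std::sync::mpsc::{self, TrySendError};

#[derive(Debug, PartialEq)]
pub enum MailboxError {
    Full,
    Closed,
}

/// Non-blocking enqueue into a bounded queue.
/// `Full` means the buffer is at capacity; `Closed` means the receiver is gone.
pub fn try_enqueue<T>(tx: &mpsc::SyncSender<T>, msg: T) -> Result<(), MailboxError> {
    tx.try_send(msg).map_err(|e| match e {
        TrySendError::Full(_) => MailboxError::Full,
        TrySendError::Disconnected(_) => MailboxError::Closed,
    })
}
```

With a capacity of 2, the third `try_enqueue` call fails with `Full` until the receiver takes a message out.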
dequeue()¶
Removes and returns the next message from the mailbox.
Returns:
Some(msg): Next message available
None: Mailbox is empty and closed
Behavior:
- Waits until a message is available or the mailbox is closed
- FIFO ordering (first-in, first-out)
close()¶
Closes the mailbox, preventing new messages.
Behavior:
- Pending messages can still be dequeued
- New enqueue() calls will fail
- dequeue() returns None once the mailbox is empty
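The close semantics above (drain what is pending, reject what is new) can be sketched with a `VecDeque` and a flag. `ToyMailbox`, `try_dequeue`, and `is_drained` are hypothetical names for a single-threaded, non-blocking illustration. The sketch does not model waiting for messages.

```rust
use std::collections::VecDeque;

/// Hypothetical single-threaded mailbox used only to show close() semantics.
pub struct ToyMailbox<T> {
    queue: VecDeque<T>,
    closed: bool,
}

impl<T> ToyMailbox<T> {
    pub fn new() -> Self {
        Self { queue: VecDeque::new(), closed: false }
    }

    /// After close(), the message is handed back to the caller as an error.
    pub fn enqueue(&mut self, msg: T) -> Result<(), T> {
        if self.closed {
            return Err(msg);
        }
        self.queue.push_back(msg);
        Ok(())
    }

    /// Stops new messages; already queued ones stay available.
    pub fn close(&mut self) {
        self.closed = true;
    }

    /// Non-blocking: returns the oldest pending message, if any (FIFO).
    pub fn try_dequeue(&mut self) -> Option<T> {
        self.queue.pop_front()
    }

    /// True once the mailbox is closed and every pending message was taken.
    pub fn is_drained(&self) -> bool {
        self.closed && self.queue.is_empty()
    }
}
```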
Struct: BoundedMailbox<A>¶
Fixed-capacity mailbox with backpressure support.
Implementation:
- Uses tokio::sync::mpsc::channel internally
- Configurable backpressure strategies
- Memory-bounded operation
Methods¶
with_backpressure()¶
Creates a bounded mailbox with custom backpressure strategy.
Parameters:
capacity: Maximum messages
strategy: Backpressure behavior
Example:
use airssys_rt::mailbox::{BoundedMailbox, BackpressureStrategy};
let mailbox = BoundedMailbox::with_backpressure(
1000,
BackpressureStrategy::DropOldest,
);
len()¶
Returns the current number of messages in the mailbox.
Returns:
usize: Message count
Use Cases:
- Monitoring mailbox pressure
- Load balancing decisions
- Health checks
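As a sketch of the load-balancing and monitoring use cases, the helpers below take queue lengths that a caller has already collected (for example by calling `len()` on each mailbox). `least_loaded` and `pressure` are hypothetical helper names and are not part of the documented API.

```rust
/// Index of the mailbox with the fewest queued messages.
/// Ties go to the earliest index; returns None for an empty slice.
pub fn least_loaded(lens: &[usize]) -> Option<usize> {
    lens.iter()
        .enumerate()
        .min_by_key(|&(_, len)| *len)
        .map(|(index, _)| index)
}

/// Fill ratio in the range 0.0..=1.0 for a mailbox of the given capacity.
/// A zero-capacity mailbox is reported as fully loaded.
pub fn pressure(len: usize, capacity: usize) -> f64 {
    if capacity == 0 {
        return 1.0;
    }
    (len as f64 / capacity as f64).min(1.0)
}
```

A health check could, for instance, flag an actor once `pressure` stays above a chosen threshold.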
is_full()¶
Checks if the mailbox is at capacity.
Returns:
true: Mailbox is full
false: Mailbox has available capacity
Struct: UnboundedMailbox<A>¶
Unlimited-capacity mailbox.
Implementation:
- Uses tokio::sync::mpsc::unbounded_channel internally
- No backpressure (will grow unbounded)
- Faster enqueue than bounded (no capacity check)
Warning:
- Can consume unbounded memory under load
- Recommend monitoring message queue length
- Consider bounded mailbox for production systems
Enum: BackpressureStrategy¶
Strategy for handling mailbox overflow.
Variants:
Block: Block sender until space available (default)
DropOldest: Remove oldest message to make room
DropNewest: Drop the incoming message
Fail: Return error to sender
Tradeoffs:
| Strategy | Latency | Throughput | Data Loss | Use Case |
|---|---|---|---|---|
| Block | Variable | Lower | None | Critical messages |
| DropOldest | Constant | Higher | Oldest | Latest-value semantics |
| DropNewest | Constant | Highest | Newest | Best-effort delivery |
| Fail | Constant | Highest | Newest | Explicit error handling |
Example:
use airssys_rt::mailbox::{BoundedMailbox, BackpressureStrategy};
// Critical financial transactions - never drop
let ledger_mailbox = BoundedMailbox::with_backpressure(
1000,
BackpressureStrategy::Block,
);
// Sensor readings - only care about latest
let sensor_mailbox = BoundedMailbox::with_backpressure(
100,
BackpressureStrategy::DropOldest,
);
// Notifications - overflow is reported to the sender as an error
let notification_mailbox = BoundedMailbox::with_backpressure(
500,
BackpressureStrategy::Fail,
);
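To make the difference between the non-blocking strategies concrete, here is a minimal sketch over a `VecDeque`. The `Strategy`, `Outcome`, and `offer` names are hypothetical. `Block` is left out because it requires a waiting sender, which this single-threaded illustration does not model.

```rust
use std::collections::VecDeque;

#[derive(Clone, Copy, Debug, PartialEq)]
pub enum Strategy {
    DropOldest,
    DropNewest,
    Fail,
}

/// What happened to the offered message (and to any message it displaced).
#[derive(Debug, PartialEq)]
pub enum Outcome<T> {
    Enqueued,
    Evicted(T),  // DropOldest: new message stored, oldest one returned
    Dropped(T),  // DropNewest: incoming message discarded
    Rejected(T), // Fail: incoming message handed back as an error
}

pub fn offer<T>(
    queue: &mut VecDeque<T>,
    capacity: usize,
    strategy: Strategy,
    msg: T,
) -> Outcome<T> {
    if queue.len() < capacity {
        queue.push_back(msg);
        return Outcome::Enqueued;
    }
    match strategy {
        Strategy::DropOldest => match queue.pop_front() {
            Some(oldest) => {
                queue.push_back(msg);
                Outcome::Evicted(oldest)
            }
            // Only reachable when capacity is zero: nothing to evict.
            None => Outcome::Dropped(msg),
        },
        Strategy::DropNewest => Outcome::Dropped(msg),
        Strategy::Fail => Outcome::Rejected(msg),
    }
}
```

In this sketch `DropNewest` and `Fail` leave the queue unchanged. The difference is whether the caller is expected to treat the outcome as an error.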
Communication Patterns¶
Fire-and-Forget¶
Send a message without waiting for response.
// Message with no result
struct Notify {
event: String,
}
impl Message for Notify {
type Result = ();
}
// Send without waiting
actor_ref.tell(Notify {
event: "user_logged_in".to_string(),
})?;
Request-Reply¶
Send a message and await the actor's reply (synchronous-style communication).
struct GetBalance {
account_id: u64,
}
impl Message for GetBalance {
type Result = f64;
}
let balance = actor_ref.send(GetBalance { account_id: 123 }).await?;
Broadcast¶
Send to multiple actors.
struct HealthCheck;
impl Message for HealthCheck {
type Result = bool;
}
let results = broker.broadcast(HealthCheck).await;
Actor Pool¶
Load balance across workers.
struct WorkItem {
data: Vec<u8>,
}
impl Message for WorkItem {
type Result = Vec<u8>;
}
// Round-robin distribution
let worker = pool.get_worker();
let result = worker.send(WorkItem { data }).await?;
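Round-robin selection itself is small enough to sketch. The block below assumes a fixed list of workers and an atomic counter. The `Pool` type here is a hypothetical stand-in, and only the `get_worker` name is taken from the example above.

```rust
use std::sync::atomic::{AtomicUsize, Ordering};

/// Hypothetical pool that hands out workers in rotation.
pub struct Pool<W> {
    workers: Vec<W>,
    next: AtomicUsize,
}

impl<W> Pool<W> {
    /// Panics if `workers` is empty, since there would be nothing to select.
    pub fn new(workers: Vec<W>) -> Self {
        assert!(!workers.is_empty(), "pool needs at least one worker");
        Self { workers, next: AtomicUsize::new(0) }
    }

    /// Returns workers in order 0, 1, ..., n-1, 0, 1, ...
    /// The atomic counter lets several threads share the pool through `&self`.
    pub fn get_worker(&self) -> &W {
        let index = self.next.fetch_add(1, Ordering::Relaxed) % self.workers.len();
        &self.workers[index]
    }
}
```

Round-robin ignores how busy each worker is. A pool that cares about queue depth would pick by mailbox length instead.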
Performance Characteristics¶
Message Passing Latency¶
| Metric | Latency | Measurement |
|---|---|---|
| Message enqueue | 181ns | Mailbox send operation |
| Message dequeue | 150ns | Mailbox receive operation |
| Roundtrip (send+receive) | 737ns | Full message cycle |
| Cross-thread message | 850ns | With thread context switch |
Throughput¶
| Configuration | Messages/sec | Notes |
|---|---|---|
| Single actor | 4.7M | Single-threaded |
| 4 actors (no contention) | 18M | Linear scaling |
| 16 actors (high contention) | 45M | Sublinear scaling |
| Broadcast (10 actors) | 12M | Parallel delivery |
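The scaling notes in the table can be checked by dividing the aggregate throughput by the ideal (single-actor throughput times actor count). The helper below is a hypothetical convenience for that arithmetic, with throughput expressed in millions of messages per second.

```rust
/// Ratio of measured aggregate throughput to ideal linear scaling.
/// 1.0 means perfectly linear; lower values mean contention or overhead.
pub fn scaling_efficiency(single_actor: f64, actors: u32, aggregate: f64) -> f64 {
    aggregate / (single_actor * f64::from(actors))
}
```

With the figures above, 4 actors give 18 / (4 × 4.7) ≈ 0.96 and 16 actors give 45 / (16 × 4.7) ≈ 0.60, which matches the "linear" and "sublinear" labels.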
Memory Usage¶
| Component | Size | Notes |
|---|---|---|
| Message (avg) | 64 bytes | Varies by type |
| BoundedMailbox | 8KB | Capacity 1000 |
| UnboundedMailbox | 128 bytes | Plus message storage |
| Broker entry | 48 bytes | Per registered actor |
Error Types¶
Enum: MailboxError¶
Errors specific to mailbox operations.
Variants:
Full: Bounded mailbox at capacity
Closed: Mailbox has been closed
Timeout: Operation timed out
Enum: BrokerError¶
Errors specific to broker operations.
Variants:
AlreadyRegistered: Actor ID already registered
NotFound: Actor not found in broker
RoutingFailed(String): Message routing failed
See Also¶
- Core API Reference - Core types and system API
- Actors API Reference - Actor types and patterns
- Mailbox API Reference - Detailed mailbox API
- Broker API Reference - Detailed broker API
- Architecture: Message Passing - Design overview
- How-To: Message Passing - Usage guide