Actor Model Design¶
The actor model in airssys-rt provides a type-safe, performant implementation of the actor pattern based on Erlang/OTP principles.
Note: All code examples are taken from the actual implementation. See examples directory for complete working code.
Actor Trait Architecture¶
Core Actor Trait¶
The foundational trait that all actors implement (from src/actor/traits.rs):
#[async_trait]
pub trait Actor: Send + Sync + 'static {
/// The type of messages this actor can handle.
type Message: Message;
/// The error type returned by actor operations.
type Error: Error + Send + Sync + 'static;
/// Handle an incoming message.
async fn handle_message<B: MessageBroker<Self::Message>>(
&mut self,
message: Self::Message,
context: &mut ActorContext<Self::Message, B>,
) -> Result<(), Self::Error>;
/// Called when the actor is started (optional).
async fn pre_start<B: MessageBroker<Self::Message>>(
&mut self,
context: &mut ActorContext<Self::Message, B>,
) -> Result<(), Self::Error> {
Ok(())
}
/// Called when the actor is stopping (optional).
async fn post_stop<B: MessageBroker<Self::Message>>(
&mut self,
context: &mut ActorContext<Self::Message, B>,
) -> Result<(), Self::Error> {
Ok(())
}
/// Handle errors and return supervision decision (optional).
async fn on_error<B: MessageBroker<Self::Message>>(
&mut self,
error: Self::Error,
context: &mut ActorContext<Self::Message, B>,
) -> ErrorAction {
ErrorAction::Restart
}
}
Design Principles¶
- Generic Constraints (§6.2): Uses
<B: MessageBroker<Self::Message>>instead ofdyntrait objects - Associated Types:
MessageandErrortypes for compile-time type safety - Lifecycle Hooks: Optional
pre_startandpost_stopfor initialization and cleanup - Supervision Integration:
on_errorreturnsErrorActionfor fault tolerance
Actor State Encapsulation¶
Actors maintain private state that can only be modified through message handling. Example from examples/actor_basic.rs:
struct CounterActor {
value: i32, // Private state
max_value: i32, // Configuration
}
#[async_trait]
impl Actor for CounterActor {
type Message = CounterMessage;
type Error = CounterError;
async fn handle_message<B: MessageBroker<Self::Message>>(
&mut self,
message: Self::Message,
context: &mut ActorContext<Self::Message, B>,
) -> Result<(), Self::Error> {
// Only way to modify state
self.value += message.delta;
if self.value > self.max_value {
return Err(CounterError {
message: format!("Value {} exceeds maximum {}",
self.value, self.max_value),
});
}
context.record_message();
Ok(())
}
}
Key principles:
- State fields are private (not pub)
- State only modified in handle_message
- No direct external access to state
- Thread safety through message passing
Message Design¶
Message Trait¶
All messages must implement the Message trait (from src/message/mod.rs):
pub trait Message: Clone + Send + Sync + 'static
+ for<'de> serde::Deserialize<'de> + serde::Serialize
{
const MESSAGE_TYPE: &'static str;
}
Message Implementation Example¶
#[derive(Debug, Clone, Serialize, Deserialize)]
struct CounterMessage {
delta: i32,
}
impl Message for CounterMessage {
const MESSAGE_TYPE: &'static str = "counter";
}
Message Envelope¶
Messages are wrapped for routing (from src/message/envelope.rs):
pub struct MessageEnvelope<M> {
pub id: MessageId,
pub message: M,
pub timestamp: DateTime<Utc>, // §3.2 chrono DateTime<Utc>
pub reply_to: Option<ActorAddress>,
}
impl<M: Message> MessageEnvelope<M> {
pub fn new(message: M) -> Self {
Self {
id: MessageId::new(),
message,
timestamp: Utc::now(),
reply_to: None,
}
}
}
Actor Lifecycle¶
Lifecycle States¶
Actors transition through defined states (from src/actor/lifecycle.rs):
pub enum ActorState {
Starting, // Actor is initializing
Running, // Actor is active and processing messages
Stopping, // Actor is shutting down
Stopped, // Actor has stopped successfully
Failed, // Actor has failed (requires supervision)
}
Lifecycle Management¶
The ActorLifecycle struct provides state management:
#[derive(Debug, Clone)]
pub struct ActorLifecycle {
state: ActorState,
last_state_change: DateTime<Utc>,
restart_count: u32,
}
impl ActorLifecycle {
pub fn new() -> Self;
pub fn state(&self) -> ActorState;
pub fn transition_to(&mut self, new_state: ActorState);
pub fn restart_count(&self) -> u32;
pub fn last_state_change(&self) -> DateTime<Utc>;
pub fn is_terminal(&self) -> bool;
pub fn is_running(&self) -> bool;
}
Lifecycle Hooks¶
Actors can override lifecycle hooks:
#[async_trait]
impl Actor for MyActor {
type Message = MyMessage;
type Error = MyError;
// Called before actor starts processing messages
async fn pre_start<B: MessageBroker<Self::Message>>(
&mut self,
context: &mut ActorContext<Self::Message, B>,
) -> Result<(), Self::Error> {
println!("Actor {} starting", context.address().name().unwrap_or("anonymous"));
// Initialize resources, connect to databases, etc.
Ok(())
}
// Called when actor stops
async fn post_stop<B: MessageBroker<Self::Message>>(
&mut self,
context: &mut ActorContext<Self::Message, B>,
) -> Result<(), Self::Error> {
println!("Actor {} stopping", context.address().name().unwrap_or("anonymous"));
// Cleanup resources, close connections, etc.
Ok(())
}
async fn handle_message<B: MessageBroker<Self::Message>>(
&mut self,
message: Self::Message,
context: &mut ActorContext<Self::Message, B>,
) -> Result<(), Self::Error> {
// Process messages
Ok(())
}
}
See examples/actor_lifecycle.rs for a complete lifecycle example.
Actor Addressing¶
ActorAddress¶
Actors are identified by addresses (from src/util/address.rs):
pub struct ActorAddress {
id: ActorId,
name: Option<String>,
}
impl ActorAddress {
pub fn anonymous() -> Self;
pub fn named(name: impl Into<String>) -> Self;
pub fn id(&self) -> &ActorId;
pub fn name(&self) -> Option<&str>;
}
ActorId¶
Unique identifiers using UUIDs:
pub struct ActorId(uuid::Uuid);
impl ActorId {
pub fn new() -> Self {
Self(uuid::Uuid::new_v4())
}
}
Message Broker System¶
MessageBroker Trait¶
The pub/sub system for actor communication (from src/broker/traits.rs):
#[async_trait]
pub trait MessageBroker<M: Message>: Clone + Send + Sync + 'static {
type Error: Error + Send + Sync + 'static;
async fn publish(&self, envelope: MessageEnvelope<M>)
-> Result<(), Self::Error>;
async fn subscribe(&self, subscriber_id: ActorId)
-> Result<mpsc::Receiver<MessageEnvelope<M>>, Self::Error>;
}
InMemoryMessageBroker¶
Current implementation using channels (from src/broker/in_memory.rs):
#[derive(Clone)]
pub struct InMemoryMessageBroker<M: Message> {
subscribers: Arc<Mutex<HashMap<ActorId, mpsc::Sender<MessageEnvelope<M>>>>>,
}
impl<M: Message> InMemoryMessageBroker<M> {
pub fn new() -> Self {
Self {
subscribers: Arc::new(Mutex::new(HashMap::new())),
}
}
}
Communication via Context¶
Actors send messages using their context:
// In actor's handle_message method
async fn handle_message<B: MessageBroker<Self::Message>>(
&mut self,
message: Self::Message,
context: &mut ActorContext<Self::Message, B>,
) -> Result<(), Self::Error> {
// Send message to another actor
context.send(message, recipient_address).await?;
Ok(())
}
Mailbox System¶
Mailbox Types¶
The runtime provides two mailbox implementations (from src/mailbox/):
UnboundedMailbox - Unlimited capacity:
pub struct UnboundedMailbox<M: Message> {
receiver: mpsc::UnboundedReceiver<MessageEnvelope<M>>,
metrics: Arc<AtomicMetrics>,
}
BoundedMailbox - Limited capacity with backpressure:
pub struct BoundedMailbox<M: Message> {
receiver: mpsc::Receiver<MessageEnvelope<M>>,
capacity: usize,
backpressure: BackpressureStrategy,
metrics: Arc<AtomicMetrics>,
}
Backpressure Strategies¶
pub enum BackpressureStrategy {
Block, // Block sender when mailbox full
Drop, // Drop new messages when full
DropOldest, // Drop oldest message to make room
}
Mailbox Traits¶
Generic mailbox interface:
#[async_trait]
pub trait MailboxReceiver<M: Message>: Send {
async fn recv(&mut self) -> Option<MessageEnvelope<M>>;
fn try_recv(&mut self) -> Result<MessageEnvelope<M>, TryRecvError>;
}
#[async_trait]
pub trait MailboxSender<M: Message>: Clone + Send + Sync {
async fn send(&self, envelope: MessageEnvelope<M>)
-> Result<(), MailboxError>;
}
Error Handling and Supervision¶
ErrorAction¶
Actors return ErrorAction from on_error to control fault handling:
pub enum ErrorAction {
Resume, // Continue processing (ignore error)
Restart, // Restart the actor
Stop, // Stop the actor permanently
Escalate, // Pass error to supervisor
}
Error Flow¶
- Actor's
handle_messagereturnsErr(Self::Error) - Supervisor catches error
- Supervisor calls actor's
on_errormethod - Actor returns
ErrorAction - Supervisor applies restart strategy based on action
Example:
async fn on_error<B: MessageBroker<Self::Message>>(
&mut self,
error: Self::Error,
context: &mut ActorContext<Self::Message, B>,
) -> ErrorAction {
match error {
CounterError::OverflowError => {
eprintln!("Counter overflow, restarting");
ErrorAction::Restart
}
CounterError::UnrecoverableError => {
eprintln!("Unrecoverable error, stopping");
ErrorAction::Stop
}
_ => ErrorAction::Escalate,
}
}
Actor Monitoring¶
Health Checks¶
Actors can be monitored via the Child trait's health check:
async fn health_check(&self) -> ChildHealth {
if self.is_healthy() {
ChildHealth::Healthy
} else {
ChildHealth::Unhealthy("Connection lost".to_string())
}
}
Monitoring System¶
The monitoring system (from src/monitoring/) provides:
- Health status tracking
- Performance metrics
- Message queue depth
- Processing latency
- Error rates
See examples/monitoring_basic.rs and examples/monitoring_supervisor.rs for monitoring examples.
Working Examples¶
Explore these examples to understand the actor model:
| Example | Demonstrates | Command |
|---|---|---|
actor_basic.rs |
Core actor implementation | cargo run --example actor_basic |
actor_lifecycle.rs |
Lifecycle hooks | cargo run --example actor_lifecycle |
supervisor_basic.rs |
Supervision patterns | cargo run --example supervisor_basic |
supervisor_strategies.rs |
Restart strategies | cargo run --example supervisor_strategies |
monitoring_basic.rs |
Actor monitoring | cargo run --example monitoring_basic |
All examples are in the examples/ directory and demonstrate real implementations of these patterns.