Core Concepts¶
airssys-rt is built around several fundamental concepts adapted from the Erlang/BEAM runtime model. Understanding these concepts is essential for effectively using the actor runtime.
Note: All code examples in this document are taken from the actual implementation. For complete working examples, see the examples directory.
Actors and Message Processing¶
Actor Trait¶
The core Actor trait is the foundation of the runtime system. Every actor must implement this trait with associated types for messages and errors:
#[async_trait]
pub trait Actor: Send + Sync + 'static {
/// The type of messages this actor can handle.
type Message: Message;
/// The error type returned by actor operations.
type Error: Error + Send + Sync + 'static;
/// Handle an incoming message.
async fn handle_message<B: MessageBroker<Self::Message>>(
&mut self,
message: Self::Message,
context: &mut ActorContext<Self::Message, B>,
) -> Result<(), Self::Error>;
// Optional lifecycle hooks
async fn pre_start<B: MessageBroker<Self::Message>>(
&mut self,
context: &mut ActorContext<Self::Message, B>,
) -> Result<(), Self::Error> {
Ok(())
}
async fn post_stop<B: MessageBroker<Self::Message>>(
&mut self,
context: &mut ActorContext<Self::Message, B>,
) -> Result<(), Self::Error> {
Ok(())
}
async fn on_error<B: MessageBroker<Self::Message>>(
&mut self,
error: Self::Error,
context: &mut ActorContext<Self::Message, B>,
) -> ErrorAction {
ErrorAction::Restart
}
}
Actor Context¶
The ActorContext provides metadata and messaging capabilities to actors:
pub struct ActorContext<M: Message, B: MessageBroker<M>> {
address: ActorAddress,
id: ActorId,
created_at: DateTime<Utc>,
last_message_at: Option<DateTime<Utc>>,
message_count: u64,
broker: B, // Dependency injection
_marker: PhantomData<M>,
}
Key methods:
- address() - Get the actor's address
- id() - Get the actor's unique ID
- message_count() - Get total messages processed
- record_message() - Track message processing
- send(message, recipient) - Send messages to other actors
Process Lifecycle¶
Actors go through several lifecycle stages managed by the ActorLifecycle struct:
pub enum ActorState {
Starting, // Actor is initializing
Running, // Actor is active and processing messages
Stopping, // Actor is shutting down
Stopped, // Actor has stopped successfully
Failed, // Actor has failed (requires supervision)
}
The ActorLifecycle struct provides state management (from src/actor/lifecycle.rs):
#[derive(Debug, Clone)]
pub struct ActorLifecycle {
state: ActorState,
last_state_change: DateTime<Utc>,
restart_count: u32,
}
impl ActorLifecycle {
pub fn new() -> Self;
pub fn state(&self) -> ActorState;
pub fn transition_to(&mut self, new_state: ActorState);
pub fn restart_count(&self) -> u32;
pub fn is_terminal(&self) -> bool;
pub fn is_running(&self) -> bool;
}
Complete Actor Example¶
Here's a real actor implementation from examples/actor_basic.rs:
// Define a message type
#[derive(Debug, Clone, Serialize, Deserialize)]
struct CounterMessage {
delta: i32,
}
impl Message for CounterMessage {
const MESSAGE_TYPE: &'static str = "counter";
}
// Define the actor
struct CounterActor {
value: i32,
max_value: i32,
}
// Define error type
#[derive(Debug)]
struct CounterError {
message: String,
}
impl fmt::Display for CounterError {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
write!(f, "CounterError: {}", self.message)
}
}
impl std::error::Error for CounterError {}
// Implement Actor trait
#[async_trait]
impl Actor for CounterActor {
type Message = CounterMessage;
type Error = CounterError;
async fn handle_message<B: MessageBroker<Self::Message>>(
&mut self,
message: Self::Message,
context: &mut ActorContext<Self::Message, B>,
) -> Result<(), Self::Error> {
self.value += message.delta;
if self.value > self.max_value {
return Err(CounterError {
message: format!("Value {} exceeds maximum {}",
self.value, self.max_value),
});
}
context.record_message();
Ok(())
}
async fn pre_start<B: MessageBroker<Self::Message>>(
&mut self,
context: &mut ActorContext<Self::Message, B>,
) -> Result<(), Self::Error> {
println!("[Actor {}] Starting with value: {}",
context.address().name().unwrap_or("anonymous"),
self.value);
Ok(())
}
async fn post_stop<B: MessageBroker<Self::Message>>(
&mut self,
context: &mut ActorContext<Self::Message, B>,
) -> Result<(), Self::Error> {
println!("[Actor {}] Stopping with value: {}",
context.address().name().unwrap_or("anonymous"),
self.value);
Ok(())
}
async fn on_error<B: MessageBroker<Self::Message>>(
&mut self,
error: Self::Error,
_context: &mut ActorContext<Self::Message, B>,
) -> ErrorAction {
eprintln!("Error: {}", error);
ErrorAction::Restart // Supervisor will restart this actor
}
}
Run this example:
Message System¶
Message Trait¶
All messages must implement the Message trait:
pub trait Message: Clone + Send + Sync + 'static
+ for<'de> serde::Deserialize<'de> + serde::Serialize
{
const MESSAGE_TYPE: &'static str;
}
Message Envelope¶
Messages are wrapped in envelopes for routing:
pub struct MessageEnvelope<M> {
pub id: MessageId,
pub message: M,
pub timestamp: DateTime<Utc>,
pub reply_to: Option<ActorAddress>,
}
Message Broker¶
The MessageBroker trait defines the pub/sub system:
#[async_trait]
pub trait MessageBroker<M: Message>: Clone + Send + Sync + 'static {
type Error: Error + Send + Sync + 'static;
async fn publish(&self, envelope: MessageEnvelope<M>)
-> Result<(), Self::Error>;
async fn subscribe(&self, subscriber_id: ActorId)
-> Result<mpsc::Receiver<MessageEnvelope<M>>, Self::Error>;
}
Current implementation: InMemoryMessageBroker (see src/broker/in_memory.rs)
Supervision Framework¶
Child Trait¶
Any entity can be supervised by implementing the Child trait:
#[async_trait]
pub trait Child: Send + Sync {
async fn start(&mut self) -> Result<(), Box<dyn Error + Send + Sync>>;
async fn stop(&mut self) -> Result<(), Box<dyn Error + Send + Sync>>;
async fn health_check(&self) -> ChildHealth;
}
Actors automatically implement Child via blanket implementation.
Supervision Strategies¶
Three BEAM-inspired restart strategies:
pub enum RestartStrategy {
OneForOne, // Restart only the failed child
OneForAll, // Restart all children when one fails
RestForOne, // Restart failed child and those started after it
}
Restart Policies¶
Control when children should be restarted:
pub enum RestartPolicy {
Permanent, // Always restart on failure
Transient, // Restart only on abnormal termination
Temporary, // Never restart
}
Child Specification¶
Configure supervised children:
pub struct ChildSpec {
pub id: ChildId,
pub restart_policy: RestartPolicy,
pub shutdown_policy: ShutdownPolicy,
pub significant: bool, // Does failure affect supervisor?
}
Complete Supervisor Example¶
From examples/supervisor_basic.rs:
use airssys_rt::supervisor::{Child, ChildHealth, ChildSpec, RestartPolicy};
// Define a worker that implements Child
struct SimpleWorker {
id: String,
fail_on_start: bool,
}
#[async_trait]
impl Child for SimpleWorker {
async fn start(&mut self) -> Result<(), Box<dyn Error + Send + Sync>> {
if self.fail_on_start {
return Err(format!("Worker {} failed to start", self.id).into());
}
println!("Worker {} started", self.id);
Ok(())
}
async fn stop(&mut self) -> Result<(), Box<dyn Error + Send + Sync>> {
println!("Worker {} stopped", self.id);
Ok(())
}
async fn health_check(&self) -> ChildHealth {
ChildHealth::Healthy
}
}
// Create supervisor with OneForOne strategy
let mut supervisor = SupervisorNode::new(
SupervisorId::new(),
OneForOne::new(),
);
// Add children
supervisor.add_child(
ChildSpec {
id: ChildId::new(),
restart_policy: RestartPolicy::Permanent,
shutdown_policy: ShutdownPolicy::default(),
significant: true,
},
Box::new(SimpleWorker {
id: "worker-1".to_string(),
fail_on_start: false,
}),
).await?;
Run this example:
Actor Addressing¶
ActorAddress¶
Actors are identified by addresses:
pub struct ActorAddress {
id: ActorId,
name: Option<String>,
}
impl ActorAddress {
pub fn anonymous() -> Self;
pub fn named(name: impl Into<String>) -> Self;
pub fn id(&self) -> &ActorId;
pub fn name(&self) -> Option<&str>;
}
ActorId¶
Unique identifiers for actors:
pub struct ActorId(uuid::Uuid);
impl ActorId {
pub fn new() -> Self {
Self(uuid::Uuid::new_v4())
}
}
Error Handling¶
ErrorAction¶
Actors return ErrorAction from on_error to control supervision:
pub enum ErrorAction {
Resume, // Continue processing (ignore error)
Restart, // Restart the actor
Stop, // Stop the actor permanently
Escalate, // Pass error to supervisor
}
Actor Error Flow¶
- Actor's
handle_messagereturnsErr(Self::Error) - Supervisor calls actor's
on_errormethod - Actor returns
ErrorActionto supervisor - Supervisor applies restart strategy based on action
Architecture Layers¶
The runtime is organized in layers:
- Message Layer (
src/message/) - Message types and envelopes - Broker Layer (
src/broker/) - Pub/sub message routing - Actor Layer (
src/actor/) - Actor trait and context - Mailbox Layer (
src/mailbox/) - Message queue management - Supervisor Layer (
src/supervisor/) - Fault tolerance - Monitoring Layer (
src/monitoring/) - Health checks and metrics - System Layer (
src/system/) - Runtime coordination (planned)
Each layer builds on the previous, following Microsoft Rust Guidelines (M-SIMPLE-ABSTRACTIONS).
Working Examples¶
Explore these examples to understand the runtime:
| Example | Description | Command |
|---|---|---|
actor_basic.rs |
Basic actor implementation | cargo run --example actor_basic |
actor_lifecycle.rs |
Lifecycle hooks | cargo run --example actor_lifecycle |
supervisor_basic.rs |
Basic supervision | cargo run --example supervisor_basic |
supervisor_strategies.rs |
Restart strategies | cargo run --example supervisor_strategies |
supervisor_automatic_health.rs |
Health monitoring | cargo run --example supervisor_automatic_health |
monitoring_basic.rs |
Actor monitoring | cargo run --example monitoring_basic |
monitoring_supervisor.rs |
Supervisor monitoring | cargo run --example monitoring_supervisor |
All examples are located in the examples/ directory and demonstrate real, working implementations of the concepts described in this document.