Skip to content

RT Examples

Examples demonstrating the Actor Runtime for high-concurrency fault-tolerant applications.

Basic Actor Examples

Simple Counter Actor

use airssys_rt::prelude::*;
use async_trait::async_trait;

#[derive(Debug, Clone)]
enum CounterMsg {
    Increment,
    Decrement,
    GetCount(tokio::sync::oneshot::Sender<i64>),
}

impl Message for CounterMsg {
    const MESSAGE_TYPE: &'static str = "counter";
}

struct CounterActor {
    count: i64,
}

#[async_trait]
impl Actor for CounterActor {
    type Message = CounterMsg;
    type Error = std::io::Error;

    async fn handle_message<B: MessageBroker<Self::Message>>(
        &mut self,
        msg: Self::Message,
        _ctx: &mut ActorContext<Self::Message, B>,
    ) -> Result<(), Self::Error> {
        match msg {
            CounterMsg::Increment => {
                self.count += 1;
                println!("Count: {}", self.count);
            }
            CounterMsg::Decrement => {
                self.count -= 1;
                println!("Count: {}", self.count);
            }
            CounterMsg::GetCount(reply) => {
                let _ = reply.send(self.count);
            }
        }
        Ok(())
    }
}

Actor Lifecycle

use airssys_rt::prelude::*;

#[async_trait]
impl Actor for MyActor {
    type Message = MyMsg;
    type Error = std::io::Error;

    async fn pre_start<B: MessageBroker<Self::Message>>(
        &mut self,
        _ctx: &mut ActorContext<Self::Message, B>,
    ) -> Result<(), Self::Error> {
        println!("Actor starting...");
        // Initialize resources
        Ok(())
    }

    async fn handle_message<B: MessageBroker<Self::Message>>(
        &mut self,
        msg: Self::Message,
        _ctx: &mut ActorContext<Self::Message, B>,
    ) -> Result<(), Self::Error> {
        // Process message
        Ok(())
    }

    async fn post_stop<B: MessageBroker<Self::Message>>(
        &mut self,
        _ctx: &mut ActorContext<Self::Message, B>,
    ) -> Result<(), Self::Error> {
        println!("Actor stopping...");
        // Cleanup resources
        Ok(())
    }
}

Supervision Examples

Basic Supervisor

use airssys_rt::supervisor::*;
use airssys_rt::broker::InMemoryMessageBroker;

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    // Create message broker
    let broker = InMemoryMessageBroker::new();

    // Create supervisor with OneForOne strategy
    let supervisor = SupervisorBuilder::new()
        .with_strategy(RestartStrategy::OneForOne)
        .with_max_restarts(3, Duration::from_secs(60))
        .build();

    // Add child actors
    supervisor.add_child(
        "worker-1",
        Box::new(WorkerActor::new()),
        RestartPolicy::Permanent,
    ).await?;

    // Start supervision
    supervisor.start().await?;

    Ok(())
}

Supervision Strategies

// OneForOne: Restart only failed child
let supervisor = SupervisorBuilder::new()
    .with_strategy(RestartStrategy::OneForOne)
    .build();

// OneForAll: Restart all children when one fails
let supervisor = SupervisorBuilder::new()
    .with_strategy(RestartStrategy::OneForAll)
    .build();

// RestForOne: Restart failed child and those started after it
let supervisor = SupervisorBuilder::new()
    .with_strategy(RestartStrategy::RestForOne)
    .build();

Restart Policies

// Permanent: Always restart
supervisor.add_child(
    "critical-worker",
    Box::new(CriticalActor::new()),
    RestartPolicy::Permanent,
).await?;

// Transient: Restart only if abnormal termination
supervisor.add_child(
    "task-worker",
    Box::new(TaskActor::new()),
    RestartPolicy::Transient,
).await?;

// Temporary: Never restart
supervisor.add_child(
    "one-shot",
    Box::new(OneShotActor::new()),
    RestartPolicy::Temporary,
).await?;

Message Passing Examples

Request-Reply Pattern

#[derive(Debug, Clone)]
enum QueryMsg {
    GetData(tokio::sync::oneshot::Sender<Data>),
}

impl Message for QueryMsg {
    const MESSAGE_TYPE: &'static str = "query";
}

// Send request
let (tx, rx) = tokio::sync::oneshot::channel();
broker.publish(QueryMsg::GetData(tx), address).await?;

// Wait for reply
let data = rx.await?;

Pub-Sub Pattern

// Subscriber 1
broker.subscribe("events", subscriber1_address).await?;

// Subscriber 2
broker.subscribe("events", subscriber2_address).await?;

// Publisher
broker.publish_to_topic("events", Event::new()).await?;

// Both subscribers receive the message

Integration Examples

OSL Integration

use airssys_rt::supervisor::OSLSupervisor;
use airssys_rt::broker::InMemoryMessageBroker;

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    // Create broker
    let broker = InMemoryMessageBroker::new();

    // Create OSL supervisor
    let supervisor = OSLSupervisor::new(broker.clone());
    supervisor.start().await?;

    // Now FileSystem, Process, Network actors are running
    // with full fault tolerance

    println!("OSL actors under supervision");

    // Keep running
    tokio::time::sleep(Duration::from_secs(3600)).await;

    Ok(())
}

Performance Examples

High-Throughput Message Processing

use airssys_rt::prelude::*;
use std::time::Instant;

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let broker = InMemoryMessageBroker::new();

    // Spawn actor
    let actor = HighThroughputActor::new();
    let address = ActorAddress::new("throughput-test");
    spawn_actor(actor, address.clone(), broker.clone()).await?;

    // Send 1M messages
    let start = Instant::now();
    for _ in 0..1_000_000 {
        broker.publish(ProcessMsg::new(), address.clone()).await?;
    }

    let duration = start.elapsed();
    let msgs_per_sec = 1_000_000.0 / duration.as_secs_f64();

    println!("Throughput: {:.2} msgs/sec", msgs_per_sec);

    Ok(())
}

Complete Examples

For complete, runnable examples, see the repository:

# Basic actor implementation
cargo run --example actor_basic

# Actor lifecycle hooks
cargo run --example actor_lifecycle

# Actor patterns (request-reply, pub-sub)
cargo run --example actor_patterns

# Basic supervision
cargo run --example supervisor_basic

# Supervision strategies
cargo run --example supervisor_strategies

# Advanced supervision with health checks
cargo run --example supervisor_advanced

# OSL integration
cargo run --example osl_integration_example

# Worker pool pattern
cargo run --example worker_pool

# Message passing patterns
cargo run --example message_patterns

Next Steps