Component Composition Guide¶
This guide shows you how to orchestrate multiple components to build complex systems, covering pipeline patterns, parallel execution, and error-handling strategies for component composition.
Overview¶
Component composition enables building sophisticated systems from simple, reusable components. Components communicate via messages, forming pipelines, parallel processing units, or complex orchestration patterns.
Key Patterns:
- Pipeline: Sequential processing (A → B → C)
- Parallel: Independent concurrent processing
- Fan-Out/Fan-In: Distribute work, aggregate results (1 → N → 1)
Pipeline Patterns¶
Sequential Pipeline (A → B → C)¶
Process data through multiple stages:
// Layer 1: Standard library
use std::sync::Arc;
// Layer 2: Third-party crates
use tokio::sync::RwLock;
// Layer 3: Internal modules
use airssys_rt::prelude::*;
use airssys_wasm::actor::ComponentActor;
// Stage 1: Ingress (receives raw data)
#[derive(Clone)]
pub struct IngressComponent {
output_target: Arc<RwLock<Option<ComponentId>>>,
}
#[async_trait::async_trait]
impl Actor for IngressComponent {
type Message = IngressMessage;
type Error = ComponentError;
async fn handle_message(
&mut self,
message: Self::Message,
context: &ActorContext,
) -> Result<(), Self::Error> {
match message {
IngressMessage::RawData(data) => {
// Process and forward to next stage
let processed = format!("ingress:{}", data);
let target = self.output_target.read().await;
if let Some(target_id) = *target {
context.send_message(target_id, ProcessorMessage::Process(processed)).await?;
}
}
IngressMessage::SetTarget(target_id) => {
let mut target = self.output_target.write().await;
*target = Some(target_id);
}
}
Ok(())
}
}
// Stage 2: Processor (transforms data)
#[derive(Clone)]
pub struct ProcessorComponent {
output_target: Arc<RwLock<Option<ComponentId>>>,
}
#[async_trait::async_trait]
impl Actor for ProcessorComponent {
type Message = ProcessorMessage;
type Error = ComponentError;
async fn handle_message(
&mut self,
message: Self::Message,
context: &ActorContext,
) -> Result<(), Self::Error> {
match message {
ProcessorMessage::Process(data) => {
// Transform and forward to next stage
let transformed = format!("processor:{}", data);
let target = self.output_target.read().await;
if let Some(target_id) = *target {
context.send_message(target_id, EgressMessage::Finalize(transformed)).await?;
}
}
ProcessorMessage::SetTarget(target_id) => {
let mut target = self.output_target.write().await;
*target = Some(target_id);
}
}
Ok(())
}
}
// Stage 3: Egress (outputs results)
#[derive(Clone)]
pub struct EgressComponent {
results: Arc<RwLock<Vec<String>>>,
}
#[async_trait::async_trait]
impl Actor for EgressComponent {
type Message = EgressMessage;
type Error = ComponentError;
async fn handle_message(
&mut self,
message: Self::Message,
_context: &ActorContext,
) -> Result<(), Self::Error> {
match message {
EgressMessage::Finalize(data) => {
let mut results = self.results.write().await;
results.push(data);
println!("Pipeline output: {}", results.last().unwrap());
}
}
Ok(())
}
}
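The stages above assume message enums and constructors along these lines (a minimal sketch; depending on the Message trait bounds in airssys-rt, the enums may also need derives such as Clone or Debug):
// Messages exchanged between pipeline stages (assumed shapes)
pub enum IngressMessage {
    RawData(String),
    SetTarget(ComponentId),
}

pub enum ProcessorMessage {
    Process(String),
    SetTarget(ComponentId),
}

pub enum EgressMessage {
    Finalize(String),
}

// Simple constructors used by the setup code below
impl IngressComponent {
    pub fn new() -> Self {
        Self { output_target: Arc::new(RwLock::new(None)) }
    }
}

impl ProcessorComponent {
    pub fn new() -> Self {
        Self { output_target: Arc::new(RwLock::new(None)) }
    }
}

impl EgressComponent {
    pub fn new() -> Self {
        Self { results: Arc::new(RwLock::new(Vec::new())) }
    }
}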
Setting Up the Pipeline¶
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
let actor_system = ActorSystem::new("pipeline-system").await?;
// Create components
let ingress = IngressComponent::new();
let processor = ProcessorComponent::new();
let egress = EgressComponent::new();
// Spawn components
let ingress_id = actor_system.spawn_component(ingress).await?;
let processor_id = actor_system.spawn_component(processor).await?;
let egress_id = actor_system.spawn_component(egress).await?;
// Wire pipeline: Ingress → Processor → Egress
ingress_id.send(IngressMessage::SetTarget(processor_id)).await?;
processor_id.send(ProcessorMessage::SetTarget(egress_id)).await?;
// Send data through pipeline
ingress_id.send(IngressMessage::RawData("hello".to_string())).await?;
// Output: "Pipeline output: processor:ingress:hello"
Ok(())
}
Performance:
- Message routing: 1.05µs per stage (Task 6.2 messaging_benchmarks.rs)
- 3-stage pipeline: ~3µs total latency
- Throughput: 333k pipelines/sec (1 / 3µs)
Parallel Execution Patterns¶
Independent Parallel Processing¶
Execute components concurrently:
use tokio::join;
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
let actor_system = ActorSystem::new("parallel-system").await?;
// Spawn 3 independent components
let component_a = ComponentA::new();
let component_b = ComponentB::new();
let component_c = ComponentC::new();
let id_a = actor_system.spawn_component(component_a).await?;
let id_b = actor_system.spawn_component(component_b).await?;
let id_c = actor_system.spawn_component(component_c).await?;
// Send messages in parallel (concurrent execution)
let (result_a, result_b, result_c) = join!(
id_a.send(MessageA::Process("data_a".to_string())),
id_b.send(MessageB::Process("data_b".to_string())),
id_c.send(MessageC::Process("data_c".to_string())),
);
println!("Parallel execution complete");
println!(" A: {:?}", result_a);
println!(" B: {:?}", result_b);
println!(" C: {:?}", result_c);
Ok(())
}
Performance:
- Concurrent operations validated in Task 6.2 (scalability_benchmarks.rs)
- 100 concurrent operations: 120µs total
- Throughput: 833k concurrent ops/sec
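The join! above only tells you the sends succeeded. If you also need each component's output, one common approach is to embed a reply channel in the message, using tokio::sync::oneshot (a sketch; MessageA::ProcessWithReply is a hypothetical variant added for illustration):
use tokio::sync::oneshot;

// Hypothetical message variant carrying a reply channel
pub enum MessageA {
    Process(String),
    ProcessWithReply { input: String, reply: oneshot::Sender<String> },
}

// Caller side: create the channel, send the request, await the reply
let (tx, rx) = oneshot::channel();
id_a.send(MessageA::ProcessWithReply { input: "data_a".to_string(), reply: tx }).await?;
let output = rx.await?;
println!("A produced: {}", output);

// Component side (inside handle_message):
// MessageA::ProcessWithReply { input, reply } => {
//     let result = format!("a:{}", input);
//     let _ = reply.send(result); // receiver may be gone; ignore the error
// }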
Fan-Out/Fan-In Pattern¶
Fan-Out (1 → N)¶
Distribute work to multiple workers:
#[derive(Clone)]
pub struct Coordinator {
workers: Arc<RwLock<Vec<ComponentId>>>,
}
#[async_trait::async_trait]
impl Actor for Coordinator {
type Message = CoordinatorMessage;
type Error = ComponentError;
async fn handle_message(
&mut self,
message: Self::Message,
context: &ActorContext,
) -> Result<(), Self::Error> {
match message {
CoordinatorMessage::Distribute(data) => {
// Fan-out: Send to all workers
let workers = self.workers.read().await;
for worker_id in workers.iter() {
context.send_message(
*worker_id,
WorkerMessage::Process(data.clone())
).await?;
}
println!("Distributed to {} workers", workers.len());
}
CoordinatorMessage::AddWorker(worker_id) => {
let mut workers = self.workers.write().await;
workers.push(worker_id);
}
}
Ok(())
}
}
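Wiring the coordinator follows the same spawn-and-send style as the pipeline (a sketch; Coordinator::new() is assumed, the CoordinatorMessage variants match the handler above, and worker_ids holds the ComponentIds of already-spawned workers):
let coordinator_id = actor_system.spawn_component(Coordinator::new()).await?;

// Register each already-spawned worker with the coordinator
for worker_id in worker_ids {
    coordinator_id.send(CoordinatorMessage::AddWorker(worker_id)).await?;
}

// Fan-out: every registered worker receives a copy of the payload
coordinator_id.send(CoordinatorMessage::Distribute("job-1".to_string())).await?;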
Performance (Task 6.2 messaging_benchmarks.rs):
- Pub-sub fanout to 100 subscribers: 85.2µs
- Per-subscriber overhead: ~852ns
- Throughput: 11,737 fanouts/sec (100 subscribers each)
Fan-In (N → 1)¶
Aggregate results from multiple sources:
#[derive(Clone)]
pub struct Aggregator {
results: Arc<RwLock<Vec<WorkerResult>>>,
expected_count: Arc<RwLock<usize>>,
}
#[async_trait::async_trait]
impl Actor for Aggregator {
type Message = AggregatorMessage;
type Error = ComponentError;
async fn handle_message(
&mut self,
message: Self::Message,
_context: &ActorContext,
) -> Result<(), Self::Error> {
match message {
AggregatorMessage::Result(result) => {
let mut results = self.results.write().await;
results.push(result);
// Check if all results received
let expected = *self.expected_count.read().await;
if results.len() == expected {
println!("All {} results aggregated", expected);
// Process aggregated results
let sum: i64 = results.iter().map(|r| r.value).sum();
println!("Aggregated sum: {}", sum);
}
}
AggregatorMessage::SetExpectedCount(count) => {
let mut expected = self.expected_count.write().await;
*expected = count;
}
}
Ok(())
}
}
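A worker ties the two halves together: it receives work from the coordinator and reports a WorkerResult to the aggregator. The sketch below follows the shapes used above; constructing each Worker with the aggregator's ComponentId is an assumption:
#[derive(Clone, Debug)]
pub struct WorkerResult {
    pub value: i64,
}

pub enum WorkerMessage {
    Process(String),
}

pub enum AggregatorMessage {
    Result(WorkerResult),
    SetExpectedCount(usize),
}

#[derive(Clone)]
pub struct Worker {
    aggregator_id: ComponentId,
}

#[async_trait::async_trait]
impl Actor for Worker {
    type Message = WorkerMessage;
    type Error = ComponentError;

    async fn handle_message(
        &mut self,
        message: Self::Message,
        context: &ActorContext,
    ) -> Result<(), Self::Error> {
        match message {
            WorkerMessage::Process(data) => {
                // Do the actual work; here the "result" is just the input length
                let result = WorkerResult { value: data.len() as i64 };
                // Fan-in: report the result to the aggregator
                context.send_message(self.aggregator_id, AggregatorMessage::Result(result)).await?;
            }
        }
        Ok(())
    }
}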
Component Dependencies¶
Startup Order¶
Start components in dependency order:
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
let actor_system = ActorSystem::new("ordered-system").await?;
// 1. Start foundation components (no dependencies)
let database_id = actor_system.spawn_component(DatabaseComponent::new()).await?;
let cache_id = actor_system.spawn_component(CacheComponent::new()).await?;
// Wait for foundation components to be ready
tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;
// 2. Start service components (depend on database/cache)
let service_id = actor_system.spawn_component(
ServiceComponent::new(database_id, cache_id)
).await?;
// 3. Start API gateway (depends on service)
let api_id = actor_system.spawn_component(
APIGatewayComponent::new(service_id)
).await?;
println!("All components started in order");
Ok(())
}
Performance:
- Component spawn: 286ns per component (Task 6.2 actor_lifecycle_benchmarks.rs)
- 10 components: 2.86µs total spawn time
- 100 components: 28.6µs total spawn time
Shutdown Order¶
Stop components in reverse dependency order:
pub async fn graceful_shutdown(
actor_system: ActorSystem,
component_ids: Vec<ComponentId>,
) -> Result<(), Box<dyn std::error::Error>> {
// Reverse order: stop API → Service → Database/Cache
for component_id in component_ids.iter().rev() {
println!("Stopping component: {}", component_id);
actor_system.stop_component(*component_id).await?;
// Wait for component to stop cleanly
tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;
}
println!("All components stopped");
Ok(())
}
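Using the startup example above, the call site might look like this (a sketch; the ids are passed in startup order so the reverse iteration stops the API gateway first and the foundation components last):
let component_ids = vec![database_id, cache_id, service_id, api_id];
graceful_shutdown(actor_system, component_ids).await?;
// Stop order: api → service → cache → database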
Error Propagation in Pipelines¶
Failure Handling Strategies¶
Strategy 1: Stop on First Error
#[async_trait::async_trait]
impl Actor for ProcessorComponent {
type Message = ProcessorMessage;
type Error = ComponentError;
async fn handle_message(
&mut self,
message: Self::Message,
context: &ActorContext,
) -> Result<(), Self::Error> {
match message {
ProcessorMessage::Process(data) => {
// Process or propagate error
let result = self.process_data(&data).await?;
// Forward only on success
let target = self.output_target.read().await;
if let Some(target_id) = *target {
context.send_message(target_id, EgressMessage::Finalize(result)).await?;
}
Ok(())
}
            ProcessorMessage::SetTarget(target_id) => {
                // Wiring arm, unchanged from the pipeline example above
                let mut target = self.output_target.write().await;
                *target = Some(target_id);
                Ok(())
            }
        }
}
}
Strategy 2: Continue with Error Logging
#[async_trait::async_trait]
impl Actor for ResilientProcessor {
type Message = ProcessorMessage;
type Error = ComponentError;
async fn handle_message(
&mut self,
message: Self::Message,
context: &ActorContext,
) -> Result<(), Self::Error> {
match message {
ProcessorMessage::Process(data) => {
match self.process_data(&data).await {
Ok(result) => {
// Success - forward result
let target = self.output_target.read().await;
if let Some(target_id) = *target {
context.send_message(
target_id,
EgressMessage::Finalize(result)
).await?;
}
}
Err(err) => {
// Error - log and continue processing
tracing::error!(
component_id = %context.component_id,
error = %err,
"Processing failed, skipping message"
);
}
}
Ok(())
}
            ProcessorMessage::SetTarget(target_id) => {
                // Wiring arm, unchanged from the pipeline example above
                let mut target = self.output_target.write().await;
                *target = Some(target_id);
                Ok(())
            }
        }
}
}
Strategy 3: Dead Letter Queue
pub struct DeadLetterQueue {
failed_messages: Arc<RwLock<Vec<FailedMessage>>>,
}
#[derive(Debug)]
pub struct FailedMessage {
pub original_message: String,
pub error: String,
pub timestamp: chrono::DateTime<chrono::Utc>,
pub component_id: ComponentId,
}
impl DeadLetterQueue {
pub async fn send_to_dlq(
&self,
message: String,
error: String,
component_id: ComponentId,
) {
let mut failed = self.failed_messages.write().await;
failed.push(FailedMessage {
original_message: message,
error,
timestamp: chrono::Utc::now(),
component_id,
});
}
}
// Usage in component
match self.process_data(&data).await {
Ok(result) => { /* forward */ }
Err(err) => {
// Send to dead letter queue for later retry
dlq.send_to_dlq(data, err.to_string(), context.component_id).await;
}
}
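For retries, the queue can later be drained and the stored payloads re-dispatched (a sketch; drain() is a hypothetical helper added here, and how a payload maps back to a message depends on your message types):
impl DeadLetterQueue {
    /// Take all failed messages out of the queue, leaving it empty
    pub async fn drain(&self) -> Vec<FailedMessage> {
        let mut failed = self.failed_messages.write().await;
        std::mem::take(&mut *failed)
    }
}

// Periodic retry loop (sketch)
for failed in dlq.drain().await {
    tracing::warn!(
        component_id = %failed.component_id,
        error = %failed.error,
        "Retrying message from dead letter queue"
    );
    // Re-send failed.original_message to the owning component here
}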
State Sharing Between Components¶
When to Share State¶
Appropriate Use Cases:
- Configuration data (read-only, infrequent updates)
- Metrics aggregation (write-mostly, periodic reads)
- Shared caches (read-heavy, concurrent access)
Avoid Sharing State When:
- Frequent writes from multiple components (high contention)
- Order-dependent operations (use message passing instead)
- Complex synchronization needed (deadlock risk)
Safe State Sharing Pattern¶
use dashmap::DashMap;
// Shared state with concurrent access; cloning the handle shares the same underlying map
#[derive(Clone)]
pub struct SharedCache {
    cache: Arc<DashMap<String, CachedValue>>,
}
#[derive(Clone)]
pub struct CachedValue {
pub data: String,
pub timestamp: chrono::DateTime<chrono::Utc>,
}
impl SharedCache {
pub fn new() -> Self {
Self {
cache: Arc::new(DashMap::new()),
}
}
/// Read from cache (lock-free for concurrent reads)
pub fn get(&self, key: &str) -> Option<CachedValue> {
self.cache.get(key).map(|entry| entry.value().clone())
}
/// Write to cache (concurrent writes safe)
pub fn insert(&self, key: String, value: CachedValue) {
self.cache.insert(key, value);
}
}
// Component A writes
let cache = SharedCache::new();
cache.insert("key1".to_string(), CachedValue {
data: "value1".to_string(),
timestamp: chrono::Utc::now(),
});
// Component B reads (concurrent, no blocking)
if let Some(value) = cache.get("key1") {
println!("Cached: {}", value.data);
}
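Because cloning the handle only clones the inner Arc, each component can hold its own copy of the same cache. A minimal sketch (MetricsComponent is a hypothetical component struct):
#[derive(Clone)]
pub struct MetricsComponent {
    cache: SharedCache, // clone of the handle; same underlying DashMap
}

let cache = SharedCache::new();
let metrics_a = MetricsComponent { cache: cache.clone() };
let metrics_b = MetricsComponent { cache: cache.clone() };
// Both components now read and write the same entries concurrently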
Performance (Task 6.2 actor_lifecycle_benchmarks.rs):
- State access (read): 37ns
- State access (write): 39ns
- Concurrent access validated in scalability tests
Composition Best Practices¶
1. Keep Components Small and Focused¶
Each component should have a single responsibility:
// ✅ GOOD: Focused component
pub struct JSONParserComponent {
// Only parses JSON
}
// ❌ BAD: Too many responsibilities
pub struct SuperComponent {
// Parses JSON, validates, transforms, stores, sends emails...
}
2. Use Message Passing Over Shared State¶
Prefer messages for coordination:
// ✅ GOOD: Message-based coordination
component_a.send(MessageA::RequestData(query)).await?;
// Component B responds via callback message
// ❌ BAD: Shared mutable state
let shared_state = Arc::new(RwLock::new(State::new()));
// Multiple components mutate shared_state (contention risk)
3. Make Components Stateless When Possible¶
Stateless components are easier to scale and recover:
// ✅ GOOD: Stateless transformer
pub struct TransformerComponent {
// No internal state, pure transformation
}
// Message includes all needed data
pub enum TransformerMessage {
Transform { input: String, output_target: ComponentId },
}
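A handle_message for this stateless transformer might look like the following (a sketch; the transformation and the result message type forwarded to output_target are assumptions):
#[async_trait::async_trait]
impl Actor for TransformerComponent {
    type Message = TransformerMessage;
    type Error = ComponentError;

    async fn handle_message(
        &mut self,
        message: Self::Message,
        context: &ActorContext,
    ) -> Result<(), Self::Error> {
        match message {
            TransformerMessage::Transform { input, output_target } => {
                // Pure transformation: everything needed arrives in the message
                let output = input.to_uppercase();
                // Forward to the target named in the message (result type assumed)
                context.send_message(output_target, EgressMessage::Finalize(output)).await?;
            }
        }
        Ok(())
    }
}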
4. Handle Errors at Boundaries¶
Don't propagate errors across component boundaries unnecessarily:
// ✅ GOOD: Handle errors locally
match self.external_api_call().await {
Ok(result) => { /* forward result */ }
Err(err) => {
tracing::error!("API call failed: {}", err);
// Send error message instead of propagating
context.send_message(target, ErrorMessage::APIFailure(err.to_string())).await?;
}
}
5. Use Supervision for Recovery¶
Let the supervisor handle component crashes:
// ✅ GOOD: Let supervisor handle crashes
// Component crashes → supervisor restarts → automatic recovery
// ❌ BAD: Catch and swallow every error
// Component never crashes → errors accumulate → degraded state
Summary¶
You can now compose components into complete systems in under 45 minutes:
- Pipeline Pattern: Chain components for sequential processing (A → B → C)
- Parallel Pattern: Execute components concurrently for independent tasks
- Fan-Out/Fan-In: Distribute work (1 → N) and aggregate results (N → 1)
- Manage Dependencies: Start components in order, stop in reverse order
- Error Handling: Choose strategy (stop, continue, dead letter queue)
- State Sharing: Minimize shared state, prefer message passing
Performance Characteristics:
- Message routing: 1.05µs per hop (messaging_benchmarks.rs)
- Pipeline (3 stages): ~3µs total latency
- Fan-out (100 components): 85.2µs (messaging_benchmarks.rs)
- Concurrent operations (100): 120µs (scalability_benchmarks.rs)
Next Steps¶
- Supervision and Recovery - Handle component failures
- Production Deployment - Deploy composed systems
- Best Practices - Advanced composition patterns