Architecture¶
This document provides a comprehensive overview of AirsDSP's architecture, design decisions, and internal structure.
Architecture Status¶
Current Phase: Architecture Complete, Ready for Implementation
Implementation Status: Phase 1 (Foundation) Starting
Last Updated: 2025-12-16
High-Level Architecture¶
AirsDSP is organized into 3 architectural layers, implemented as a Rust workspace of 6 modular crates.
The 3-Layer Model¶
┌─────────────────────────────────────────────────────┐
│ Layer 3: Orchestration │
│ Multi-pipeline system, task classification, │
│ intelligent routing │
│ Crate: airsdsp/orchestration │
└─────────────────────────────────────────────────────┘
↓ depends on
┌─────────────────────────────────────────────────────┐
│ Layer 2: Core + Patterns + Tooling │
│ ─────────────────────────────────────────────── │
│ 2A: Core execution (airsdsp/core) │
│ 2B: Pattern library (airsdsp/patterns) │
│ 2C: Tooling (airsdsp/eval, airsdsp/debug) │
└─────────────────────────────────────────────────────┘
↓ depends on
┌─────────────────────────────────────────────────────┐
│ Layer 1: Infrastructure │
│ Trait abstractions for LLM, vector stores, │
│ caching (no concrete implementations) │
│ Crate: airsdsp/infra │
└─────────────────────────────────────────────────────┘
Design Philosophy: Each layer builds on the one below, creating clear separation of concerns and enabling phased implementation.
Workspace Structure¶
AirsDSP is organized as a Rust workspace with 6 independent crates:
airsdsp/ (workspace root)
├── Cargo.toml # Workspace manifest
│
├── infra/ # Layer 1: Infrastructure trait abstractions
├── core/ # Layer 2A: Core DSP execution engine
├── patterns/ # Layer 2B: High-level reusable patterns
├── eval/ # Layer 2C: Evaluation metrics (G-Eval)
├── debug/ # Layer 2C: Debugging & observability
├── orchestration/ # Layer 3: Multi-pipeline orchestration
│
├── examples/ # Example applications
└── docs/ # Workspace documentation
Layer 1: Infrastructure (airsdsp/infra)¶
Purpose¶
Provides trait abstractions for external service integrations without concrete implementations.
Key Components¶
LanguageModel Trait¶
Abstracts over different language model providers:
#[async_trait]
pub trait LanguageModel: Send + Sync {
/// Generate text completion
async fn generate(
&self,
prompt: &str,
config: &GenerationConfig,
) -> Result<String>;
/// Generate with structured output support.
/// Returns JSON rather than a generic type parameter so the
/// trait stays object-safe (usable as Arc<dyn LanguageModel>).
async fn generate_structured(
&self,
prompt: &str,
config: &GenerationConfig,
) -> Result<serde_json::Value>;
/// Check if model supports streaming
fn supports_streaming(&self) -> bool;
}
Implementation Responsibility: Users provide their own implementations integrating with OpenAI, Anthropic, local models, etc.
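As an illustration, a minimal user-side implementation might look like the sketch below. MockModel and its canned reply are hypothetical; a real integration would call a provider SDK or HTTP API inside generate:
use async_trait::async_trait;
/// Hypothetical stand-in model; a real implementation would call a provider API.
struct MockModel;
#[async_trait]
impl LanguageModel for MockModel {
async fn generate(
&self,
prompt: &str,
_config: &GenerationConfig,
) -> Result<String> {
// Echo the prompt back; replace with an actual API call.
Ok(format!("response to: {prompt}"))
}
async fn generate_structured(
&self,
prompt: &str,
config: &GenerationConfig,
) -> Result<serde_json::Value> {
// Wrap the raw completion in JSON for this sketch.
let raw = self.generate(prompt, config).await?;
Ok(serde_json::json!({ "text": raw }))
}
fn supports_streaming(&self) -> bool {
false
}
}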
VectorStore Trait¶
Abstracts over different vector database backends:
#[async_trait]
pub trait VectorStore: Send + Sync {
/// Store documents with embeddings
async fn store(
&self,
documents: Vec<Document>,
) -> Result<Vec<DocumentId>>;
/// Retrieve similar documents
async fn search(
&self,
query: &str,
top_k: usize,
) -> Result<Vec<ScoredDocument>>;
/// Hybrid search (vector + keyword)
async fn hybrid_search(
&self,
query: &str,
top_k: usize,
alpha: f32,
) -> Result<Vec<ScoredDocument>>;
}
Implementation Responsibility: Users integrate with Qdrant, Pinecone, or custom vector stores.
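As a sketch, even a toy in-memory store can satisfy the trait; this example scores documents by naive keyword overlap instead of real embedding similarity, and assumes (hypothetically) that Document exposes id and content fields and that ScoredDocument pairs a document with a score:
use async_trait::async_trait;
use std::sync::Mutex;
/// Hypothetical in-memory store for tests and examples.
struct InMemoryStore {
docs: Mutex<Vec<Document>>,
}
#[async_trait]
impl VectorStore for InMemoryStore {
async fn store(&self, documents: Vec<Document>) -> Result<Vec<DocumentId>> {
let mut docs = self.docs.lock().unwrap();
let ids = documents.iter().map(|d| d.id.clone()).collect();
docs.extend(documents);
Ok(ids)
}
async fn search(&self, query: &str, top_k: usize) -> Result<Vec<ScoredDocument>> {
// Keyword-overlap scoring stands in for vector similarity here.
let docs = self.docs.lock().unwrap();
let mut scored: Vec<ScoredDocument> = docs
.iter()
.map(|d| {
let hits = query.split_whitespace().filter(|w| d.content.contains(*w)).count();
ScoredDocument { document: d.clone(), score: hits as f32 }
})
.collect();
scored.sort_by(|a, b| b.score.total_cmp(&a.score));
scored.truncate(top_k);
Ok(scored)
}
async fn hybrid_search(&self, query: &str, top_k: usize, _alpha: f32) -> Result<Vec<ScoredDocument>> {
// Degenerate hybrid: this sketch just falls back to plain search.
self.search(query, top_k).await
}
}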
Cache Trait¶
Provides caching abstraction for pipeline results:
#[async_trait]
pub trait Cache: Send + Sync {
async fn get(&self, key: &str) -> Result<Option<Vec<u8>>>;
async fn set(&self, key: &str, value: Vec<u8>, ttl: Duration) -> Result<()>;
async fn invalidate(&self, key: &str) -> Result<()>;
}
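A minimal in-process implementation, assuming only std plus the trait above, might look like this sketch (expired entries are evicted lazily on get; production code might wrap Redis instead):
use async_trait::async_trait;
use std::collections::HashMap;
use std::sync::Mutex;
use std::time::{Duration, Instant};
/// Hypothetical in-memory cache keyed by string.
struct MemoryCache {
entries: Mutex<HashMap<String, (Vec<u8>, Instant)>>,
}
#[async_trait]
impl Cache for MemoryCache {
async fn get(&self, key: &str) -> Result<Option<Vec<u8>>> {
let mut entries = self.entries.lock().unwrap();
// Evict lazily: drop the entry if its deadline has passed.
match entries.get(key) {
Some((value, deadline)) if *deadline > Instant::now() => Ok(Some(value.clone())),
Some(_) => {
entries.remove(key);
Ok(None)
}
None => Ok(None),
}
}
async fn set(&self, key: &str, value: Vec<u8>, ttl: Duration) -> Result<()> {
self.entries.lock().unwrap().insert(key.to_string(), (value, Instant::now() + ttl));
Ok(())
}
async fn invalidate(&self, key: &str) -> Result<()> {
self.entries.lock().unwrap().remove(key);
Ok(())
}
}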
Design Principle¶
Trait abstractions only. No concrete implementations included. This allows:
- Users to integrate with their existing infrastructure
- No dependency bloat from unused provider clients
- Maximum flexibility in implementation choices
Layer 2A: Core Execution (airsdsp/core)¶
Purpose¶
Implements the fundamental DSP framework: Stage-based pipeline composition with explicit control.
Key Components¶
Stage Trait Hierarchy¶
Base Stage Trait:
#[async_trait]
pub trait Stage: Send + Sync {
/// Stage name for debugging and tracing
fn name(&self) -> &str;
/// Execute stage logic
async fn execute(&self, ctx: &mut Context) -> Result<()>;
/// Get stage hooks (for cross-cutting concerns)
fn hooks(&self) -> &[Box<dyn StageHook>];
/// Error handling strategy
fn error_strategy(&self) -> ErrorStrategy {
ErrorStrategy::FailFast
}
}
Specialized Stage Traits:
/// Demonstrate: Bootstrap pipeline-aware demonstrations
#[async_trait]
pub trait DemonstrateStage: Stage {
async fn demonstrate(
&self,
ctx: &Context,
) -> Result<Vec<Demonstration>>;
}
/// Search: Strategic retrieval within pipeline flow
#[async_trait]
pub trait SearchStage: Stage {
async fn search(
&self,
ctx: &Context,
) -> Result<Vec<Document>>;
}
/// Predict: Grounded LM predictions
#[async_trait]
pub trait PredictStage: Stage {
async fn predict(
&self,
ctx: &Context,
) -> Result<Prediction>;
}
Why This Design?
- Type safety: Specialized traits provide compile-time guarantees
- Flexibility: Custom stages can implement just base Stage trait
- Uniform orchestration: Pipeline treats all stages uniformly via base trait
- Clear contracts: Each specialized trait has clear input/output types
Design Rationale: This trait hierarchy balances type safety for core DSP operations with flexibility for custom stages, enabling both compile-time guarantees and extensibility.
Pipeline & PipelineBuilder¶
Pipeline Structure:
pub struct Pipeline {
stages: Vec<Box<dyn Stage>>,
context: Context,
config: PipelineConfig,
}
impl Pipeline {
pub fn builder() -> PipelineBuilder {
PipelineBuilder::new()
}
pub async fn execute(&mut self, input: &str) -> Result<String> {
self.context.set_input(input);
for stage in &self.stages {
// Execute hooks: before()
self.execute_hooks_before(stage).await?;
// Execute stage
stage.execute(&mut self.context).await?;
// Execute hooks: after() and transform()
self.execute_hooks_after(stage).await?;
}
Ok(self.context.get_output())
}
}
Builder Pattern:
pub struct PipelineBuilder {
stages: Vec<Box<dyn Stage>>,
config: PipelineConfig,
}
impl PipelineBuilder {
pub fn add_stage(mut self, stage: Box<dyn Stage>) -> Self {
self.stages.push(stage);
self
}
pub fn demonstrate(mut self, stage: impl DemonstrateStage + 'static) -> Self {
self.stages.push(Box::new(stage));
self
}
pub fn search(mut self, stage: impl SearchStage + 'static) -> Self {
self.stages.push(Box::new(stage));
self
}
pub fn predict(mut self, stage: impl PredictStage + 'static) -> Self {
self.stages.push(Box::new(stage));
self
}
pub fn build(self) -> Result<Pipeline> {
// Validation: at least one stage required
// Validation: must have at least one Predict stage
Pipeline::new(self.stages, self.config)
}
}
Design Rationale: The builder pattern with validation ensures pipelines are constructed correctly, preventing runtime errors from invalid configurations. At least one Predict stage is required as it represents the final output generation step.
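Putting the builder together, a typical construction might read as follows; VectorSearch and SimplePredict are stand-ins for concrete stage types (the same names appear in the testing section below):
// Sketch: compose search + predict, then validate via build().
let mut pipeline = Pipeline::builder()
.search(VectorSearch::new(vs.clone()))
.predict(SimplePredict::new(lm.clone())) // required final Predict stage
.build()?;
let answer = pipeline.execute("What is DSP?").await?;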
Context Management¶
Context Structure:
pub struct Context {
// Input/output
input: String,
output: Option<String>,
// Demonstrations
demonstrations: Vec<Demonstration>,
// Retrieved documents
documents: Vec<Document>,
// Intermediate predictions
predictions: Vec<Prediction>,
// Metadata
metadata: HashMap<String, Value>,
// Execution trace
trace: Vec<TraceEvent>,
}
Key Principle: Context flows through pipeline stages, carrying demonstrations, retrieved documents, and intermediate results.
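To make that concrete, here is a sketch of a Predict-style stage that reads documents left by an earlier Search stage and writes its result back; the accessors documents() and set_output() are illustrative, not a finalized Context API:
use async_trait::async_trait;
use std::sync::Arc;
/// Hypothetical stage holding an LM handle and generation config.
struct GroundedPredict {
lm: Arc<dyn LanguageModel>,
config: GenerationConfig,
}
#[async_trait]
impl Stage for GroundedPredict {
fn name(&self) -> &str {
"grounded_predict"
}
async fn execute(&self, ctx: &mut Context) -> Result<()> {
// Read what earlier Search stages left in the context...
let evidence: Vec<&str> = ctx.documents().iter().map(|d| d.content.as_str()).collect();
let prompt = format!("Answer using:\n{}", evidence.join("\n"));
// ...and write this stage's result back for downstream stages.
let prediction = self.lm.generate(&prompt, &self.config).await?;
ctx.set_output(prediction);
Ok(())
}
fn hooks(&self) -> &[Box<dyn StageHook>] {
&[]
}
}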
Hook System¶
Hook Trait:
#[async_trait]
pub trait StageHook: Send + Sync {
fn name(&self) -> &str;
/// Execute before stage
async fn before(&self, ctx: &Context) -> Result<()> {
Ok(())
}
/// Execute after stage
async fn after(&self, ctx: &Context) -> Result<()> {
Ok(())
}
/// Transform context after stage
async fn transform(&self, ctx: &mut Context) -> Result<()> {
Ok(())
}
/// Error handling strategy
fn error_strategy(&self) -> HookErrorStrategy {
HookErrorStrategy::ContinueWarn
}
}
Hook Error Strategies:
- FailStage: Hook failure fails the entire stage
- ContinueWarn: Log warning, continue execution (default)
- ContinueSilent: Ignore hook errors silently
Common Hooks:
- LoggingHook: Log stage execution details
- MetricsHook: Record performance metrics
- CacheHook: Cache stage results
- ValidationHook: Validate stage outputs
Design Principle: Stage-owned hooks (not global) for clarity, isolation, and testability.
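For the shape such a hook takes, here is a sketch of the LoggingHook mentioned above (the tracing crate is an assumption; any logger works):
use async_trait::async_trait;
struct LoggingHook;
#[async_trait]
impl StageHook for LoggingHook {
fn name(&self) -> &str {
"logging"
}
async fn before(&self, _ctx: &Context) -> Result<()> {
tracing::info!("stage starting");
Ok(())
}
async fn after(&self, _ctx: &Context) -> Result<()> {
tracing::info!("stage finished");
Ok(())
}
// error_strategy() keeps its ContinueWarn default, so a logging
// failure never takes the stage down with it.
}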
Layer 2B: Pattern Library (airsdsp/patterns)¶
Purpose¶
Provides high-level reusable DSP patterns built on core primitives.
Included Patterns¶
Chain-of-Thought (CoT)¶
Structured step-by-step reasoning:
pub struct CoTPattern {
// Configuration
}
impl CoTPattern {
pub fn build_pipeline(&self, lm: Arc<dyn LanguageModel>) -> Pipeline {
Pipeline::builder()
.predict(CoTPredict::new(lm))
.build()
.unwrap()
}
}
ReAct (Reason-Action)¶
Iterative thought-action-observation loop:
pub struct ReActPattern {
max_iterations: usize,
}
impl ReActPattern {
pub fn build_pipeline(
&self,
lm: Arc<dyn LanguageModel>,
tools: Vec<Box<dyn Tool>>,
) -> Pipeline {
Pipeline::builder()
.add_stage(Box::new(ReActStage::new(lm, tools, self.max_iterations)))
.build()
.unwrap()
}
}
Multi-Hop Reasoning¶
Entity extraction followed by iterative retrieval:
pub struct MultiHopPattern {
max_hops: usize,
}
impl MultiHopPattern {
pub fn build_pipeline(
&self,
lm: Arc<dyn LanguageModel>,
vs: Arc<dyn VectorStore>,
) -> Pipeline {
let mut builder = Pipeline::builder()
.predict(EntityExtract::new(lm.clone()));
// One retrieval stage per hop, as configured by max_hops
for _ in 0..self.max_hops {
builder = builder.search(VectorSearch::new(vs.clone()));
}
builder
.predict(AnswerGenerate::new(lm))
.build()
.unwrap()
}
}
Design Principle¶
Patterns are convenience constructors for common pipeline compositions. They don't hide complexity—users can always build equivalent pipelines manually using core primitives.
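For instance, assuming CoTPattern implements Default, the pattern and its hand-rolled equivalent yield the same pipeline; which one you write is purely a matter of convenience:
// Via the pattern...
let mut pipeline = CoTPattern::default().build_pipeline(lm.clone());
// ...or manually, with identical behavior:
let mut manual = Pipeline::builder()
.predict(CoTPredict::new(lm))
.build()?;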
Layer 2C: Tooling¶
Evaluation (airsdsp/eval)¶
Purpose: Evaluate pipeline quality and performance.
Priority Metric: G-Eval
LLM-based evaluation framework:
pub struct GEval {
evaluator_lm: Arc<dyn LanguageModel>,
criteria: Vec<EvaluationCriterion>,
}
impl GEval {
pub async fn evaluate(
&self,
pipeline: &mut Pipeline,
test_cases: Vec<TestCase>,
) -> Result<EvaluationReport> {
// Execute pipeline on test cases
// Use evaluator LM to score outputs
// Aggregate results
}
}
Extension Points: Metric trait allows community-contributed metrics (BLEU, ROUGE, custom).
Debugging (airsdsp/debug)¶
Purpose: Debug and observe pipeline execution.
Key Features:
- Execution Tracing: Capture inputs, outputs, and timing for each stage
- Observability Hooks: Integration with logging/metrics systems
- Stage Inspection: Introspect pipeline structure and state
pub struct ExecutionTracer {
events: Vec<TraceEvent>,
}
impl ExecutionTracer {
pub fn trace_stage(&mut self, stage: &dyn Stage, ctx: &Context) {
self.events.push(TraceEvent {
stage_name: stage.name().to_string(),
timestamp: SystemTime::now(), // SystemTime (unlike Instant) can be serialized
input: ctx.clone(),
// ...
});
}
pub fn export_json(&self) -> String {
serde_json::to_string(&self.events).unwrap()
}
}
Layer 3: Orchestration (airsdsp/orchestration)¶
Purpose¶
Manage multiple specialized pipelines with intelligent routing.
Key Components¶
Multi-Pipeline System¶
pub struct MultiPipeline {
pipelines: HashMap<TaskType, Pipeline>,
default_pipeline: Pipeline,
classifier: TaskClassifier,
router: Router,
}
impl MultiPipeline {
pub async fn execute(&mut self, input: &str) -> Result<String> {
// Classify task type
let task_type = self.classifier.classify(input).await?;
// Route to appropriate pipeline
let pipeline = self.router.route(task_type, &mut self.pipelines)
.unwrap_or(&mut self.default_pipeline);
// Execute
pipeline.execute(input).await
}
}
Task Classifier¶
Uses a DSP pipeline to classify input:
pub struct TaskClassifier {
classification_pipeline: Pipeline,
}
impl TaskClassifier {
pub async fn classify(&mut self, input: &str) -> Result<TaskType> {
let output = self.classification_pipeline.execute(input).await?;
TaskType::from_str(&output)
}
}
Router¶
Selects pipeline based on classification:
pub struct Router {
confidence_threshold: f32,
}
impl Router {
pub fn route<'a>(
&self,
classification: TaskType,
// &mut is required because Pipeline::execute takes &mut self
pipelines: &'a mut HashMap<TaskType, Pipeline>,
) -> Option<&'a mut Pipeline> {
pipelines.get_mut(&classification)
}
}
Design Principle: Minimal initial implementation. Task classification + routing + default fallback. Can evolve to support DAG-based intent decomposition in the future.
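Wiring the pieces together might look like the sketch below; the TaskType variants, the build_* pipeline constructors, and TaskClassifier::new are hypothetical placeholders:
use std::collections::HashMap;
let mut pipelines = HashMap::new();
pipelines.insert(TaskType::Qa, build_qa_pipeline(lm.clone(), vs.clone()));
pipelines.insert(TaskType::Summarize, build_summary_pipeline(lm.clone()));
let mut multi = MultiPipeline {
pipelines,
default_pipeline: build_default_pipeline(lm.clone()),
classifier: TaskClassifier::new(lm.clone()),
router: Router { confidence_threshold: 0.7 },
};
// Unknown task types fall through to the default pipeline.
let answer = multi.execute("Summarize this report ...").await?;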
Design Decisions¶
1. Explicit Over Automatic¶
Decision: Provide explicit APIs rather than automated optimization.
Rationale:
- Production systems require predictable behavior
- Debugging is easier with explicit control flow
- Aligns with Rust's philosophy of zero-cost abstractions
- Differentiates from DSPy's automated approach
Trade-offs:
- ✅ Predictable, debuggable behavior
- ✅ Fine-grained control over pipeline
- ❌ Requires more manual configuration
- ❌ No automated prompt optimization
2. Stage-Based Architecture¶
Decision: Use trait-based Stage abstraction with specialized traits for core DSP operations.
Rationale:
- Type safety for Demonstrate, Search, Predict operations
- Flexibility for custom stages
- Uniform pipeline orchestration
- Clear contracts and documentation
3. Modular Crate Organization¶
Decision: Organize as 6 independent crates in a workspace.
Rationale:
- Clear separation of concerns
- Independent evolution of components
- Flexible dependencies (users depend only on what they need)
- Phased implementation possible
4. Async-First Design¶
Decision: Use async/await throughout the codebase.
Rationale:
- Model API calls are I/O-bound operations
- Enable efficient concurrent pipeline execution
- Natural fit for Rust's async ecosystem
Implementation:
#[async_trait]
pub trait Stage: Send + Sync {
async fn execute(&self, ctx: &mut Context) -> Result<()>;
}
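Because execution is async end to end, independent pipelines can also run concurrently on the same runtime; a sketch with tokio::join!, assuming two already-built pipelines:
// Total latency is roughly max(a, b) rather than a + b.
let (summary, answer) = tokio::join!(
summary_pipeline.execute(report),
qa_pipeline.execute(question),
);
let (summary, answer) = (summary?, answer?);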
5. Type-Safe Abstractions¶
Decision: Use strong typing and trait objects for model abstractions.
Rationale:
- Compile-time guarantees for correctness
- Enable zero-cost abstractions where possible
- Clear API boundaries
Data Flow¶
Simple Pipeline Execution¶
User Input
↓
┌──────────────────────┐
│ Pipeline.execute() │
└──────────┬───────────┘
↓
┌──────────────┐
│ Context │ ← Initialized with input
└──────┬───────┘
↓
┌──────────────────────────────┐
│ Stage 1: Demonstrate │
│ - before() hooks │
│ - execute() │
│ - after() hooks │
│ - transform() hooks │
└──────────┬───────────────────┘
↓ (Context updated)
┌──────────────────────────────┐
│ Stage 2: Search │
│ - before() hooks │
│ - execute() │
│ - after() hooks │
│ - transform() hooks │
└──────────┬───────────────────┘
↓ (Context updated)
┌──────────────────────────────┐
│ Stage 3: Predict │
│ - before() hooks │
│ - execute() │
│ - after() hooks │
│ - transform() hooks │
└──────────┬───────────────────┘
↓
┌──────────────┐
│ Context │ ← Contains output
└──────┬───────┘
↓
Final Output
Multi-Hop Pipeline Execution¶
User Question
↓
┌────────────────────────────┐
│ Stage 1: Entity Extract │
│ (Predict) │
└──────────┬─────────────────┘
↓ entity
┌────────────────────────────┐
│ Stage 2: First Search │
│ (Search) │
└──────────┬─────────────────┘
↓ documents_1
┌────────────────────────────┐
│ Stage 3: Second Search │
│ (Search) │
└──────────┬─────────────────┘
↓ documents_2
┌────────────────────────────┐
│ Stage 4: Answer Generate │
│ (Predict) │
└──────────┬─────────────────┘
↓
Final Answer
Error Handling¶
AirsDSP uses a layered error handling approach:
#[derive(Debug, thiserror::Error)]
pub enum AirsDspError {
#[error("Pipeline error: {0}")]
Pipeline(#[from] PipelineError),
#[error("Stage error in '{stage}': {source}")]
Stage {
stage: String,
#[source]
source: Box<dyn std::error::Error + Send + Sync>,
},
#[error("Hook error in '{hook}': {source}")]
Hook {
hook: String,
strategy: HookErrorStrategy,
#[source]
source: Box<dyn std::error::Error + Send + Sync>,
},
#[error("Infrastructure error: {0}")]
Infrastructure(String),
#[error("Configuration error: {0}")]
Config(String),
}
Error Propagation:
- Errors bubble up from infrastructure layer through stages
- Context is added at each layer
- Hook errors handled based on HookErrorStrategy
- Applications receive rich error information
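At the application boundary this typically means matching on the enum; a sketch, assuming the crate's Result alias uses AirsDspError:
match pipeline.execute(input).await {
Ok(output) => println!("{output}"),
Err(AirsDspError::Stage { stage, source }) => {
// Rich context: which stage failed and the underlying cause.
eprintln!("stage '{stage}' failed: {source}");
}
Err(AirsDspError::Infrastructure(msg)) => {
eprintln!("infrastructure problem (LM, vector store, cache): {msg}");
}
Err(other) => eprintln!("pipeline error: {other}"),
}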
Performance Considerations¶
Zero-Cost Abstractions¶
AirsDSP uses trait objects (e.g., Box<dyn Stage>, Arc<dyn LanguageModel>) for dynamic dispatch where flexibility matters, with static dispatch paths planned for performance-critical composition:
// Dynamic dispatch for flexibility
pub struct Pipeline {
stages: Vec<Box<dyn Stage>>,
}
// Future: Static dispatch for performance
pub struct StaticPipeline<S1, S2, S3>
where
S1: DemonstrateStage,
S2: SearchStage,
S3: PredictStage,
{
demonstrate: S1,
search: S2,
predict: S3,
}
Async Runtime Efficiency¶
- Non-blocking I/O for model API calls
- Concurrent execution of independent operations
- Efficient resource utilization
Memory Management¶
- Arc for shared ownership of models and configurations
- Streaming for large document retrieval (future)
- Lazy evaluation where possible
Extension Points¶
AirsDSP is designed for extensibility at multiple levels:
Custom Stages¶
Implement the Stage trait or specialized traits:
struct MyCustomStage {
// Custom fields
}
#[async_trait]
impl Stage for MyCustomStage {
fn name(&self) -> &str {
"my_custom_stage"
}
async fn execute(&self, ctx: &mut Context) -> Result<()> {
// Custom logic
Ok(())
}
fn hooks(&self) -> &[Box<dyn StageHook>] {
&[]
}
}
Custom Hooks¶
Implement the StageHook trait:
struct MyCustomHook;
#[async_trait]
impl StageHook for MyCustomHook {
fn name(&self) -> &str {
"my_custom_hook"
}
async fn before(&self, ctx: &Context) -> Result<()> {
// Pre-stage logic
Ok(())
}
}
Custom Patterns¶
Combine stages in novel ways:
pub fn custom_pattern(
lm: Arc<dyn LanguageModel>,
vs: Arc<dyn VectorStore>,
) -> Pipeline {
Pipeline::builder()
.search(/* custom search stage */)
.predict(/* custom predict stage */)
.search(/* second search */)
.predict(/* final predict */)
.build()
.unwrap()
}
Custom Metrics¶
Implement the Metric trait:
#[async_trait]
pub trait Metric: Send + Sync {
async fn evaluate(
&self,
prediction: &str,
reference: &str,
) -> Result<f32>;
}
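A community metric then needs only the one method; for example, a hypothetical exact-match metric:
use async_trait::async_trait;
struct ExactMatch;
#[async_trait]
impl Metric for ExactMatch {
async fn evaluate(&self, prediction: &str, reference: &str) -> Result<f32> {
// 1.0 on an exact (whitespace-trimmed) match, 0.0 otherwise.
Ok(if prediction.trim() == reference.trim() { 1.0 } else { 0.0 })
}
}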
Testing Strategy¶
Unit Tests¶
Test individual components in isolation:
#[cfg(test)]
mod tests {
use super::*;
#[tokio::test]
async fn test_stage_execution() {
let stage = MyStage::new();
let mut ctx = Context::new("test input");
let result = stage.execute(&mut ctx).await;
assert!(result.is_ok());
}
}
Integration Tests¶
Test pipeline execution with mock implementations:
#[tokio::test]
async fn test_pipeline_execution() {
let lm = Arc::new(MockLanguageModel::new());
let vs = Arc::new(MockVectorStore::new());
let mut pipeline = Pipeline::builder()
.search(VectorSearch::new(vs))
.predict(SimplePredict::new(lm))
.build()
.unwrap();
let result = pipeline.execute("test question").await;
assert!(result.is_ok());
}
Property-Based Tests¶
Use proptest for property-based testing:
proptest! {
#[test]
fn test_pipeline_determinism(input in ".*") {
// Pipeline::execute is async, so drive it on a blocking runtime
let rt = tokio::runtime::Runtime::new().unwrap();
let mut pipeline = create_deterministic_pipeline();
let result1 = rt.block_on(pipeline.execute(&input)).unwrap();
let result2 = rt.block_on(pipeline.execute(&input)).unwrap();
prop_assert_eq!(result1, result2);
}
}
Implementation Status¶
Completed¶
- ✅ Architecture design and documentation
- ✅ ADR-001: No Automated Prompt Optimization
- ✅ ADR-002: Stage and Hook Architecture
- ✅ ADR-003: Compositional Pipeline Architecture
- ✅ Workspace ADR-001: Modular Crate Architecture
Phase 1: Foundation (Months 1-3) - Starting¶
- Create Rust workspace structure
- Implement airsdsp/infra traits
- Implement airsdsp/core stage hierarchy
- Implement Pipeline and PipelineBuilder
- Implement Context management
- Implement Hook system
- Write comprehensive tests
- Complete API documentation
Phase 2: Patterns & Orchestration (Months 4-6) - Planned¶
- Implement CoT, ReAct, Multi-hop patterns
- Implement multi-pipeline system
- Implement task classification
- Implement routing logic
Phase 3: Tooling (Months 7-9) - Planned¶
- Implement G-Eval
- Implement execution tracing
- Implement observability hooks
- Implement stage inspection
Related Documentation¶
Workspace Documentation:
- Getting Started - Setup and first pipeline
- Overview - High-level framework introduction
- Roadmap - Development phases and timeline
- Contributing - How to contribute
Research Documentation:
- Research Index - Comprehensive DSP framework research
Document Status: ✅ Current
Last Updated: 2025-12-16
Next Review: Phase 1 completion (Month 3)