DSP Multi-Task System Architecture
Document Type: Knowledge Base - System Architecture Pattern
Created: 2025-10-20
Last Updated: 2025-10-20
Confidence Level: High
Source: DSP framework analysis and practical implementation reasoning
Purpose: Document the multi-task system architecture with task classification and pipeline routing
Overview
This document describes a critical architectural pattern for production DSP systems: multi-task architecture with task classification and pipeline routing. This pattern enables a single system to handle diverse user inputs by classifying tasks and routing to specialized pipelines, with graceful fallback to a default pipeline.
The Core Insight: Task-Level vs. Query-Level
A Critical Distinction
Pipelines are built for TASK TYPES, not specific user inputs.
This distinction is fundamental to understanding DSP architecture:
| Level | What It Represents | Examples | Built When |
|---|---|---|---|
| Task Level | Category of problems | Math, QA, Code Analysis | Design time (once) |
| Query Level | Specific user input | "What is 15% of 240?" | Runtime (many times) |
The Mental Model
```text
┌─────────────────────────────────────────┐
│ Task Type: "Math Word Problems"         │ ← Built ONCE
│ Pipeline: Decompose → Solve → Answer    │
└─────────────────────────────────────────┘
                    ↓ Executes on ↓
┌─────────────────────────────────────────┐
│ Query 1: "What is 15% of 240?"          │ ← Many different
│ Query 2: "sqrt(sum(primes < 10))?"      │ ← user inputs,
│ Query 3: "If John has 5 apples..."      │ ← same task type
└─────────────────────────────────────────┘
```
Why This Matters
Pipeline stages are GENERIC within a task type:
- They can handle variation in inputs
- They use demonstrations to adapt behavior
- They work on any input of that task type
- They don't need to be rebuilt for each query
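The build-once, run-many relationship can be sketched in plain Rust. This is a minimal sketch, not the DSP API: `TaskPipeline`, the string-rewriting stage functions, and this free-standing `build_math_pipeline` are hypothetical stand-ins for real Demonstrate/Search/Predict stages.

```rust
/// Hypothetical stand-in for a pipeline stage: text in, text out.
type Stage = fn(&str) -> String;

/// A pipeline for one TASK TYPE. It is assembled once and reused for every query.
struct TaskPipeline {
    task_type: &'static str,
    stages: Vec<Stage>,
}

impl TaskPipeline {
    /// Run every stage in order, feeding each stage the previous stage's output.
    fn run(&self, query: &str) -> String {
        self.stages
            .iter()
            .fold(query.to_string(), |acc, stage| stage(acc.as_str()))
    }
}

// Placeholder stages that only record which step touched the text.
fn decompose(q: &str) -> String {
    format!("decompose({q})")
}
fn solve(q: &str) -> String {
    format!("solve({q})")
}
fn answer(q: &str) -> String {
    format!("answer({q})")
}

/// Design time: build the math pipeline once.
fn build_math_pipeline() -> TaskPipeline {
    TaskPipeline {
        task_type: "math",
        stages: vec![decompose as Stage, solve, answer],
    }
}

fn main() {
    let math = build_math_pipeline(); // built ONCE
    // Runtime: the same pipeline serves many different queries of this task type.
    for query in [
        "What is 15% of 240?",
        "sqrt(sum(primes < 10))?",
        "If John has 5 apples...",
    ] {
        println!("[{}] {}", math.task_type, math.run(query));
    }
}
```

Nothing in `run` depends on the particular query, which is exactly why the pipeline never needs rebuilding per input.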
The Multi-Task System Pattern
Architecture Overview
```text
                   User Input
                        ↓
        ┌───────────────────────────────┐
        │      Task Classification      │
        │        (DSP Pipeline)         │
        │  Determines: What task type?  │
        └───────────────────────────────┘
                        ↓
        ┌───────────────────────────────┐
        │      Router / Dispatcher      │
        │   Selects: Which pipeline?    │
        └───────────────────────────────┘
                        ↓
        ┌───────────────┬───────────────┐
        ↓               ↓               ↓
   ┌──────────┐    ┌──────────┐    ┌──────────┐
   │   Math   │    │    QA    │    │   Code   │  ... Specialized
   │ Pipeline │    │ Pipeline │    │ Pipeline │      Pipelines
   └──────────┘    └──────────┘    └──────────┘
        ↓               ↓               ↓
        └───────────────┴───────────────┘
                        ↓
           If no specialized pipeline
                        ↓
        ┌───────────────────────────────┐
        │   Default/Fallback Pipeline   │
        │   Handles: Everything else    │
        └───────────────────────────────┘
                        ↓
                    Response
```
Components
1. Task Classifier (DSP Pipeline)
Purpose: Determine what type of task the user input represents
Implementation: A DSP pipeline itself!
```rust
fn build_task_classifier() -> Result<Pipeline> {
    Pipeline::new()
        .demonstrate(vec![
            Example {
                input: "What is 15% of 240?",
                output: "task_type: math, confidence: 0.95",
            },
            Example {
                input: "Who wrote Romeo and Juliet?",
                output: "task_type: question_answering, confidence: 0.90",
            },
            Example {
                input: "What year did the director of Inception win his first Oscar?",
                output: "task_type: multi_hop_reasoning, confidence: 0.92",
            },
            Example {
                input: "Debug this Python code: def add(a b): return a + b",
                output: "task_type: code_analysis, confidence: 0.88",
            },
            // More examples covering various task types
        ])
        .predict(ClassifyTaskType::new())
        .build()
}
```
Output: Task type + confidence score
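`classify_task` in the System Implementation section relies on an `extract_field` helper that the document never defines. One possible version, assuming the classifier emits a single `key: value, key: value` line exactly as in the demonstrations above and that no value contains a comma; `extract_field` and `parse_classification` here are hypothetical helpers returning `Option`/`String` errors rather than the framework's `Result`.

```rust
/// Parsed classifier output (same fields as `TaskClassification` in this document).
#[derive(Debug, PartialEq)]
struct TaskClassification {
    task_type: String,
    confidence: f32,
    reasoning: Option<String>,
}

/// Hypothetical `extract_field`: the value of `name` in "key: value, key: value".
fn extract_field(output: &str, name: &str) -> Option<String> {
    output.split(',').find_map(|part| {
        let (key, value) = part.split_once(':')?;
        (key.trim() == name).then(|| value.trim().to_string())
    })
}

fn parse_classification(output: &str) -> Result<TaskClassification, String> {
    let task_type = extract_field(output, "task_type").ok_or("missing task_type")?;
    let confidence = extract_field(output, "confidence")
        .ok_or("missing confidence")?
        .parse::<f32>()
        .map_err(|e| format!("bad confidence: {e}"))?;
    Ok(TaskClassification {
        task_type,
        // Clamp defensively: an LM can emit values outside [0, 1]
        confidence: confidence.clamp(0.0, 1.0),
        reasoning: extract_field(output, "reasoning"),
    })
}

fn main() {
    let raw = "task_type: math, confidence: 0.95";
    println!("{:?}", parse_classification(raw));
}
```

Returning an error on a malformed line (rather than guessing) lets the caller treat unparseable output like low confidence and fall back to the default pipeline.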
2. Router / Dispatcher
Purpose: Select the appropriate pipeline based on classification
Logic:
```rust
fn select_pipeline(&self, classification: &TaskClassification) -> &Pipeline {
    // Check confidence threshold (field name as defined in SystemConfig)
    if classification.confidence < self.config.min_confidence_threshold {
        return &self.default_pipeline;
    }
    // Try to get specialized pipeline (helper returns Option<&Pipeline>)
    match self.get_specialized_pipeline(&classification.task_type) {
        Some(pipeline) => pipeline,
        None => &self.default_pipeline, // Fallback
    }
}
```
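The routing rule above can be exercised without any language model. This minimal sketch assumes specialized pipelines live in a `HashMap` keyed by task type (the document's own `AirsDSPSystem` uses one `Option` field per pipeline instead); `Router` and `demo_router` are hypothetical names.

```rust
use std::collections::HashMap;

#[derive(Debug)]
struct Pipeline {
    name: &'static str,
}

struct Router {
    specialized: HashMap<&'static str, Pipeline>,
    default_pipeline: Pipeline,
    min_confidence_threshold: f32,
}

impl Router {
    fn select(&self, task_type: &str, confidence: f32) -> &Pipeline {
        // Low confidence: do not trust the label, use the default
        if confidence < self.min_confidence_threshold {
            return &self.default_pipeline;
        }
        // Known and implemented task type -> specialized; otherwise fall back
        self.specialized
            .get(task_type)
            .unwrap_or(&self.default_pipeline)
    }
}

fn demo_router() -> Router {
    let mut specialized = HashMap::new();
    specialized.insert("math", Pipeline { name: "math_pipeline" });
    specialized.insert("question_answering", Pipeline { name: "qa_pipeline" });
    Router {
        specialized,
        default_pipeline: Pipeline { name: "default_pipeline" },
        min_confidence_threshold: 0.75,
    }
}

fn main() {
    let router = demo_router();
    for (task, conf) in [("math", 0.95), ("math", 0.50), ("code_analysis", 0.99)] {
        println!("{task} @ {conf:.2} -> {}", router.select(task, conf).name);
    }
}
```

Both failure modes (an untrusted label and a trusted label with no pipeline) end at the same default, which is what keeps the router total.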
3. Specialized Pipelines
Purpose: Optimized pipelines for specific task types
Examples:
- Math Pipeline: Problem decomposition → Calculation → Answer
- QA Pipeline: Query formulation → Search → Extract answer
- Multi-hop Pipeline: Initial search → Entity extraction → Follow-up search → Synthesis
- Code Pipeline: Code analysis → Search docs → Suggest fix
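The multi-hop flow listed above (initial search → entity extraction → follow-up search → synthesis) can be illustrated with stages that share one mutable context. Everything here is a toy: the two-entry in-memory corpus stands in for a retriever, entity extraction and synthesis are string rules rather than LM calls, and `Context`, `Stage`, `CORPUS`, and `run` are hypothetical names.

```rust
/// State shared by all stages of one multi-hop run (hypothetical).
#[derive(Debug, Default)]
struct Context {
    question: String,
    entities: Vec<String>,
    passages: Vec<String>,
    answer: Option<String>,
}

/// Toy corpus: (lookup key, passage). Stands in for a real retriever.
type Corpus = [(&'static str, &'static str)];
type Stage = fn(&mut Context, &Corpus);

const CORPUS: &Corpus = &[
    ("Inception", "Inception (2010) was directed by Christopher Nolan"),
    ("Christopher Nolan", "Christopher Nolan won his first Oscar in 2024"),
];

// Search: passages whose key appears in the question.
fn initial_search(ctx: &mut Context, corpus: &Corpus) {
    for &(key, passage) in corpus {
        if ctx.question.contains(key) {
            ctx.passages.push(passage.to_string());
        }
    }
}

// Predict (toy rule): the entity named after "directed by ".
fn extract_entities(ctx: &mut Context, _corpus: &Corpus) {
    let found: Vec<String> = ctx
        .passages
        .iter()
        .filter_map(|p| p.split_once("directed by ").map(|(_, who)| who.trim().to_string()))
        .collect();
    ctx.entities.extend(found);
}

// Search again, this time keyed on the extracted entities (the second hop).
fn follow_up_search(ctx: &mut Context, corpus: &Corpus) {
    for entity in &ctx.entities {
        if let Some(&(_, passage)) = corpus.iter().find(|&&(key, _)| key == entity.as_str()) {
            ctx.passages.push(passage.to_string());
        }
    }
}

// Predict (toy rule): answer with the last word of the last passage.
fn synthesize(ctx: &mut Context, _corpus: &Corpus) {
    ctx.answer = ctx
        .passages
        .last()
        .and_then(|p| p.split_whitespace().last())
        .map(str::to_string);
}

fn multi_hop_stages() -> Vec<Stage> {
    vec![initial_search as Stage, extract_entities, follow_up_search, synthesize]
}

fn run(stages: &[Stage], corpus: &Corpus, question: &str) -> Context {
    let mut ctx = Context { question: question.to_string(), ..Default::default() };
    for stage in stages {
        stage(&mut ctx, corpus);
    }
    ctx
}

fn main() {
    let q = "What year did the director of Inception win his first Oscar?";
    let ctx = run(&multi_hop_stages(), CORPUS, q);
    println!("entities: {:?}", ctx.entities);
    println!("answer: {:?}", ctx.answer);
}
```

The second search only becomes possible because the first predict step wrote an entity into the shared context; that hand-off is what distinguishes this pipeline from single-hop QA.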
4. Default/Fallback Pipeline
Purpose: Handle any input that doesn't match a specialized pipeline
Characteristics:
- General-purpose approach
- Uses retrieval when possible
- Attempts to provide a reasonable response
- Always available (never returns an error because of an unknown task type)
Complete Implementation Example
System Structure
```rust
pub struct AirsDSPSystem {
    // Core classification pipeline (REQUIRED)
    task_classifier: Pipeline,
    // Specialized task pipelines (OPTIONAL - can be None)
    math_pipeline: Option<Pipeline>,
    qa_pipeline: Option<Pipeline>,
    multi_hop_qa_pipeline: Option<Pipeline>,
    code_analysis_pipeline: Option<Pipeline>,
    conversational_pipeline: Option<Pipeline>,
    legal_analysis_pipeline: Option<Pipeline>,
    // Fallback pipeline (REQUIRED)
    default_pipeline: Pipeline,
    // Configuration
    config: SystemConfig,
}

pub struct SystemConfig {
    // Whether to use specialized pipelines when available
    enable_specialized_pipelines: bool,
    // Minimum confidence to use specialized pipeline
    min_confidence_threshold: f32,
    // Whether to log routing decisions
    log_routing: bool,
    // Whether to track usage patterns
    track_usage: bool,
}

pub struct TaskClassification {
    pub task_type: String,
    pub confidence: f32,
    pub reasoning: Option<String>,
}

pub struct Response {
    pub answer: String,
    pub task_type: String,
    pub pipeline_used: String,
    pub confidence: f32,
    pub metadata: ResponseMetadata,
}
```
System Implementation
```rust
impl AirsDSPSystem {
    pub fn new() -> Result<Self> {
        Ok(Self {
            // Task classifier - ALWAYS REQUIRED
            task_classifier: Self::build_task_classifier()?,
            // Specialized pipelines - OPTIONAL
            // Use Some(...) for the ones that exist; leave the rest as None until built
            math_pipeline: Some(Self::build_math_pipeline()?),
            qa_pipeline: Some(Self::build_qa_pipeline()?),
            multi_hop_qa_pipeline: Some(Self::build_multi_hop_pipeline()?),
            code_analysis_pipeline: None,  // Not implemented yet
            conversational_pipeline: None, // Not implemented yet
            legal_analysis_pipeline: None, // Not implemented yet
            // Default pipeline - ALWAYS REQUIRED
            default_pipeline: Self::build_default_pipeline()?,
            // Configuration
            config: SystemConfig {
                enable_specialized_pipelines: true,
                min_confidence_threshold: 0.75,
                log_routing: true,
                track_usage: true,
            },
        })
    }

    /// Main entry point: process user input
    pub fn process(&self, user_input: &str) -> Result<Response> {
        // Step 1: Classify the task type
        let classification = self.classify_task(user_input)?;
        if self.config.log_routing {
            log::info!(
                "Classified as '{}' with confidence {:.2}",
                classification.task_type,
                classification.confidence
            );
        }

        // Step 2: Select appropriate pipeline
        let selected_pipeline = self.select_pipeline(&classification)?;
        if self.config.log_routing {
            log::info!("Routing to pipeline: {}", selected_pipeline.name());
        }

        // Step 3: Execute the selected pipeline
        let answer = selected_pipeline.execute(user_input)?;

        // Step 4: Build response
        Ok(Response {
            answer,
            task_type: classification.task_type,
            pipeline_used: selected_pipeline.name().to_string(),
            confidence: classification.confidence,
            metadata: ResponseMetadata::new(),
        })
    }

    /// Classify the input to determine task type
    fn classify_task(&self, input: &str) -> Result<TaskClassification> {
        let result = self.task_classifier.execute(input)?;
        // Parse classification output
        // Expected format: "task_type: math, confidence: 0.95, reasoning: ..."
        // `extract_field` returns the value for a key, or an error if it is missing
        let task_type = extract_field(&result, "task_type")?;
        let confidence = extract_field(&result, "confidence")?.parse::<f32>()?;
        let reasoning = extract_field(&result, "reasoning").ok(); // optional field
        Ok(TaskClassification {
            task_type,
            confidence,
            reasoning,
        })
    }

    /// Select the appropriate pipeline based on classification
    fn select_pipeline(&self, classification: &TaskClassification) -> Result<&Pipeline> {
        // If specialized pipelines are disabled, use default
        if !self.config.enable_specialized_pipelines {
            return Ok(&self.default_pipeline);
        }

        // If confidence is too low, use default
        if classification.confidence < self.config.min_confidence_threshold {
            if self.config.log_routing {
                log::warn!(
                    "Confidence {:.2} below threshold {:.2}, using default pipeline",
                    classification.confidence,
                    self.config.min_confidence_threshold
                );
            }
            return Ok(&self.default_pipeline);
        }

        // Try to get specialized pipeline (None if the task type is unknown
        // or its pipeline is not implemented yet)
        let specialized = match classification.task_type.as_str() {
            "math" | "calculation" | "arithmetic" => self.math_pipeline.as_ref(),
            "question_answering" | "factual_query" => self.qa_pipeline.as_ref(),
            "multi_hop_reasoning" | "complex_question" => self.multi_hop_qa_pipeline.as_ref(),
            "code_analysis" | "programming" | "debugging" => self.code_analysis_pipeline.as_ref(),
            "conversation" | "chat" => self.conversational_pipeline.as_ref(),
            "legal_analysis" | "contract_review" => self.legal_analysis_pipeline.as_ref(),
            _ => None,
        };

        // Return specialized pipeline or fall back to default
        Ok(specialized.unwrap_or(&self.default_pipeline))
    }

    // Pipeline builders
    fn build_task_classifier() -> Result<Pipeline> {
        Pipeline::new()
            .demonstrate(load_task_classification_examples()?)
            .predict(ClassifyTaskType::new())
            .build()
    }

    fn build_math_pipeline() -> Result<Pipeline> {
        Pipeline::new()
            .demonstrate(load_math_examples()?)
            .predict(IdentifyProblemType::new())
            .predict(DecomposeProblem::new())
            .predict(SolveStepByStep::new())
            .predict(SynthesizeAnswer::new())
            .build()
    }

    fn build_qa_pipeline() -> Result<Pipeline> {
        Pipeline::new()
            .demonstrate(load_qa_examples()?)
            .search(FormulateQuery::new())
            .predict(ExtractAnswer::new())
            .build()
    }

    fn build_multi_hop_pipeline() -> Result<Pipeline> {
        Pipeline::new()
            .demonstrate(load_multi_hop_examples()?)
            .search(InitialQuery::new())
            .predict(ExtractEntities::new())
            .search(FollowUpQuery::new())
            .predict(SynthesizeAnswer::new())
            .build()
    }

    fn build_default_pipeline() -> Result<Pipeline> {
        // General-purpose pipeline
        Pipeline::new()
            .demonstrate(load_general_examples()?)
            .search(GeneralQuery::new())
            .predict(GeneralResponse::new())
            .build()
    }
}
```
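One possible refinement of `select_pipeline` is to move the alias strings out of the hard-coded `match` and into data, so that adding a label means editing a table rather than the routing code. A sketch under that assumption: `alias_table` and `canonical_task` are hypothetical helpers, the aliases are copied from the `match` arms above, and the canonical names (for example `conversation`) are choices made for this illustration.

```rust
use std::collections::HashMap;

/// Alias -> canonical task type, built from the groups in `select_pipeline`.
fn alias_table() -> HashMap<&'static str, &'static str> {
    let groups: [(&'static str, &[&'static str]); 6] = [
        ("math", &["math", "calculation", "arithmetic"]),
        ("question_answering", &["question_answering", "factual_query"]),
        ("multi_hop_reasoning", &["multi_hop_reasoning", "complex_question"]),
        ("code_analysis", &["code_analysis", "programming", "debugging"]),
        ("conversation", &["conversation", "chat"]),
        ("legal_analysis", &["legal_analysis", "contract_review"]),
    ];
    let mut table = HashMap::new();
    for (canonical, aliases) in groups {
        for &alias in aliases {
            table.insert(alias, canonical);
        }
    }
    table
}

/// Normalize a raw classifier label and map it to its canonical task type.
fn canonical_task(table: &HashMap<&'static str, &'static str>, label: &str) -> Option<&'static str> {
    table.get(label.trim().to_lowercase().as_str()).copied()
}

fn main() {
    let table = alias_table();
    for label in ["arithmetic", "Debugging", "haiku"] {
        println!("{label} -> {:?}", canonical_task(&table, label));
    }
}
```

With labels normalized to a canonical type, the pipeline lookup itself can become a single map access, and `None` naturally means "use the default pipeline".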
Usage Examples
```rust
fn main() -> Result<()> {
    let system = AirsDSPSystem::new()?;

    // Example 1: Math problem
    // Classified as "math" → Routes to math_pipeline
    let response1 = system.process("What is 15% of 240?")?;
    println!("Answer: {}", response1.answer);
    println!("Pipeline used: {}", response1.pipeline_used);
    // Output: "36", "math_pipeline"

    // Example 2: Simple question
    // Classified as "question_answering" → Routes to qa_pipeline
    let response2 = system.process("Who wrote Hamlet?")?;
    println!("Answer: {}", response2.answer);
    println!("Pipeline used: {}", response2.pipeline_used);
    // Output: "William Shakespeare", "qa_pipeline"

    // Example 3: Multi-hop question
    // Classified as "multi_hop_reasoning" → Routes to multi_hop_qa_pipeline
    let response3 = system.process(
        "What year did the director of Inception win his first Oscar?"
    )?;
    println!("Answer: {}", response3.answer);
    println!("Pipeline used: {}", response3.pipeline_used);
    // Output: "2024", "multi_hop_qa_pipeline"

    // Example 4: Code question
    // Classified as "code_analysis" but pipeline not implemented
    // → Falls back to default_pipeline
    let response4 = system.process(
        "Fix this Rust code: fn add(a: i32 b: i32) -> i32 { a + b }"
    )?;
    println!("Answer: {}", response4.answer);
    println!("Pipeline used: {}", response4.pipeline_used);
    // Output: [best effort answer], "default_pipeline"

    // Example 5: Creative request
    // Classified with low confidence or unknown type
    // → Routes to default_pipeline
    let response5 = system.process("Write a haiku about AI")?;
    println!("Answer: {}", response5.answer);
    println!("Pipeline used: {}", response5.pipeline_used);
    // Output: [haiku], "default_pipeline"

    Ok(())
}
```
Incremental Development Strategy
Phase 1: Minimal Viable System
```rust
// Start with classifier + default only
let system = AirsDSPSystem {
    task_classifier: build_task_classifier()?,
    default_pipeline: build_default_pipeline()?,
    // All specialized pipelines: None
    math_pipeline: None,
    qa_pipeline: None,
    // ... etc
};
```
Capabilities: System works, classifies tasks, but uses default for everything
Advantage: Can deploy and get feedback before building specialized pipelines
Phase 2: Add First Specialized Pipeline
```rust
let system = AirsDSPSystem {
    task_classifier: build_task_classifier()?,
    math_pipeline: Some(build_math_pipeline()?), // NEW!
    default_pipeline: build_default_pipeline()?,
    // Other pipelines still None
    qa_pipeline: None,
    // ... etc
};
```
Capabilities: Math queries get optimized handling, everything else uses default
Advantage: Can focus on one task type at a time, measure improvement
Phase 3: Add More Specialized Pipelines
```rust
let system = AirsDSPSystem {
    task_classifier: build_task_classifier()?,
    math_pipeline: Some(build_math_pipeline()?),
    qa_pipeline: Some(build_qa_pipeline()?),                  // NEW!
    multi_hop_qa_pipeline: Some(build_multi_hop_pipeline()?), // NEW!
    default_pipeline: build_default_pipeline()?,
    // Still some None
    code_analysis_pipeline: None,
    // ... etc
};
```
Capabilities: Multiple task types optimized, growing coverage
Advantage: Incremental improvement, measure impact of each addition
Phase 4: Mature System
```rust
let system = AirsDSPSystem {
    task_classifier: build_task_classifier()?,
    // Most/all specialized pipelines implemented
    math_pipeline: Some(build_math_pipeline()?),
    qa_pipeline: Some(build_qa_pipeline()?),
    multi_hop_qa_pipeline: Some(build_multi_hop_pipeline()?),
    code_analysis_pipeline: Some(build_code_pipeline()?),            // NOW!
    conversational_pipeline: Some(build_conversational_pipeline()?), // NOW!
    legal_analysis_pipeline: Some(build_legal_pipeline()?),          // NOW!
    default_pipeline: build_default_pipeline()?,
    // ... plus `config`, as in AirsDSPSystem::new()
};
```
Capabilities: Comprehensive coverage, default handles only edge cases
Advantage: Optimized for common tasks, graceful for rare tasks
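Each phase above rewrites a full struct literal. An alternative, sketched here under the assumption that specialized pipelines are stored in a map keyed by task type, is to register them one phase at a time while the default stays mandatory from the start. `Registry`, `with_pipeline`, `route`, and `coverage` are hypothetical names, and the classifier is left out to keep the sketch short.

```rust
use std::collections::BTreeMap;

#[derive(Debug, Clone, PartialEq)]
struct Pipeline {
    name: String,
}

impl Pipeline {
    fn named(name: &str) -> Self {
        Pipeline { name: name.to_string() }
    }
}

/// Specialized pipelines are registered incrementally; the default is required up front.
struct Registry {
    specialized: BTreeMap<String, Pipeline>,
    default_pipeline: Pipeline,
}

impl Registry {
    /// Phase 1: default only.
    fn new(default_pipeline: Pipeline) -> Self {
        Registry { specialized: BTreeMap::new(), default_pipeline }
    }

    /// Later phases: add one specialized pipeline at a time.
    fn with_pipeline(mut self, task_type: &str, pipeline: Pipeline) -> Self {
        self.specialized.insert(task_type.to_string(), pipeline);
        self
    }

    /// Unregistered task types fall back to the default pipeline.
    fn route(&self, task_type: &str) -> &Pipeline {
        self.specialized.get(task_type).unwrap_or(&self.default_pipeline)
    }

    /// Task types that have a specialized pipeline, in sorted order.
    fn coverage(&self) -> Vec<&str> {
        self.specialized.keys().map(String::as_str).collect()
    }
}

fn main() {
    let phase1 = Registry::new(Pipeline::named("default_pipeline"));
    let phase2 = phase1.with_pipeline("math", Pipeline::named("math_pipeline"));
    let phase3 = phase2
        .with_pipeline("question_answering", Pipeline::named("qa_pipeline"))
        .with_pipeline("multi_hop_reasoning", Pipeline::named("multi_hop_qa_pipeline"));
    println!("covered: {:?}", phase3.coverage());
    println!("code_analysis -> {}", phase3.route("code_analysis").name);
}
```

Because `new` cannot be called without a default pipeline, every phase of this sketch keeps the "default is mandatory" guarantee by construction.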
Advanced Patterns
Pattern 1: Confidence-Based Routing
```rust
fn select_pipeline_smart(&self, classification: &TaskClassification) -> &Pipeline {
    match classification.confidence {
        c if c >= 0.90 => {
            // High confidence: definitely use specialized
            self.get_specialized_pipeline(&classification.task_type)
                .unwrap_or(&self.default_pipeline)
        }
        c if c >= 0.75 => {
            // Medium confidence: use specialized with logging
            log::info!("Medium confidence: {:.2}", c);
            self.get_specialized_pipeline(&classification.task_type)
                .unwrap_or(&self.default_pipeline)
        }
        c if c >= 0.60 => {
            // Low confidence: prefer default
            log::warn!("Low confidence: {:.2}, using default", c);
            &self.default_pipeline
        }
        _ => {
            // Very low confidence: definitely default
            &self.default_pipeline
        }
    }
}
```
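The tiering in `select_pipeline_smart` can be pulled out into a pure function so its boundaries are easy to test. A sketch with a hypothetical `ConfidenceTier` enum; the cut-offs (0.90, 0.75, 0.60) are the ones used above, and only High and Medium route to a specialized pipeline, matching the match arms.

```rust
#[derive(Debug, Clone, Copy, PartialEq)]
enum ConfidenceTier {
    High,
    Medium,
    Low,
    VeryLow,
}

/// Same cut-offs as `select_pipeline_smart`.
fn tier(confidence: f32) -> ConfidenceTier {
    match confidence {
        c if c >= 0.90 => ConfidenceTier::High,
        c if c >= 0.75 => ConfidenceTier::Medium,
        c if c >= 0.60 => ConfidenceTier::Low,
        // Also catches NaN, because every comparison with NaN is false
        _ => ConfidenceTier::VeryLow,
    }
}

/// Only High and Medium confidence route to a specialized pipeline.
fn uses_specialized(tier: ConfidenceTier) -> bool {
    matches!(tier, ConfidenceTier::High | ConfidenceTier::Medium)
}

fn main() {
    for c in [0.95, 0.80, 0.65, 0.30] {
        let t = tier(c);
        println!("{c:.2} -> {t:?} (specialized: {})", uses_specialized(t));
    }
}
```

Separating "which tier" from "what to do in that tier" also gives logging and metrics a stable label to group by.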
Pattern 2: Fallback Chain
```rust
fn select_with_fallback(&self, task_type: &str) -> &Pipeline {
    // Try exact match
    if let Some(p) = self.get_exact_pipeline(task_type) {
        return p;
    }
    // Try category match (e.g., "math_algebra" → "math_general")
    if let Some(p) = self.get_category_pipeline(task_type) {
        return p;
    }
    // Ultimate fallback
    &self.default_pipeline
}
```
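`get_category_pipeline` is left undefined above. A minimal sketch, assuming a naming convention in which a task type `category_subtype` falls back to a `category_general` pipeline (the `math_algebra` → `math_general` example) and pipelines are looked up by name in a `HashMap`; the free function `select_with_fallback` here is a hypothetical standalone version of the method.

```rust
use std::collections::HashMap;

/// Exact match, then `<category>_general`, then the default.
fn select_with_fallback<'a>(
    pipelines: &'a HashMap<String, String>,
    default_pipeline: &'a str,
    task_type: &str,
) -> &'a str {
    // 1. Exact match
    if let Some(p) = pipelines.get(task_type) {
        return p;
    }
    // 2. Category match: "math_algebra" -> "math_general" (assumed convention:
    //    the category is everything before the first underscore)
    if let Some((category, _)) = task_type.split_once('_') {
        if let Some(p) = pipelines.get(&format!("{category}_general")) {
            return p;
        }
    }
    // 3. Ultimate fallback
    default_pipeline
}

fn main() {
    let mut pipelines = HashMap::new();
    pipelines.insert("math_general".to_string(), "math_general_pipeline".to_string());
    pipelines.insert("math_geometry".to_string(), "geometry_pipeline".to_string());
    for task in ["math_geometry", "math_algebra", "legal_contracts"] {
        println!("{task} -> {}", select_with_fallback(&pipelines, "default_pipeline", task));
    }
}
```

The first-underscore convention is only one choice; a label such as `multi_hop_reasoning` would look for `multi_general`, so a real system would likely need an explicit category table.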
Pattern 3: Ensemble Execution
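```rust
pub fn process_with_ensemble(&self, input: &str) -> Result<Response> {
    let classification = self.classify_task(input)?;
    // If confidence is ambiguous and a specialized candidate exists,
    // run it alongside the default pipeline
    if classification.confidence < 0.80 {
        if let Some(specialized) = self.get_specialized_pipeline(&classification.task_type) {
            let results = vec![
                specialized.execute(input)?,
                self.default_pipeline.execute(input)?,
            ];
            // Select best result (could use another pipeline for this!);
            // select_best_result is assumed to return Result<Response>
            return self.select_best_result(results);
        }
    }
    // High confidence (or no specialized candidate): single pipeline
    let pipeline = self.select_pipeline(&classification)?;
    let answer = pipeline.execute(input)?;
    Ok(Response {
        answer,
        task_type: classification.task_type,
        pipeline_used: pipeline.name().to_string(),
        confidence: classification.confidence,
        metadata: ResponseMetadata::new(),
    })
}
```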
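`select_best_result` is also left open. One simple shape for it, assuming each candidate answer can be given a numeric score: keep the highest-scoring candidate, breaking ties in favor of the one listed first (here, the specialized pipeline). `Candidate` and `select_best` are hypothetical, and the scorer in `main` is a placeholder heuristic; in practice it might be another pipeline, as the comment above suggests.

```rust
/// An answer produced by one pipeline.
#[derive(Debug, Clone)]
struct Candidate {
    pipeline: &'static str,
    answer: String,
}

/// Keep the highest-scoring candidate; on a tie the earlier candidate wins.
fn select_best<F>(candidates: Vec<Candidate>, score: F) -> Option<Candidate>
where
    F: Fn(&Candidate) -> f32,
{
    let mut best: Option<(f32, Candidate)> = None;
    for candidate in candidates {
        let s = score(&candidate);
        let better = match &best {
            Some((best_score, _)) => s > *best_score,
            None => true,
        };
        if better {
            best = Some((s, candidate));
        }
    }
    best.map(|(_, c)| c)
}

fn main() {
    let candidates = vec![
        Candidate { pipeline: "math_pipeline", answer: "36".to_string() },
        Candidate { pipeline: "default_pipeline", answer: "The answer is 36".to_string() },
    ];
    // Placeholder scorer: prefer shorter non-empty answers.
    let best = select_best(candidates, |c| {
        if c.answer.trim().is_empty() { 0.0 } else { 1.0 / c.answer.len() as f32 }
    });
    println!("{best:?}");
}
```

Taking the scorer as a closure keeps the selection rule independent of how scores are produced, whether by a heuristic or by a judging pipeline.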
Pattern 4: Usage Tracking and Learning
```rust
pub struct UsageTracker {
    routing_decisions: Vec<RoutingDecision>,
}

pub struct RoutingDecision {
    input: String,
    classified_as: String,
    routed_to: String,
    confidence: f32,
    user_feedback: Option<Feedback>,
}

impl AirsDSPSystem {
    // Assumes AirsDSPSystem has an extra `usage_tracker: UsageTracker` field
    pub fn process_with_tracking(&mut self, input: &str) -> Result<Response> {
        let classification = self.classify_task(input)?;
        let pipeline = self.select_pipeline(&classification)?;
        let answer = pipeline.execute(input)?;
        // Copy the name out so the shared borrow of `self` ends before the
        // mutable borrow of `self.usage_tracker` below
        let pipeline_used = pipeline.name().to_string();

        // Track this decision
        self.usage_tracker.record(RoutingDecision {
            input: input.to_string(),
            classified_as: classification.task_type.clone(),
            routed_to: pipeline_used.clone(),
            confidence: classification.confidence,
            user_feedback: None,
        });

        Ok(Response {
            answer,
            task_type: classification.task_type,
            pipeline_used,
            confidence: classification.confidence,
            metadata: ResponseMetadata::new(),
        })
    }

    pub fn analyze_routing_patterns(&self) -> RoutingInsights {
        // Analyze which classifications work well
        // Identify misclassifications
        // Suggest new specialized pipelines for common patterns
        self.usage_tracker.analyze()
    }
}
```
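As one example of what `analyze()` could report, this sketch counts task types that were classified confidently but still routed to the default pipeline: natural candidates for the next specialized pipeline. `suggest_new_pipelines` and its two thresholds are assumptions, and `RoutingDecision` is trimmed to the three fields the analysis reads.

```rust
use std::collections::BTreeMap;

/// Trimmed version of RoutingDecision: only the fields this analysis reads.
struct RoutingDecision {
    classified_as: String,
    routed_to: String,
    confidence: f32,
}

/// Task types classified with at least `min_confidence` that still landed on
/// the default pipeline at least `min_count` times, most frequent first.
fn suggest_new_pipelines(
    decisions: &[RoutingDecision],
    min_confidence: f32,
    min_count: usize,
) -> Vec<(String, usize)> {
    let mut counts: BTreeMap<&str, usize> = BTreeMap::new();
    for d in decisions {
        if d.routed_to == "default_pipeline" && d.confidence >= min_confidence {
            *counts.entry(d.classified_as.as_str()).or_insert(0) += 1;
        }
    }
    let mut suggestions: Vec<(String, usize)> = counts
        .into_iter()
        .filter(|&(_, n)| n >= min_count)
        .map(|(task, n)| (task.to_string(), n))
        .collect();
    // Stable sort: ties keep the BTreeMap's alphabetical order
    suggestions.sort_by(|a, b| b.1.cmp(&a.1));
    suggestions
}

fn decision(classified_as: &str, routed_to: &str, confidence: f32) -> RoutingDecision {
    RoutingDecision {
        classified_as: classified_as.to_string(),
        routed_to: routed_to.to_string(),
        confidence,
    }
}

fn main() {
    let log = vec![
        decision("code_analysis", "default_pipeline", 0.90),
        decision("code_analysis", "default_pipeline", 0.88),
        decision("code_analysis", "default_pipeline", 0.92),
        decision("conversation", "default_pipeline", 0.85),
        decision("math", "math_pipeline", 0.95),
        decision("unknown", "default_pipeline", 0.40),
    ];
    for (task, n) in suggest_new_pipelines(&log, 0.75, 2) {
        println!("candidate: {task} ({n} fallbacks)");
    }
}
```

Filtering on confidence matters here: low-confidence fallbacks point at classifier problems, while confident fallbacks point at missing pipelines.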
Key Architectural Benefits
1. Flexibility
✅ Handles diverse inputs gracefully
✅ Adapts to task type automatically
✅ No manual routing logic in application code
2. Incremental Development
✅ Start minimal, add specialized pipelines over time
✅ Each pipeline can be developed independently
✅ Measure impact of each specialized pipeline
3. Robustness
✅ Always has a fallback (default pipeline)
✅ Never fails due to unknown task type
✅ Graceful degradation for edge cases
4. Performance Optimization
✅ Specialized pipelines optimize common tasks
✅ Default handles rare cases adequately
✅ Can measure and optimize routing decisions
5. Maintainability
✅ Clear separation of concerns
✅ Each pipeline has a single responsibility
✅ Easy to test and debug individual pipelines
6. Extensibility
✅ Easy to add new task types
✅ New specialized pipelines plug into the existing classify, route, execute flow (in this implementation each one still needs a struct field and a `match` arm)
✅ Configuration-driven behavior
Design Considerations
Task Classification Accuracy
Critical: Task classifier must be accurate
Strategies:
- Use diverse demonstrations covering many task types
- Include confidence scoring
- Set appropriate confidence thresholds
- Log and monitor misclassifications
- Collect feedback to improve classifier
Default Pipeline Quality
Critical: Default pipeline must handle anything reasonably
Strategies:
- Use general-purpose demonstrations
- Include retrieval capability
- Focus on robustness over optimization
- Handle unexpected inputs gracefully
Pipeline Specialization Granularity
Question: How specialized should pipelines be?
Guidelines:
- Too general: Loses optimization benefit
- Too specific: Too many pipelines to maintain
- Sweet spot: Common task categories that benefit from optimization
Examples:
- Good: "math", "qa", "multi_hop", "code", "legal"
- Too specific: "addition", "subtraction", "multiplication"
- Too general: "text_processing"
Confidence Threshold Tuning
Question: What confidence threshold for specialized pipelines?
Guidelines:
- Too high (e.g., 0.95): Rarely use specialized pipelines
- Too low (e.g., 0.50): Risk using wrong pipeline
- Recommended: 0.70-0.80 range
- Monitor and adjust based on performance
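Monitoring and adjusting the threshold is easier with a sweep over logged classifications. This sketch assumes each log entry records the classifier's confidence and whether a reviewer later judged the label correct; for a candidate threshold it reports coverage (share of queries that would reach a specialized pipeline) and precision (share of those that were correctly classified). `Labeled` and `sweep` are hypothetical, and the log values in `main` are made up for illustration.

```rust
/// One logged classification and whether a reviewer judged its label correct.
struct Labeled {
    confidence: f32,
    correct: bool,
}

/// Coverage: share of queries at or above `threshold`.
/// Precision: share of those that were labeled correctly (None if none qualify).
fn sweep(log: &[Labeled], threshold: f32) -> (f32, Option<f32>) {
    if log.is_empty() {
        return (0.0, None);
    }
    let routed: Vec<&Labeled> = log.iter().filter(|l| l.confidence >= threshold).collect();
    let coverage = routed.len() as f32 / log.len() as f32;
    let precision = if routed.is_empty() {
        None
    } else {
        let correct = routed.iter().filter(|l| l.correct).count();
        Some(correct as f32 / routed.len() as f32)
    };
    (coverage, precision)
}

fn main() {
    let log = [
        Labeled { confidence: 0.95, correct: true },
        Labeled { confidence: 0.85, correct: true },
        Labeled { confidence: 0.78, correct: false },
        Labeled { confidence: 0.72, correct: true },
        Labeled { confidence: 0.55, correct: false },
    ];
    for t in [0.95, 0.80, 0.70, 0.50] {
        let (coverage, precision) = sweep(&log, t);
        println!("threshold {t:.2}: coverage {coverage:.2}, precision {precision:?}");
    }
}
```

Raising the threshold moves along a coverage/precision trade-off; the guideline range above is a starting point for where on that curve to sit.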
Performance Expectations
Classification Overhead
Cost: One additional pipeline execution (task classifier)
Benefit: Ability to route to optimal pipeline
Net Impact: Positive if specialized pipelines significantly outperform default
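Expected Improvements
Based on DSP research benchmarks:
- Specialized math pipeline: 20-40% improvement over default
- Specialized multi-hop pipeline: 8-39% improvement over simple retrieval (the DSP paper reports 8-39% relative gains over a standard retrieve-then-read pipeline across its open-domain, multi-hop, and conversational QA settings)
- Overall system: Depends on task distribution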
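How per-pipeline gains combine depends on the traffic mix. Under the simplifying assumption that every query is routed correctly and traffic without a specialized pipeline gains nothing, the overall relative improvement is the traffic-weighted sum of per-type gains. `overall_improvement` is a hypothetical helper, and the shares and gains in `main` are invented for illustration, not measured results.

```rust
/// Traffic-weighted overall gain, assuming perfect routing.
/// Each entry: (task type, share of traffic, relative gain of its pipeline over default).
fn overall_improvement(mix: &[(&str, f64, f64)]) -> f64 {
    let total_share: f64 = mix.iter().map(|&(_, share, _)| share).sum();
    debug_assert!((total_share - 1.0).abs() < 1e-9, "shares should sum to 1");
    mix.iter().map(|&(_, share, gain)| share * gain).sum()
}

fn main() {
    // Hypothetical mix: 30% math, 20% multi-hop, 50% handled by the default pipeline.
    let mix = [
        ("math", 0.30, 0.30),
        ("multi_hop", 0.20, 0.20),
        ("other", 0.50, 0.0),
    ];
    println!("overall relative gain: {:.1}%", overall_improvement(&mix) * 100.0);
}
```

A large gain on a rarely seen task type moves the total very little, which is why profiling the task distribution comes before choosing which pipeline to build next.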
Optimization Opportunities
- Cache classifications for repeated similar inputs
- Parallelize classification and default pipeline execution
- Profile which task types are most common
- Prioritize specialized pipeline development for common tasks
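A sketch of the first idea: a cache in front of the classifier, keyed by a normalized form of the input. The normalization (lowercasing and collapsing whitespace) is an assumption and is only safe when such differences never change the task type; `CachedClassifier` is a hypothetical wrapper around any classification closure.

```rust
use std::collections::HashMap;

/// Wraps any classifier closure with a cache keyed by normalized input.
struct CachedClassifier<F>
where
    F: FnMut(&str) -> (String, f32),
{
    inner: F,
    cache: HashMap<String, (String, f32)>,
}

impl<F> CachedClassifier<F>
where
    F: FnMut(&str) -> (String, f32),
{
    fn new(inner: F) -> Self {
        Self { inner, cache: HashMap::new() }
    }

    /// Assumed normalization: collapse whitespace and lowercase.
    fn key(input: &str) -> String {
        input.split_whitespace().collect::<Vec<_>>().join(" ").to_lowercase()
    }

    fn classify(&mut self, input: &str) -> (String, f32) {
        let key = Self::key(input);
        if let Some(hit) = self.cache.get(&key) {
            return hit.clone();
        }
        let result = (self.inner)(input);
        self.cache.insert(key, result.clone());
        result
    }
}

fn main() {
    let mut calls = 0;
    let mut classifier = CachedClassifier::new(|_input: &str| {
        calls += 1; // stands in for an expensive LM call
        ("math".to_string(), 0.95_f32)
    });
    classifier.classify("What is 15% of 240?");
    classifier.classify("  what is 15%   of 240? ");
    drop(classifier); // release the closure's borrow of `calls`
    println!("underlying classifier calls: {calls}");
}
```

The cache is unbounded here; a long-running service would want an eviction policy (for example, LRU) layered on the same interface.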
Testing Strategy
Unit Testing
```rust
#[cfg(test)]
mod tests {
    // Child modules do not inherit the parent's names; import them explicitly
    use super::*;

    #[test]
    fn test_task_classification() {
        let system = AirsDSPSystem::new().unwrap();
        let classification = system.classify_task("What is 15% of 240?").unwrap();
        assert_eq!(classification.task_type, "math");
        assert!(classification.confidence > 0.8);
    }

    #[test]
    fn test_pipeline_routing() {
        let system = AirsDSPSystem::new().unwrap();
        let response = system.process("What is 15% of 240?").unwrap();
        assert_eq!(response.pipeline_used, "math_pipeline");
    }

    #[test]
    fn test_fallback_to_default() {
        let system = AirsDSPSystem::new().unwrap();
        // Weird input that doesn't match any specialized pipeline
        let response = system.process("asdfjkl;").unwrap();
        assert_eq!(response.pipeline_used, "default_pipeline");
    }
}
```
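These tests call the real classifier, so their outcomes depend on the language model. One way to keep routing tests deterministic, sketched under the assumption that routing receives the classifier through a trait, is to inject a stub. `Classifier`, `StubClassifier`, `route`, and the `{task_type}_pipeline` naming rule are all hypothetical.

```rust
trait Classifier {
    fn classify(&self, input: &str) -> (String, f32);
}

/// Test double: always returns a fixed label and confidence.
struct StubClassifier {
    task_type: &'static str,
    confidence: f32,
}

impl Classifier for StubClassifier {
    fn classify(&self, _input: &str) -> (String, f32) {
        (self.task_type.to_string(), self.confidence)
    }
}

/// Routing logic under test: specialized only when the label has an
/// implemented pipeline and the confidence clears the threshold.
fn route(classifier: &dyn Classifier, implemented: &[&str], threshold: f32, input: &str) -> String {
    let (task_type, confidence) = classifier.classify(input);
    if confidence >= threshold && implemented.contains(&task_type.as_str()) {
        format!("{task_type}_pipeline")
    } else {
        "default_pipeline".to_string()
    }
}

fn main() {
    let stub = StubClassifier { task_type: "math", confidence: 0.95 };
    println!("{}", route(&stub, &["math"], 0.75, "What is 15% of 240?"));
}
```

With the classifier stubbed, routing tests check only routing; classifier quality is then measured separately against a labeled set rather than asserted in unit tests.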
Integration Testing
```rust
#[test]
fn test_math_pipeline_accuracy() {
    let system = AirsDSPSystem::new().unwrap();
    let test_cases = vec![
        ("What is 15% of 240?", "36"),
        ("What is the square root of 144?", "12"),
        // More test cases
    ];
    for (question, expected) in test_cases {
        let response = system.process(question).unwrap();
        assert_eq!(response.answer, expected);
        assert_eq!(response.pipeline_used, "math_pipeline");
    }
}
```
Key Takeaways
For System Architecture
- Task classifier is a DSP pipeline: uses DSP to classify DSP tasks
- Specialized pipelines are optional: system works without them
- Default pipeline is mandatory: ensures graceful handling
- Routing is confidence-based: can tune for accuracy vs. coverage
- System is incrementally buildable: start simple, add complexity
For Implementation
- Build task classifier first: foundation for all routing
- Build default pipeline second: ensures system always works
- Add specialized pipelines incrementally: measure each addition
- Log routing decisions: understand system behavior
- Track usage patterns: guide future development
For Production
- Monitor classification accuracy: key to system performance
- Measure specialized vs. default usage: optimize for common cases
- Collect user feedback: improve classifier and pipelines
- Profile task distribution: prioritize pipeline development
- Maintain default pipeline quality: safety net for edge cases
References
Related Knowledge Base Documents
- DSP Framework Core: dsp_framework_core.md
- DSP Pipeline Architecture Examples: dsp_pipeline_architecture_examples.md
- DSP Reasoning Strategies: dsp_reasoning_strategies_implementation.md
- DSP/DSPy Comparative Evolution: dsp_dspy_comparative_evolution.md
Implementation Patterns
- Task-level vs Query-level distinction
- Incremental system development
- Graceful degradation through fallback
- Confidence-based routing
Document Status: Complete
Implementation Readiness: High - Production-ready architectural pattern
Next Steps: Use this pattern as foundation for AirsDSP system architecture design