Custom Transports

Building custom transport layers and implementations

Leverage the AIRS MCP transport abstraction layer to build custom transport implementations that suit your specific deployment requirements. Whether you need HTTP, WebSocket, TCP, or entirely custom protocols, this guide covers everything from basic implementation to production deployment.

Transport Architecture Overview

The AIRS MCP transport system is built around a flexible trait-based architecture:

graph TD
    A[Transport Trait] --> B[STDIO Transport]
    A --> C[HTTP Transport*]
    A --> D[WebSocket Transport*]
    A --> E[TCP Transport*]
    A --> F[Custom Transport]

    G[ZeroCopyTransport] --> B
    G --> F

    H[StreamingTransport] --> B
    H --> F

    I[BufferManager] --> B
    I --> F

    subgraph "Production Ready"
        B
    end

    subgraph "Framework Ready"
        C
        D
        E
    end

* Framework placeholders ready for implementation

Core Transport Trait

All transport implementations must satisfy the Transport trait:

use std::future::Future;
use airsprotocols_mcp::transport::{Transport, TransportError};

pub trait Transport: Send + Sync {
    /// Transport-specific error type
    type Error: std::error::Error + Send + Sync + 'static;

    /// Send a message through the transport
    fn send(&mut self, message: &[u8]) -> impl Future<Output = Result<(), Self::Error>> + Send;

    /// Receive a message from the transport
    fn receive(&mut self) -> impl Future<Output = Result<Vec<u8>, Self::Error>> + Send;

    /// Close the transport and clean up resources
    fn close(&mut self) -> impl Future<Output = Result<(), Self::Error>> + Send;
}

Design Principles

  • Async-native: All operations return futures for Tokio integration
  • Generic messages: Uses raw bytes (&[u8]/Vec<u8>) for flexibility
  • Error transparency: Associated Error type for transport-specific handling
  • Resource safety: Explicit close() method ensures proper cleanup
  • Thread safety: All implementations must be Send + Sync
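To see these principles in one place, here is a minimal, hypothetical in-memory loopback transport (not part of the crate) that queues sent messages and returns them from receive(). It is a sketch for illustration only:

use std::collections::VecDeque;
use airsprotocols_mcp::transport::Transport;

/// Hypothetical loopback transport: receive() returns messages queued by send().
#[derive(Default)]
pub struct LoopbackTransport {
    queue: VecDeque<Vec<u8>>,
    closed: bool,
}

#[derive(Debug, thiserror::Error)]
pub enum LoopbackError {
    #[error("transport closed")]
    Closed,
    #[error("no message queued")]
    Empty,
}

impl Transport for LoopbackTransport {
    // Transport-specific error type (error transparency)
    type Error = LoopbackError;

    // Generic byte messages, async-native operations
    async fn send(&mut self, message: &[u8]) -> Result<(), Self::Error> {
        if self.closed {
            return Err(LoopbackError::Closed);
        }
        self.queue.push_back(message.to_vec());
        Ok(())
    }

    async fn receive(&mut self) -> Result<Vec<u8>, Self::Error> {
        if self.closed {
            return Err(LoopbackError::Closed);
        }
        self.queue.pop_front().ok_or(LoopbackError::Empty)
    }

    // Explicit cleanup (resource safety)
    async fn close(&mut self) -> Result<(), Self::Error> {
        self.closed = true;
        Ok(())
    }
}

Because the struct contains only owned data, it is automatically Send + Sync, satisfying the thread-safety requirement without extra work.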

STDIO Transport Deep Dive

The production-ready STDIO transport serves as both the primary MCP transport and a reference implementation for custom transports.

Message Framing

STDIO transport uses newline-delimited JSON for message framing:

use airsprotocols_mcp::transport::{Transport, StdioTransport};

async fn stdio_example() -> Result<(), Box<dyn std::error::Error>> {
    let mut transport = StdioTransport::new().await?;

    // Send JSON-RPC message (automatically adds newline)
    let request = br#"{"jsonrpc":"2.0","method":"ping","id":"1"}"#;
    transport.send(request).await?;

    // Receive response (newline automatically stripped)
    let response = transport.receive().await?;
    println!("Received: {}", String::from_utf8_lossy(&response));

    transport.close().await?;
    Ok(())
}

Buffer Management

STDIO transport supports buffer pooling:

use airsprotocols_mcp::transport::{StdioTransport, BufferConfig};

async fn high_performance_stdio() -> Result<(), Box<dyn std::error::Error>> {
    // Configure buffer management
    let buffer_config = BufferConfig {
        read_buffer_capacity: 64 * 1024,     // 64KB read buffers
        write_buffer_capacity: 64 * 1024,    // 64KB write buffers
        max_message_size: 16 * 1024 * 1024,  // 16MB message limit
        pool_size: 10,                       // Pool 10 buffers
        acquire_timeout_ms: 100,             // 100ms timeout
        backpressure_threshold: 0.8,         // Apply backpressure at 80% capacity
    };

    let transport = StdioTransport::with_buffer_config(buffer_config).await?;

    // Monitor buffer performance
    if let Some(metrics) = transport.buffer_metrics() {
        println!("Buffer efficiency: {:.2}%", 
                 metrics.acquisition_success_rate() * 100.0);
    }

    Ok(())
}

Zero-Copy Operations

Implement ZeroCopyTransport for maximum performance:

use airsprotocols_mcp::transport::{ZeroCopyTransport, TransportError};
use bytes::BytesMut;

async fn zero_copy_example<T: ZeroCopyTransport>(
    transport: &mut T
) -> Result<(), TransportError> {
    // Acquire buffer from pool
    let mut buffer = transport.acquire_buffer().await?;

    // Receive directly into buffer (no allocation)
    let bytes_received = transport.receive_into_buffer(&mut buffer).await?;

    // Process message in-place
    process_message_inplace(&mut buffer[..bytes_received]);

    // Send using efficient buffer operations
    transport.send_bytes(&buffer[..bytes_received]).await?;

    // Buffer automatically returned to pool when dropped
    Ok(())
}

fn process_message_inplace(buffer: &mut [u8]) {
    // Process message efficiently in-place
    // e.g., modify headers, add routing info, etc.
}

Building Custom Transports

Basic Custom Transport Template

Here's a template for implementing custom transports:

use std::sync::Arc;
use tokio::sync::Mutex;
use airsprotocols_mcp::transport::{Transport, TransportError};

/// Custom transport implementation
pub struct CustomTransport {
    // Your transport-specific state
    connection: Arc<Mutex<Option<Connection>>>,
    config: CustomConfig,
    closed: Arc<Mutex<bool>>,
}

#[derive(Debug)]
pub struct CustomConfig {
    pub endpoint: String,
    pub timeout_ms: u64,
    pub max_message_size: usize,
}

#[derive(Debug)]
struct Connection {
    // Your connection implementation
}

impl CustomTransport {
    pub async fn new(config: CustomConfig) -> Result<Self, TransportError> {
        Ok(Self {
            connection: Arc::new(Mutex::new(None)),
            config,
            closed: Arc::new(Mutex::new(false)),
        })
    }

    async fn ensure_connected(&self) -> Result<(), TransportError> {
        let mut conn = self.connection.lock().await;
        if conn.is_none() {
            *conn = Some(self.establish_connection().await?);
        }
        Ok(())
    }

    async fn establish_connection(&self) -> Result<Connection, TransportError> {
        // Implement your connection logic
        todo!("Implement connection establishment")
    }
}

impl Transport for CustomTransport {
    type Error = TransportError;

    async fn send(&mut self, message: &[u8]) -> Result<(), Self::Error> {
        // Check if closed
        if *self.closed.lock().await {
            return Err(TransportError::Closed);
        }

        // Validate message size
        if message.len() > self.config.max_message_size {
            return Err(TransportError::buffer_overflow(format!(
                "Message size {} exceeds limit {}",
                message.len(),
                self.config.max_message_size
            )));
        }

        // Ensure connection
        self.ensure_connected().await?;

        // Implement your send logic
        let connection = self.connection.lock().await;
        if let Some(conn) = connection.as_ref() {
            // Send message through your protocol
            self.send_through_connection(conn, message).await
        } else {
            Err(TransportError::connection_closed())
        }
    }

    async fn receive(&mut self) -> Result<Vec<u8>, Self::Error> {
        // Check if closed
        if *self.closed.lock().await {
            return Err(TransportError::Closed);
        }

        // Ensure connection
        self.ensure_connected().await?;

        // Implement your receive logic
        let connection = self.connection.lock().await;
        if let Some(conn) = connection.as_ref() {
            self.receive_from_connection(conn).await
        } else {
            Err(TransportError::connection_closed())
        }
    }

    async fn close(&mut self) -> Result<(), Self::Error> {
        // Set closed flag
        *self.closed.lock().await = true;

        // Close connection
        let mut connection = self.connection.lock().await;
        if let Some(conn) = connection.take() {
            self.close_connection(conn).await?;
        }

        Ok(())
    }
}

impl CustomTransport {
    async fn send_through_connection(
        &self,
        connection: &Connection,
        message: &[u8]
    ) -> Result<(), TransportError> {
        // Implement protocol-specific sending
        todo!("Implement protocol send")
    }

    async fn receive_from_connection(
        &self,
        connection: &Connection
    ) -> Result<Vec<u8>, TransportError> {
        // Implement protocol-specific receiving
        todo!("Implement protocol receive")
    }

    async fn close_connection(&self, connection: Connection) -> Result<(), TransportError> {
        // Implement connection cleanup
        todo!("Implement connection cleanup")
    }
}

HTTP Transport Implementation

Here's an HTTP transport example built on reqwest:

use std::sync::Arc;
use tokio::sync::RwLock;
use reqwest::{Client, Url};
use serde_json::Value;
use airsprotocols_mcp::transport::{Transport, TransportError};

pub struct HttpTransport {
    client: Client,
    endpoint: Url,
    session_id: Arc<RwLock<Option<String>>>,
    last_event_id: Arc<RwLock<Option<String>>>,
    timeout: std::time::Duration,
}

impl HttpTransport {
    pub fn new(endpoint: Url, timeout_ms: u64) -> Result<Self, TransportError> {
        let client = Client::builder()
            .timeout(std::time::Duration::from_millis(timeout_ms))
            .build()
            .map_err(|e| TransportError::format(format!("HTTP client error: {}", e)))?;

        Ok(Self {
            client,
            endpoint,
            session_id: Arc::new(RwLock::new(None)),
            last_event_id: Arc::new(RwLock::new(None)),
            timeout: std::time::Duration::from_millis(timeout_ms),
        })
    }
}

impl Transport for HttpTransport {
    type Error = TransportError;

    async fn send(&mut self, message: &[u8]) -> Result<(), Self::Error> {
        let json: Value = serde_json::from_slice(message)
            .map_err(|e| TransportError::format(format!("Invalid JSON: {}", e)))?;

        let response = self.client
            .post(self.endpoint.clone())
            .json(&json)
            .header("Content-Type", "application/json")
            .send()
            .await
            .map_err(|e| TransportError::io(e))?;

        if !response.status().is_success() {
            return Err(TransportError::format(format!(
                "HTTP error: {}", response.status()
            )));
        }

        Ok(())
    }

    async fn receive(&mut self) -> Result<Vec<u8>, Self::Error> {
        // For HTTP, you might implement Server-Sent Events or polling
        // This is a simplified example
        let response = self.client
            .get(self.endpoint.clone())
            .send()
            .await
            .map_err(|e| TransportError::io(e))?;

        let bytes = response.bytes().await
            .map_err(|e| TransportError::io(e))?;

        Ok(bytes.to_vec())
    }

    async fn close(&mut self) -> Result<(), Self::Error> {
        // HTTP is stateless, so just clear session state
        *self.session_id.write().await = None;
        *self.last_event_id.write().await = None;
        Ok(())
    }
}
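The session_id and last_event_id fields above are reserved for session handling but are not yet used in send/receive. As a hedged sketch (Mcp-Session-Id comes from the MCP HTTP transport conventions and Last-Event-ID from SSE resumption; verify both against your server), a hypothetical helper could attach them to outgoing requests:

impl HttpTransport {
    /// Sketch: attach session and SSE-resumption headers to an outgoing request.
    async fn apply_session_headers(
        &self,
        mut request: reqwest::RequestBuilder,
    ) -> reqwest::RequestBuilder {
        if let Some(session) = self.session_id.read().await.as_ref() {
            request = request.header("Mcp-Session-Id", session.clone());
        }
        if let Some(event_id) = self.last_event_id.read().await.as_ref() {
            request = request.header("Last-Event-ID", event_id.clone());
        }
        request
    }
}

send() would call this before dispatching the request, and receive() would update last_event_id as events arrive so a dropped stream can be resumed.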

WebSocket Transport Implementation

For real-time bidirectional communication:

use tokio_tungstenite::{connect_async, WebSocketStream, MaybeTlsStream};
use tokio::net::TcpStream;
use tokio_tungstenite::tungstenite::Message;
use std::sync::Arc;
use tokio::sync::Mutex;
use airsprotocols_mcp::transport::{Transport, TransportError};

pub struct WebSocketTransport {
    ws_stream: Arc<Mutex<Option<WebSocketStream<MaybeTlsStream<TcpStream>>>>>,
    url: String,
    closed: Arc<Mutex<bool>>,
}

impl WebSocketTransport {
    pub async fn new(url: String) -> Result<Self, TransportError> {
        let transport = Self {
            ws_stream: Arc::new(Mutex::new(None)),
            url,
            closed: Arc::new(Mutex::new(false)),
        };

        transport.connect().await?;
        Ok(transport)
    }

    async fn connect(&self) -> Result<(), TransportError> {
        let (ws_stream, _) = connect_async(&self.url).await
            .map_err(|e| TransportError::io(e))?;

        *self.ws_stream.lock().await = Some(ws_stream);
        Ok(())
    }
}

impl Transport for WebSocketTransport {
    type Error = TransportError;

    async fn send(&mut self, message: &[u8]) -> Result<(), Self::Error> {
        if *self.closed.lock().await {
            return Err(TransportError::Closed);
        }

        let mut stream_guard = self.ws_stream.lock().await;
        if let Some(stream) = stream_guard.as_mut() {
            use futures_util::SinkExt;

            let text = String::from_utf8(message.to_vec())
                .map_err(|e| TransportError::format(format!("Invalid UTF-8: {}", e)))?;

            stream.send(Message::Text(text)).await
                .map_err(|e| TransportError::io(e))?;
        } else {
            return Err(TransportError::connection_closed());
        }

        Ok(())
    }

    async fn receive(&mut self) -> Result<Vec<u8>, Self::Error> {
        if *self.closed.lock().await {
            return Err(TransportError::Closed);
        }

        let mut stream_guard = self.ws_stream.lock().await;
        if let Some(stream) = stream_guard.as_mut() {
            use futures_util::StreamExt;

            if let Some(msg) = stream.next().await {
                let msg = msg.map_err(|e| TransportError::io(e))?;

                match msg {
                    Message::Text(text) => Ok(text.into_bytes()),
                    Message::Binary(data) => Ok(data),
                    Message::Close(_) => {
                        *self.closed.lock().await = true;
                        Err(TransportError::Closed)
                    }
                    _ => Err(TransportError::format("Unexpected message type".to_string())),
                }
            } else {
                Err(TransportError::Closed)
            }
        } else {
            Err(TransportError::connection_closed())
        }
    }

    async fn close(&mut self) -> Result<(), Self::Error> {
        *self.closed.lock().await = true;

        let mut stream_guard = self.ws_stream.lock().await;
        if let Some(mut stream) = stream_guard.take() {
            use futures_util::SinkExt;
            let _ = stream.close(None).await; // Best effort close
        }

        Ok(())
    }
}

Protocol Considerations

Message Framing

Different transport protocols require different message framing strategies:

Transport  | Framing Strategy       | Example
STDIO      | Newline-delimited      | {"jsonrpc":"2.0","method":"ping"}\n
HTTP       | Content-Length header  | Content-Length: 33\r\n\r\n{"jsonrpc":"2.0","method":"ping"}
WebSocket  | Native framing         | WebSocket text/binary frames
TCP        | Length-prefixed        | [4-byte length][JSON payload]
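As an illustration of the length-prefixed strategy, here is a minimal sketch of TCP frame helpers using Tokio's async I/O traits (the function names are illustrative, not part of the crate):

use tokio::io::{AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt};

/// Write one frame: 4-byte big-endian length, then the JSON payload
async fn write_frame<W: AsyncWrite + Unpin>(
    writer: &mut W,
    payload: &[u8],
) -> std::io::Result<()> {
    writer.write_all(&(payload.len() as u32).to_be_bytes()).await?;
    writer.write_all(payload).await?;
    writer.flush().await
}

/// Read one frame, enforcing a maximum message size
async fn read_frame<R: AsyncRead + Unpin>(
    reader: &mut R,
    max_message_size: usize,
) -> std::io::Result<Vec<u8>> {
    let mut len_bytes = [0u8; 4];
    reader.read_exact(&mut len_bytes).await?;
    let len = u32::from_be_bytes(len_bytes) as usize;
    if len > max_message_size {
        return Err(std::io::Error::new(
            std::io::ErrorKind::InvalidData,
            "frame exceeds max message size",
        ));
    }
    let mut payload = vec![0u8; len];
    reader.read_exact(&mut payload).await?;
    Ok(payload)
}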

Error Handling Strategies

Implement robust error handling for network failures:

use airsprotocols_mcp::transport::TransportError;

#[derive(Debug, thiserror::Error)]
pub enum CustomTransportError {
    #[error("Connection timeout")]
    Timeout,

    #[error("Protocol violation: {0}")]
    ProtocolViolation(String),

    #[error("Authentication failed")]
    AuthenticationFailed,

    #[error("Rate limit exceeded")]
    RateLimitExceeded,

    #[error("Network error: {0}")]
    Network(#[from] std::io::Error),
}

impl From<CustomTransportError> for TransportError {
    fn from(err: CustomTransportError) -> Self {
        match err {
            CustomTransportError::Timeout => TransportError::timeout(),
            CustomTransportError::Network(io_err) => TransportError::io(io_err),
            other => TransportError::format(other.to_string()),
        }
    }
}
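Transient failures such as timeouts or rate limiting are often worth retrying before surfacing an error. Here is a hedged sketch of a generic send-with-backoff helper (the retry policy values are illustrative, and which errors count as transient is transport-specific):

use std::time::Duration;
use airsprotocols_mcp::transport::Transport;

/// Retry sends with simple exponential backoff.
/// For illustration, every error is treated as transient up to `max_attempts`.
async fn send_with_retry<T: Transport>(
    transport: &mut T,
    message: &[u8],
    max_attempts: u32,
) -> Result<(), T::Error> {
    let mut delay = Duration::from_millis(100);
    let mut attempt = 1;
    loop {
        match transport.send(message).await {
            Ok(()) => return Ok(()),
            Err(e) if attempt >= max_attempts => return Err(e),
            Err(_) => {
                tokio::time::sleep(delay).await;
                delay = delay.saturating_mul(2); // exponential backoff
                attempt += 1;
            }
        }
    }
}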

Security Implementation

Implement security features for production deployments:

use std::sync::Arc;
use tokio_rustls::rustls::ClientConfig;
use airsprotocols_mcp::transport::{Transport, TransportError};

pub struct SecureTransport<T> {
    inner: T,
    tls_config: Arc<ClientConfig>,
    certificate_validation: bool,
}

impl<T: Transport> SecureTransport<T> {
    pub fn new(inner: T, tls_config: ClientConfig) -> Self {
        Self {
            inner,
            tls_config: Arc::new(tls_config),
            certificate_validation: true,
        }
    }

    pub fn disable_certificate_validation(mut self) -> Self {
        self.certificate_validation = false;
        self
    }
}

impl<T: Transport> Transport for SecureTransport<T> {
    type Error = TransportError;

    async fn send(&mut self, message: &[u8]) -> Result<(), Self::Error> {
        // Add authentication headers or encryption
        let encrypted_message = self.encrypt_message(message)?;
        self.inner.send(&encrypted_message).await
            .map_err(|e| TransportError::format(e.to_string()))
    }

    async fn receive(&mut self) -> Result<Vec<u8>, Self::Error> {
        let encrypted_message = self.inner.receive().await
            .map_err(|e| TransportError::format(e.to_string()))?;
        self.decrypt_message(&encrypted_message)
    }

    async fn close(&mut self) -> Result<(), Self::Error> {
        self.inner.close().await
            .map_err(|e| TransportError::format(e.to_string()))
    }
}

impl<T> SecureTransport<T> {
    fn encrypt_message(&self, message: &[u8]) -> Result<Vec<u8>, TransportError> {
        // Implement encryption
        Ok(message.to_vec()) // Placeholder
    }

    fn decrypt_message(&self, message: &[u8]) -> Result<Vec<u8>, TransportError> {
        // Implement decryption
        Ok(message.to_vec()) // Placeholder
    }
}

Performance Optimization

Buffer Management Integration

Integrate with AIRS MCP's buffer management:

use airsprotocols_mcp::transport::{Transport, TransportError};
use airsprotocols_mcp::transport::buffer::{BufferManager, BufferConfig};
use std::sync::Arc;

pub struct BufferedTransport<T> {
    inner: T,
    buffer_manager: Arc<BufferManager>,
}

impl<T: Transport> BufferedTransport<T> {
    pub fn new(inner: T, buffer_config: BufferConfig) -> Self {
        Self {
            inner,
            buffer_manager: Arc::new(BufferManager::new(buffer_config)),
        }
    }
}

impl<T: Transport> Transport for BufferedTransport<T> {
    type Error = TransportError;

    async fn send(&mut self, message: &[u8]) -> Result<(), Self::Error> {
        // Use buffer pool for send operations
        let mut buffer = self.buffer_manager.acquire_write_buffer().await?;
        buffer.extend_from_slice(message);

        self.inner.send(&buffer).await
            .map_err(|e| TransportError::format(e.to_string()))
        // Buffer automatically returned to pool when dropped
    }

    async fn receive(&mut self) -> Result<Vec<u8>, Self::Error> {
        let message = self.inner.receive().await
            .map_err(|e| TransportError::format(e.to_string()))?;

        // Process with buffer pool to reduce allocations
        let mut buffer = self.buffer_manager.acquire_read_buffer().await?;
        buffer.clear();
        buffer.extend_from_slice(&message);

        Ok(buffer.to_vec())
    }

    async fn close(&mut self) -> Result<(), Self::Error> {
        self.inner.close().await
            .map_err(|e| TransportError::format(e.to_string()))
    }
}

Streaming Integration

Leverage streaming capabilities for large messages:

use airsprotocols_mcp::transport::Transport;
use airsprotocols_mcp::transport::streaming::{StreamingTransport, StreamingStats};
use airsprotocols_mcp::protocol::jsonrpc::streaming::StreamingConfig;

async fn create_streaming_transport<T>(
    base_transport: T,
    max_message_size: usize
) -> StreamingTransport<T>
where
    T: Transport + airsprotocols_mcp::transport::ZeroCopyTransport + Send + Sync,
{
    let config = StreamingConfig {
        max_message_size,
        read_buffer_size: 256 * 1024,  // 256KB streaming buffer
        strict_validation: true,
    };

    StreamingTransport::new(base_transport, config)
}

Testing Custom Transports

Mock Transport for Testing

Create mock transports for comprehensive testing:

use std::collections::VecDeque;
use std::sync::Arc;
use tokio::sync::Mutex;
use airsprotocols_mcp::transport::{Transport, TransportError};

pub struct MockTransport {
    messages: Arc<Mutex<Vec<Vec<u8>>>>,
    responses: Arc<Mutex<VecDeque<Vec<u8>>>>,
    closed: Arc<Mutex<bool>>,
}

impl MockTransport {
    pub fn new() -> Self {
        Self {
            messages: Arc::new(Mutex::new(Vec::new())),
            responses: Arc::new(Mutex::new(VecDeque::new())),
            closed: Arc::new(Mutex::new(false)),
        }
    }

    pub async fn add_response(&self, response: Vec<u8>) {
        self.responses.lock().await.push_back(response);
    }

    pub async fn get_sent_messages(&self) -> Vec<Vec<u8>> {
        self.messages.lock().await.clone()
    }
}

impl Transport for MockTransport {
    type Error = TransportError;

    async fn send(&mut self, message: &[u8]) -> Result<(), Self::Error> {
        if *self.closed.lock().await {
            return Err(TransportError::Closed);
        }

        self.messages.lock().await.push(message.to_vec());
        Ok(())
    }

    async fn receive(&mut self) -> Result<Vec<u8>, Self::Error> {
        if *self.closed.lock().await {
            return Err(TransportError::Closed);
        }

        let mut responses = self.responses.lock().await;
        responses.pop_front()
            .ok_or_else(|| TransportError::format("No response available".to_string()))
    }

    async fn close(&mut self) -> Result<(), Self::Error> {
        *self.closed.lock().await = true;
        Ok(())
    }
}

#[cfg(test)]
mod tests {
    use super::*;

    #[tokio::test]
    async fn test_mock_transport() {
        let mut transport = MockTransport::new();

        // Add response
        transport.add_response(b"test response".to_vec()).await;

        // Test send
        transport.send(b"test message").await.unwrap();

        // Test receive
        let response = transport.receive().await.unwrap();
        assert_eq!(response, b"test response");

        // Verify sent messages
        let sent = transport.get_sent_messages().await;
        assert_eq!(sent.len(), 1);
        assert_eq!(sent[0], b"test message");
    }
}

Integration Testing

Test transport integration with the MCP protocol:

use airsprotocols_mcp::integration::server::JsonRpcServer;
use airsprotocols_mcp::protocol::jsonrpc::message::*;

#[tokio::test]
async fn test_custom_transport_integration() {
    let transport = CustomTransport::new(CustomConfig {
        endpoint: "ws://localhost:8080".to_string(),
        timeout_ms: 5000,
        max_message_size: 1024 * 1024,
    }).await.unwrap();

    let server = JsonRpcServer::new(transport).await.unwrap();

    // Test request handling
    let request_handler = |req: JsonRpcRequest| async move {
        JsonRpcResponse::success(req.id, serde_json::Value::String("pong".to_string()))
    };

    let notification_handler = |_notif: JsonRpcNotification| async {
        // Handle notification
    };

    // Run server with custom transport
    server.run(request_handler, notification_handler).await.unwrap();
}

Production Deployment

Configuration Management

Structure configuration for production deployments:

use serde::{Deserialize, Serialize};
use airsprotocols_mcp::transport::{BufferConfig, StdioTransport, Transport, TransportError};

#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct TransportConfig {
    pub transport_type: TransportType,
    pub stdio: Option<StdioConfig>,
    pub http: Option<HttpConfig>,
    pub websocket: Option<WebSocketConfig>,
    pub custom: Option<CustomConfig>,
}

#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum TransportType {
    Stdio,
    Http,
    WebSocket,
    Custom(String),
}

#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct StdioConfig {
    pub max_message_size: usize,
    pub buffer_pool_size: Option<usize>,
    pub enable_buffers: bool,
}

#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct HttpConfig {
    pub endpoint: String,
    pub timeout_ms: u64,
    pub max_retries: u32,
    pub enable_tls: bool,
    pub certificate_path: Option<String>,
}

#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct WebSocketConfig {
    pub url: String,
}

pub async fn create_transport_from_config(
    config: &TransportConfig
) -> Result<Box<dyn Transport<Error = TransportError>>, TransportError> {
    match config.transport_type {
        TransportType::Stdio => {
            let stdio_config = config.stdio.as_ref()
                .ok_or_else(|| TransportError::format("Missing STDIO config".to_string()))?;

            if stdio_config.enable_buffers {
                let buffer_config = BufferConfig {
                    pool_size: stdio_config.buffer_pool_size.unwrap_or(10),
                    max_message_size: stdio_config.max_message_size,
                    ..Default::default()
                };
                Ok(Box::new(StdioTransport::with_buffer_config(buffer_config).await?))
            } else {
                Ok(Box::new(StdioTransport::with_max_message_size(
                    stdio_config.max_message_size
                ).await?))
            }
        }
        TransportType::Http => {
            let http_config = config.http.as_ref()
                .ok_or_else(|| TransportError::format("Missing HTTP config".to_string()))?;

            let endpoint = http_config.endpoint.parse()
                .map_err(|e| TransportError::format(format!("Invalid endpoint: {}", e)))?;

            Ok(Box::new(HttpTransport::new(endpoint, http_config.timeout_ms)?))
        }
        TransportType::WebSocket => {
            let ws_config = config.websocket.as_ref()
                .ok_or_else(|| TransportError::format("Missing WebSocket config".to_string()))?;

            Ok(Box::new(WebSocketTransport::new(ws_config.url.clone()).await?))
        }
        TransportType::Custom(name) => {
            // Load custom transport by name
            Err(TransportError::format(format!("Custom transport '{}' not implemented", name)))
        }
    }
}
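Because TransportConfig derives Deserialize, it can be loaded from a configuration file at startup. A sketch assuming a JSON config file (the path is illustrative) and the create_transport_from_config helper above:

async fn load_transport(
    path: &str,
) -> Result<Box<dyn Transport<Error = TransportError>>, Box<dyn std::error::Error>> {
    // Read and parse the deployment configuration
    let raw = tokio::fs::read_to_string(path).await?;
    let config: TransportConfig = serde_json::from_str(&raw)?;

    // Build the concrete transport via the factory above
    let transport = create_transport_from_config(&config).await?;
    Ok(transport)
}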

Monitoring and Metrics

Implement comprehensive monitoring:

use std::sync::Arc;
use std::sync::atomic::{AtomicU64, Ordering};
use airsprotocols_mcp::transport::{Transport, TransportError};

pub struct TransportMetrics {
    pub messages_sent: AtomicU64,
    pub messages_received: AtomicU64,
    pub bytes_sent: AtomicU64,
    pub bytes_received: AtomicU64,
    pub errors: AtomicU64,
    pub connection_count: AtomicU64,
}

impl TransportMetrics {
    pub fn new() -> Self {
        Self {
            messages_sent: AtomicU64::new(0),
            messages_received: AtomicU64::new(0),
            bytes_sent: AtomicU64::new(0),
            bytes_received: AtomicU64::new(0),
            errors: AtomicU64::new(0),
            connection_count: AtomicU64::new(0),
        }
    }

    pub fn record_send(&self, byte_count: usize) {
        self.messages_sent.fetch_add(1, Ordering::Relaxed);
        self.bytes_sent.fetch_add(byte_count as u64, Ordering::Relaxed);
    }

    pub fn record_receive(&self, byte_count: usize) {
        self.messages_received.fetch_add(1, Ordering::Relaxed);
        self.bytes_received.fetch_add(byte_count as u64, Ordering::Relaxed);
    }

    pub fn record_error(&self) {
        self.errors.fetch_add(1, Ordering::Relaxed);
    }
}

pub struct MonitoredTransport<T> {
    inner: T,
    metrics: Arc<TransportMetrics>,
}

impl<T: Transport> Transport for MonitoredTransport<T> {
    type Error = TransportError;

    async fn send(&mut self, message: &[u8]) -> Result<(), Self::Error> {
        match self.inner.send(message).await {
            Ok(()) => {
                self.metrics.record_send(message.len());
                Ok(())
            }
            Err(e) => {
                self.metrics.record_error();
                Err(TransportError::format(e.to_string()))
            }
        }
    }

    async fn receive(&mut self) -> Result<Vec<u8>, Self::Error> {
        match self.inner.receive().await {
            Ok(message) => {
                self.metrics.record_receive(message.len());
                Ok(message)
            }
            Err(e) => {
                self.metrics.record_error();
                Err(TransportError::format(e.to_string()))
            }
        }
    }

    async fn close(&mut self) -> Result<(), Self::Error> {
        self.inner.close().await
            .map_err(|e| TransportError::format(e.to_string()))
    }
}
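To surface these counters, a constructor plus a small background reporter is usually enough. A sketch extending the types above (the interval and output format are illustrative):

impl<T: Transport> MonitoredTransport<T> {
    pub fn new(inner: T) -> Self {
        Self {
            inner,
            metrics: Arc::new(TransportMetrics::new()),
        }
    }

    /// Shareable handle for a background reporting task
    pub fn metrics(&self) -> Arc<TransportMetrics> {
        Arc::clone(&self.metrics)
    }
}

/// Log a metrics snapshot every 30 seconds
fn spawn_metrics_reporter(metrics: Arc<TransportMetrics>) {
    tokio::spawn(async move {
        let mut interval = tokio::time::interval(std::time::Duration::from_secs(30));
        loop {
            interval.tick().await;
            println!(
                "transport metrics: sent={} received={} errors={}",
                metrics.messages_sent.load(Ordering::Relaxed),
                metrics.messages_received.load(Ordering::Relaxed),
                metrics.errors.load(Ordering::Relaxed),
            );
        }
    });
}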

Next Steps

With your custom transport implementation complete, you're ready to:

  1. Patterns → Explore patterns and techniques

Transport flexibility is one of AIRS MCP's key strengths. Whether you need simple STDIO communication or complex multi-protocol routing, the transport abstraction layer provides the foundation for any deployment scenario.