Skip to content

Component Interaction Patterns

Capability Negotiation Sequence

// Capability negotiation orchestration
pub struct CapabilityNegotiator {
    server_capabilities: ServerCapabilities,
    client_requirements: ClientRequirements,
}

impl CapabilityNegotiator {
    pub async fn negotiate(
        &self,
        transport: &dyn BidirectionalTransport,
    ) -> Result<NegotiatedCapabilities, NegotiationError> {
        // 1. Client sends initialize request
        let init_request = JsonRpcRequest {
            method: "initialize".to_string(),
            params: InitializeParams {
                protocol_version: PROTOCOL_VERSION,
                capabilities: self.client_requirements.to_capabilities(),
                client_info: self.get_client_info(),
            },
            id: RequestId::generate(),
        };

        // 2. Server responds with capabilities
        let response = self.send_request(init_request, transport).await?;
        let server_caps: InitializeResult = response.extract_result()?;

        // 3. Compute intersection of capabilities
        let negotiated = self.compute_capability_intersection(
            &self.client_requirements,
            &server_caps.capabilities,
        )?;

        // 4. Send initialized notification
        let initialized = JsonRpcNotification {
            method: "notifications/initialized".to_string(),
            params: serde_json::Value::Null,
        };
        transport.send_message(JsonRpcMessage::from(initialized)).await?;

        Ok(negotiated)
    }
}

Security Integration Pattern

// Security concerns integrated at message processing level
pub struct SecureMessageProcessor {
    base_processor: JsonRpcProcessor,
    authenticator: Box<dyn Authenticator>,
    authorizer: Box<dyn Authorizer>,
    audit_logger: Box<dyn AuditLogger>,
}

impl SecureMessageProcessor {
    pub async fn process_message(
        &self,
        message: JsonRpcMessage,
        security_context: &SecurityContext,
    ) -> Result<Option<JsonRpcMessage>, ProcessingError> {
        // 1. Authentication check
        self.authenticator.verify_message(&message, security_context).await?;

        // 2. Authorization check
        self.authorizer.check_permission(&message, security_context).await?;

        // 3. Audit logging (pre-execution)
        self.audit_logger.log_message_received(&message, security_context).await?;

        // 4. Process message
        let result = self.base_processor.process_message(message, &security_context.into()).await;

        // 5. Audit logging (post-execution)
        self.audit_logger.log_message_processed(&result, security_context).await?;

        result
    }
}

Resource Subscription Management

// Real-time subscription management with cleanup
pub struct SubscriptionManager {
    subscriptions: DashMap<SubscriptionId, Subscription>,
    resource_providers: Vec<Box<dyn ResourceProvider>>,
    notification_sender: mpsc::UnboundedSender<ResourceNotification>,
}

impl SubscriptionManager {
    pub async fn subscribe_to_resource(
        &self,
        uri: &str,
        connection_id: &str,
    ) -> Result<SubscriptionId, SubscriptionError> {
        let subscription_id = SubscriptionId::generate();
        let subscription = Subscription {
            id: subscription_id.clone(),
            uri: uri.to_string(),
            connection_id: connection_id.to_string(),
            created_at: Utc::now(),
        };

        // Register subscription
        self.subscriptions.insert(subscription_id.clone(), subscription);

        // Setup resource watching
        for provider in &self.resource_providers {
            if provider.supports_uri(uri) {
                provider.watch_resource(uri, subscription_id.clone()).await?;
                break;
            }
        }

        Ok(subscription_id)
    }

    pub async fn handle_resource_change(
        &self,
        uri: &str,
        change: ResourceChange,
    ) -> Result<(), SubscriptionError> {
        // Find all subscriptions for this resource
        let affected_subscriptions: Vec<_> = self.subscriptions
            .iter()
            .filter(|entry| entry.value().uri == uri)
            .map(|entry| entry.key().clone())
            .collect();

        // Send notifications
        for subscription_id in affected_subscriptions {
            let notification = ResourceNotification {
                subscription_id,
                uri: uri.to_string(),
                change: change.clone(),
            };

            self.notification_sender.send(notification)?;
        }

        Ok(())
    }
}