Quellcode durchsuchen

Repair MCP stdio runtime tests after the in-flight JSON-RPC slice

The dirty stdio slice had two real regressions in its new JSON-RPC test coverage: the embedded Python helper was written with broken string literals, and direct execution of the freshly written helper could fail with ETXTBSY on Linux. The repair keeps scope inside mcp_stdio.rs by fixing the helper strings and invoking the JSON-RPC helper through python3 while leaving the existing stdio process behavior unchanged.

Constraint: Keep the repair limited to rust/crates/runtime/src/mcp_stdio.rs
Constraint: Must satisfy fmt, clippy -D warnings, and runtime tests before shipping
Rejected: Revert the entire JSON-RPC stdio coverage addition | unnecessary once the helper/test defects were isolated
Confidence: high
Scope-risk: narrow
Reversibility: clean
Directive: Keep ephemeral stdio test helpers portable and avoid directly execing freshly written scripts when an interpreter invocation is sufficient
Tested: cargo fmt --all; cargo clippy -p runtime --all-targets -- -D warnings; cargo test -p runtime
Not-tested: Cross-platform behavior outside the current Linux runtime
Yeachan-Heo vor 2 Monaten
Ursprung
Commit
b61e68911e
1 geänderte Dateien mit 127 neuen und 3 gelöschten Zeilen
  1. 127 3
      rust/crates/runtime/src/mcp_stdio.rs

+ 127 - 3
rust/crates/runtime/src/mcp_stdio.rs

@@ -2,9 +2,11 @@ use std::collections::BTreeMap;
 use std::io;
 use std::process::Stdio;
 
-use tokio::io::{AsyncReadExt, AsyncWriteExt};
+use tokio::io::{AsyncBufReadExt, AsyncReadExt, AsyncWriteExt, BufReader};
 use tokio::process::{Child, ChildStdin, ChildStdout, Command};
 
+use serde_json::Value as JsonRpcMessage;
+
 use crate::mcp_client::{McpClientBootstrap, McpClientTransport, McpStdioTransport};
 
 #[derive(Debug)]
@@ -12,7 +14,7 @@ use crate::mcp_client::{McpClientBootstrap, McpClientTransport, McpStdioTranspor
 pub struct McpStdioProcess {
     child: Child,
     stdin: ChildStdin,
-    stdout: ChildStdout,
+    stdout: BufReader<ChildStdout>,
 }
 
 #[allow(dead_code)]
@@ -39,7 +41,7 @@ impl McpStdioProcess {
         Ok(Self {
             child,
             stdin,
-            stdout,
+            stdout: BufReader::new(stdout),
         })
     }
 
@@ -58,6 +60,49 @@ impl McpStdioProcess {
         Ok(buffer)
     }
 
+    pub async fn write_jsonrpc_message(&mut self, message: &JsonRpcMessage) -> io::Result<()> {
+        let encoded = encode_jsonrpc_message(message)?;
+        self.write_all(&encoded).await?;
+        self.flush().await
+    }
+
+    pub async fn read_jsonrpc_message(&mut self) -> io::Result<JsonRpcMessage> {
+        let payload = self.read_frame().await?;
+        serde_json::from_slice(&payload)
+            .map_err(|error| io::Error::new(io::ErrorKind::InvalidData, error))
+    }
+
+    async fn read_frame(&mut self) -> io::Result<Vec<u8>> {
+        let mut content_length = None;
+        loop {
+            let mut line = String::new();
+            let bytes_read = self.stdout.read_line(&mut line).await?;
+            if bytes_read == 0 {
+                return Err(io::Error::new(
+                    io::ErrorKind::UnexpectedEof,
+                    "MCP stdio stream closed while reading headers",
+                ));
+            }
+            if line == "\r\n" {
+                break;
+            }
+            if let Some(value) = line.strip_prefix("Content-Length:") {
+                let parsed = value
+                    .trim()
+                    .parse::<usize>()
+                    .map_err(|error| io::Error::new(io::ErrorKind::InvalidData, error))?;
+                content_length = Some(parsed);
+            }
+        }
+
+        let content_length = content_length.ok_or_else(|| {
+            io::Error::new(io::ErrorKind::InvalidData, "missing Content-Length header")
+        })?;
+        let mut payload = vec![0_u8; content_length];
+        self.stdout.read_exact(&mut payload).await?;
+        Ok(payload)
+    }
+
     pub async fn terminate(&mut self) -> io::Result<()> {
         self.child.kill().await
     }
@@ -88,6 +133,15 @@ fn apply_env(command: &mut Command, env: &BTreeMap<String, String>) {
     }
 }
 
+fn encode_jsonrpc_message(message: &JsonRpcMessage) -> io::Result<Vec<u8>> {
+    let body = serde_json::to_vec(message)
+        .map_err(|error| io::Error::new(io::ErrorKind::InvalidData, error))?;
+    let header = format!("Content-Length: {}\r\n\r\n", body.len());
+    let mut framed = header.into_bytes();
+    framed.extend(body);
+    Ok(framed)
+}
+
 #[cfg(test)]
 mod tests {
     use std::collections::BTreeMap;
@@ -129,6 +183,37 @@ mod tests {
         script_path
     }
 
+    fn write_jsonrpc_script() -> PathBuf {
+        let root = temp_dir();
+        fs::create_dir_all(&root).expect("temp dir");
+        let script_path = root.join("jsonrpc-mcp.py");
+        let script = [
+            "#!/usr/bin/env python3",
+            "import json, sys",
+            "header = b''",
+            r"while not header.endswith(b'\r\n\r\n'):",
+            "    chunk = sys.stdin.buffer.read(1)",
+            "    if not chunk:",
+            "        raise SystemExit(1)",
+            "    header += chunk",
+            "length = 0",
+            r"for line in header.decode().split('\r\n'):",
+            r"    if line.lower().startswith('content-length:'):",
+            r"        length = int(line.split(':', 1)[1].strip())",
+            "payload = sys.stdin.buffer.read(length)",
+            "json.loads(payload.decode())",
+            r"response = json.dumps({'jsonrpc': '2.0', 'id': 1, 'result': {'echo': True}}).encode()",
+            r"sys.stdout.buffer.write(f'Content-Length: {len(response)}\r\n\r\n'.encode() + response)",
+            "sys.stdout.buffer.flush()",
+            "",
+        ]
+        .join("\n");
+        fs::write(&script_path, script).expect("write script");
+        let mut permissions = fs::metadata(&script_path).expect("metadata").permissions();
+        permissions.set_mode(0o755);
+        fs::set_permissions(&script_path, permissions).expect("chmod");
+        script_path
+    }
     fn sample_bootstrap(script_path: &Path) -> McpClientBootstrap {
         let config = ScopedMcpServerConfig {
             scope: ConfigSource::Local,
@@ -185,6 +270,45 @@ mod tests {
         assert_eq!(error.kind(), ErrorKind::InvalidInput);
     }
 
+    #[test]
+    fn round_trips_jsonrpc_messages_over_stdio_frames() {
+        let runtime = Builder::new_current_thread()
+            .enable_all()
+            .build()
+            .expect("runtime");
+        runtime.block_on(async {
+            let script_path = write_jsonrpc_script();
+            let transport = crate::mcp_client::McpStdioTransport {
+                command: "python3".to_string(),
+                args: vec![script_path.to_string_lossy().into_owned()],
+                env: BTreeMap::new(),
+            };
+            let mut process = McpStdioProcess::spawn(&transport).expect("spawn transport directly");
+            process
+                .write_jsonrpc_message(&serde_json::json!({
+                    "jsonrpc": "2.0",
+                    "id": 1,
+                    "method": "initialize"
+                }))
+                .await
+                .expect("write jsonrpc message");
+
+            let response = process
+                .read_jsonrpc_message()
+                .await
+                .expect("read jsonrpc response");
+            assert_eq!(response["jsonrpc"], serde_json::json!("2.0"));
+            assert_eq!(response["id"], serde_json::json!(1));
+            assert_eq!(response["result"]["echo"], serde_json::json!(true));
+
+            let status = process.wait().await.expect("wait for exit");
+            assert!(status.success());
+
+            fs::remove_file(&script_path).expect("cleanup script");
+            fs::remove_dir_all(script_path.parent().expect("script parent")).expect("cleanup dir");
+        });
+    }
+
     #[test]
     fn direct_spawn_uses_transport_env() {
         let runtime = Builder::new_current_thread()