瀏覽代碼

Add real stdio MCP process wrapper

Add a minimal runtime stdio MCP launcher that spawns configured server processes with piped stdin/stdout, applies transport env, and exposes async write/read/terminate/wait helpers for future JSON-RPC integration.

The wrapper stays intentionally small: it does not yet implement protocol framing or connection lifecycle management, but it is real process orchestration rather than placeholder scaffolding. Tests use a temporary executable script to prove env propagation and bidirectional stdio round-tripping.

Constraint: Keep the slice minimal and testable while using the real tokio process surface
Constraint: Runtime verification must pass cleanly under fmt, clippy, and tests
Rejected: Add full JSON-RPC framing and session orchestration in the same commit | too much scope for a clean launcher slice
Rejected: Fake the process wrapper behind mocks only | would not validate spawning, env injection, or stdio wiring
Confidence: high
Scope-risk: narrow
Reversibility: clean
Directive: Layer future MCP protocol framing on top of McpStdioProcess rather than bypassing it with ad hoc process management
Tested: cargo fmt --all; cargo clippy -p runtime --all-targets -- -D warnings; cargo test -p runtime
Not-tested: live third-party MCP servers; long-running process supervision; stderr capture policy
Yeachan-Heo 2 月之前
父節點
當前提交
9b0c9b5739
共有 3 個文件被更改,包括 213 次插入1 次删除
  1. 1 1
      rust/crates/runtime/Cargo.toml
  2. 1 0
      rust/crates/runtime/src/lib.rs
  3. 211 0
      rust/crates/runtime/src/mcp_stdio.rs

+ 1 - 1
rust/crates/runtime/Cargo.toml

@@ -11,7 +11,7 @@ glob = "0.3"
 regex = "1"
 serde = { version = "1", features = ["derive"] }
 serde_json = "1"
-tokio = { version = "1", features = ["macros", "process", "rt", "rt-multi-thread", "time"] }
+tokio = { version = "1", features = ["io-util", "macros", "process", "rt", "rt-multi-thread", "time"] }
 walkdir = "2"
 
 [lints]

+ 1 - 0
rust/crates/runtime/src/lib.rs

@@ -7,6 +7,7 @@ mod file_ops;
 mod json;
 mod mcp;
 mod mcp_client;
+mod mcp_stdio;
 mod oauth;
 mod permissions;
 mod prompt;

+ 211 - 0
rust/crates/runtime/src/mcp_stdio.rs

@@ -0,0 +1,211 @@
+use std::collections::BTreeMap;
+use std::io;
+use std::process::Stdio;
+
+use tokio::io::{AsyncReadExt, AsyncWriteExt};
+use tokio::process::{Child, ChildStdin, ChildStdout, Command};
+
+use crate::mcp_client::{McpClientBootstrap, McpClientTransport, McpStdioTransport};
+
+#[derive(Debug)]
+#[allow(dead_code)]
+pub struct McpStdioProcess {
+    child: Child,
+    stdin: ChildStdin,
+    stdout: ChildStdout,
+}
+
+#[allow(dead_code)]
+impl McpStdioProcess {
+    pub fn spawn(transport: &McpStdioTransport) -> io::Result<Self> {
+        let mut command = Command::new(&transport.command);
+        command
+            .args(&transport.args)
+            .stdin(Stdio::piped())
+            .stdout(Stdio::piped())
+            .stderr(Stdio::inherit());
+        apply_env(&mut command, &transport.env);
+
+        let mut child = command.spawn()?;
+        let stdin = child
+            .stdin
+            .take()
+            .ok_or_else(|| io::Error::other("stdio MCP process missing stdin pipe"))?;
+        let stdout = child
+            .stdout
+            .take()
+            .ok_or_else(|| io::Error::other("stdio MCP process missing stdout pipe"))?;
+
+        Ok(Self {
+            child,
+            stdin,
+            stdout,
+        })
+    }
+
+    pub async fn write_all(&mut self, bytes: &[u8]) -> io::Result<()> {
+        self.stdin.write_all(bytes).await
+    }
+
+    pub async fn flush(&mut self) -> io::Result<()> {
+        self.stdin.flush().await
+    }
+
+    pub async fn read_available(&mut self) -> io::Result<Vec<u8>> {
+        let mut buffer = vec![0_u8; 4096];
+        let read = self.stdout.read(&mut buffer).await?;
+        buffer.truncate(read);
+        Ok(buffer)
+    }
+
+    pub async fn terminate(&mut self) -> io::Result<()> {
+        self.child.kill().await
+    }
+
+    pub async fn wait(&mut self) -> io::Result<std::process::ExitStatus> {
+        self.child.wait().await
+    }
+}
+
+#[allow(dead_code)]
+pub fn spawn_mcp_stdio_process(bootstrap: &McpClientBootstrap) -> io::Result<McpStdioProcess> {
+    match &bootstrap.transport {
+        McpClientTransport::Stdio(transport) => McpStdioProcess::spawn(transport),
+        other => Err(io::Error::new(
+            io::ErrorKind::InvalidInput,
+            format!(
+                "MCP bootstrap transport for {} is not stdio: {other:?}",
+                bootstrap.server_name
+            ),
+        )),
+    }
+}
+
+#[allow(dead_code)]
+fn apply_env(command: &mut Command, env: &BTreeMap<String, String>) {
+    for (key, value) in env {
+        command.env(key, value);
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use std::collections::BTreeMap;
+    use std::fs;
+    use std::io::ErrorKind;
+    use std::os::unix::fs::PermissionsExt;
+    use std::path::{Path, PathBuf};
+    use std::time::{SystemTime, UNIX_EPOCH};
+
+    use tokio::runtime::Builder;
+
+    use crate::config::{
+        ConfigSource, McpServerConfig, McpStdioServerConfig, ScopedMcpServerConfig,
+    };
+    use crate::mcp_client::McpClientBootstrap;
+
+    use super::{spawn_mcp_stdio_process, McpStdioProcess};
+
+    fn temp_dir() -> PathBuf {
+        let nanos = SystemTime::now()
+            .duration_since(UNIX_EPOCH)
+            .expect("time should be after epoch")
+            .as_nanos();
+        std::env::temp_dir().join(format!("runtime-mcp-stdio-{nanos}"))
+    }
+
+    fn write_echo_script() -> PathBuf {
+        let root = temp_dir();
+        fs::create_dir_all(&root).expect("temp dir");
+        let script_path = root.join("echo-mcp.sh");
+        fs::write(
+            &script_path,
+            "#!/bin/sh\nprintf 'READY:%s\\n' \"$MCP_TEST_TOKEN\"\nIFS= read -r line\nprintf 'ECHO:%s\\n' \"$line\"\n",
+        )
+        .expect("write script");
+        let mut permissions = fs::metadata(&script_path).expect("metadata").permissions();
+        permissions.set_mode(0o755);
+        fs::set_permissions(&script_path, permissions).expect("chmod");
+        script_path
+    }
+
+    fn sample_bootstrap(script_path: &Path) -> McpClientBootstrap {
+        let config = ScopedMcpServerConfig {
+            scope: ConfigSource::Local,
+            config: McpServerConfig::Stdio(McpStdioServerConfig {
+                command: script_path.to_string_lossy().into_owned(),
+                args: Vec::new(),
+                env: BTreeMap::from([("MCP_TEST_TOKEN".to_string(), "secret-value".to_string())]),
+            }),
+        };
+        McpClientBootstrap::from_scoped_config("stdio server", &config)
+    }
+
+    #[test]
+    fn spawns_stdio_process_and_round_trips_io() {
+        let runtime = Builder::new_current_thread()
+            .enable_all()
+            .build()
+            .expect("runtime");
+        runtime.block_on(async {
+            let script_path = write_echo_script();
+            let bootstrap = sample_bootstrap(&script_path);
+            let mut process = spawn_mcp_stdio_process(&bootstrap).expect("spawn stdio process");
+
+            let ready = process.read_available().await.expect("read ready");
+            assert_eq!(String::from_utf8_lossy(&ready), "READY:secret-value\n");
+
+            process
+                .write_all(b"ping from client\n")
+                .await
+                .expect("write input");
+            process.flush().await.expect("flush");
+
+            let echoed = process.read_available().await.expect("read echo");
+            assert_eq!(String::from_utf8_lossy(&echoed), "ECHO:ping from client\n");
+
+            let status = process.wait().await.expect("wait for exit");
+            assert!(status.success());
+
+            fs::remove_file(&script_path).expect("cleanup script");
+            fs::remove_dir_all(script_path.parent().expect("script parent")).expect("cleanup dir");
+        });
+    }
+
+    #[test]
+    fn rejects_non_stdio_bootstrap() {
+        let config = ScopedMcpServerConfig {
+            scope: ConfigSource::Local,
+            config: McpServerConfig::Sdk(crate::config::McpSdkServerConfig {
+                name: "sdk-server".to_string(),
+            }),
+        };
+        let bootstrap = McpClientBootstrap::from_scoped_config("sdk server", &config);
+        let error = spawn_mcp_stdio_process(&bootstrap).expect_err("non-stdio should fail");
+        assert_eq!(error.kind(), ErrorKind::InvalidInput);
+    }
+
+    #[test]
+    fn direct_spawn_uses_transport_env() {
+        let runtime = Builder::new_current_thread()
+            .enable_all()
+            .build()
+            .expect("runtime");
+        runtime.block_on(async {
+            let script_path = write_echo_script();
+            let transport = crate::mcp_client::McpStdioTransport {
+                command: script_path.to_string_lossy().into_owned(),
+                args: Vec::new(),
+                env: BTreeMap::from([("MCP_TEST_TOKEN".to_string(), "direct-secret".to_string())]),
+            };
+            let mut process = McpStdioProcess::spawn(&transport).expect("spawn transport directly");
+            let ready = process.read_available().await.expect("read ready");
+            assert_eq!(String::from_utf8_lossy(&ready), "READY:direct-secret\n");
+            process.terminate().await.expect("terminate child");
+            let _ = process.wait().await.expect("wait after kill");
+
+            fs::remove_file(&script_path).expect("cleanup script");
+            fs::remove_dir_all(script_path.parent().expect("script parent")).expect("cleanup dir");
+        });
+    }
+}