| 12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270370470570670770870971071171271371471571671771871972072172272372472572672772872973073173273373473573673773873974074174274374474574674774874975075175275375475575675775875976076176276376476576676776876977077177277377477577677777877978078178278378478578678778878979079179279379479579679779879980080180280380480580680780880981081181281381481581681781881982082182282382482582682782882983083183283383483583683783883984084184284384484584684784884985085185285385485585685785885986086186286386486586686786886987087187287387487587687787887988088188288388488588688788888989089189289389489589689789889990090190290390490590690790890991091191291391491591691791891992092192292392492592692792892993093193293393493593693793893994094194294394494594694794894995095195295395495595695795895996096196296396496596696796896997097197297397497597697797897998098198298398498598698798898999099199299399499599699799899910001001100210031004100510061007100810091010101110121013101410151016101710181019102010211022102310241025102610271028102910301031103210331034103510361037103810391040104110421043104410451046104710481049105010511052105310541055105610571058105910601061106210631064106510661067106810691070107110721073107410751076107710781079108010811082108310841085108610871088108910901091109210931094109510961097109810991100110111021103110411051106110711081109111011111112111311141115111611171118111911201121112211231124112511261127112811291130113111321133113411351136113711381139114011411142114311441145114611471148114911501151115211531154115511561157115811591160116111621163116411651166116711681169117011711172117311741175117611771178117911801181118211831184118511861187118811891190119111921193119411951196119711981199120012011202120312041205120612071208120912101211121212131214121512161217121812191220122112221223122412251226122712281229123012311232123312341235123612371238123912401241124212431244124512461247124812491250125112521253125412551256125712581259126012611262126312641265126612671268126912701271127212731274127512761277127812791280128112821283128412851286128712881289129012911292129312941295129612971298129913001301130213031304130513061307130813091310131113121313131413151316131713181319132013211322132313241325132613271328132913301331133213331334133513361337133813391340134113421343134413451346134713481349135013511352135313541355135613571358135913601361136213631364136513661367136813691370137113721373137413751376137713781379138013811382138313841385138613871388138913901391139213931394139513961397139813991400140114021403140414051406140714081409141014111412141314141415141614171418141914201421142214231424142514261427142814291430143114321433143414351436143714381439144014411442144314441445144614471448144914501451145214531454145514561457145814591460146114621463146414651466146714681469147014711472147314741475147614771478147914801481148214831484148514861487148814891490149114921493149414951496149714981499150015011502150315041505150615071508150915101511151215131514151515161517151815191520152115221523152415251526152715281529153015311532153315341535153615371538153915401541154215431544154515461547154815491550155115521553155415551556155715581559156015611562156315641565156615671568156915701571157215731574157515761577157815791580158115821583158415851586158715881589159015911592159315941595159615971598159916001601160216031604160516061607160816091610161116121613161416151616161716181619162016211622162316241625162616271628162916301631163216331634163516361637163816391640164116421643164416451646164716481649165016511652165316541655165616571658165916601661166216631664166516661667166816691670167116721673167416751676167716781679168016811682168316841685168616871688168916901691169216931694169516961697169816991700170117021703170417051706170717081709171017111712171317141715171617171718171917201721172217231724172517261727172817291730173117321733173417351736173717381739174017411742174317441745174617471748174917501751175217531754175517561757175817591760176117621763176417651766176717681769177017711772177317741775177617771778177917801781178217831784178517861787178817891790179117921793179417951796179717981799180018011802180318041805180618071808180918101811181218131814181518161817181818191820182118221823182418251826182718281829183018311832183318341835183618371838183918401841184218431844184518461847184818491850185118521853185418551856185718581859186018611862186318641865186618671868186918701871187218731874187518761877187818791880188118821883188418851886188718881889189018911892189318941895189618971898189919001901190219031904190519061907190819091910191119121913191419151916191719181919192019211922192319241925192619271928192919301931193219331934193519361937193819391940194119421943194419451946194719481949195019511952195319541955195619571958195919601961196219631964196519661967196819691970197119721973197419751976197719781979198019811982198319841985198619871988198919901991199219931994199519961997199819992000200120022003200420052006200720082009201020112012201320142015201620172018201920202021202220232024202520262027202820292030203120322033203420352036203720382039204020412042204320442045204620472048204920502051205220532054205520562057205820592060206120622063206420652066206720682069207020712072207320742075207620772078207920802081208220832084208520862087208820892090209120922093209420952096209720982099210021012102210321042105210621072108210921102111211221132114211521162117211821192120212121222123212421252126212721282129213021312132213321342135213621372138213921402141214221432144214521462147214821492150215121522153215421552156215721582159216021612162216321642165216621672168216921702171217221732174217521762177217821792180218121822183218421852186218721882189219021912192219321942195219621972198219922002201220222032204220522062207220822092210221122122213221422152216221722182219222022212222222322242225222622272228222922302231223222332234223522362237223822392240224122422243224422452246224722482249225022512252225322542255225622572258225922602261226222632264226522662267226822692270227122722273227422752276227722782279228022812282228322842285228622872288228922902291229222932294229522962297229822992300230123022303230423052306230723082309231023112312231323142315231623172318231923202321232223232324232523262327232823292330233123322333233423352336233723382339234023412342234323442345234623472348234923502351235223532354235523562357235823592360236123622363236423652366236723682369237023712372237323742375237623772378237923802381238223832384238523862387238823892390239123922393239423952396239723982399 |
- use std::collections::BTreeMap;
- use std::future::Future;
- use std::io;
- use std::process::Stdio;
- use std::time::Duration;
- use serde::de::DeserializeOwned;
- use serde::{Deserialize, Serialize};
- use serde_json::Value as JsonValue;
- use tokio::io::{AsyncBufReadExt, AsyncReadExt, AsyncWriteExt, BufReader};
- use tokio::process::{Child, ChildStdin, ChildStdout, Command};
- use tokio::time::timeout;
- use crate::config::{McpTransport, RuntimeConfig, ScopedMcpServerConfig};
- use crate::mcp::mcp_tool_name;
- use crate::mcp_client::{McpClientBootstrap, McpClientTransport, McpStdioTransport};
- #[cfg(test)]
- const MCP_INITIALIZE_TIMEOUT_MS: u64 = 200;
- #[cfg(not(test))]
- const MCP_INITIALIZE_TIMEOUT_MS: u64 = 10_000;
- #[cfg(test)]
- const MCP_LIST_TOOLS_TIMEOUT_MS: u64 = 300;
- #[cfg(not(test))]
- const MCP_LIST_TOOLS_TIMEOUT_MS: u64 = 30_000;
- #[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
- #[serde(untagged)]
- pub enum JsonRpcId {
- Number(u64),
- String(String),
- Null,
- }
- #[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
- pub struct JsonRpcRequest<T = JsonValue> {
- pub jsonrpc: String,
- pub id: JsonRpcId,
- pub method: String,
- #[serde(skip_serializing_if = "Option::is_none")]
- pub params: Option<T>,
- }
- impl<T> JsonRpcRequest<T> {
- #[must_use]
- pub fn new(id: JsonRpcId, method: impl Into<String>, params: Option<T>) -> Self {
- Self {
- jsonrpc: "2.0".to_string(),
- id,
- method: method.into(),
- params,
- }
- }
- }
- #[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
- pub struct JsonRpcError {
- pub code: i64,
- pub message: String,
- #[serde(skip_serializing_if = "Option::is_none")]
- pub data: Option<JsonValue>,
- }
- #[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
- pub struct JsonRpcResponse<T = JsonValue> {
- pub jsonrpc: String,
- pub id: JsonRpcId,
- #[serde(skip_serializing_if = "Option::is_none")]
- pub result: Option<T>,
- #[serde(skip_serializing_if = "Option::is_none")]
- pub error: Option<JsonRpcError>,
- }
- #[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
- #[serde(rename_all = "camelCase")]
- pub struct McpInitializeParams {
- pub protocol_version: String,
- pub capabilities: JsonValue,
- pub client_info: McpInitializeClientInfo,
- }
- #[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
- #[serde(rename_all = "camelCase")]
- pub struct McpInitializeClientInfo {
- pub name: String,
- pub version: String,
- }
- #[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
- #[serde(rename_all = "camelCase")]
- pub struct McpInitializeResult {
- pub protocol_version: String,
- pub capabilities: JsonValue,
- pub server_info: McpInitializeServerInfo,
- }
- #[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
- #[serde(rename_all = "camelCase")]
- pub struct McpInitializeServerInfo {
- pub name: String,
- pub version: String,
- }
- #[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
- #[serde(rename_all = "camelCase")]
- pub struct McpListToolsParams {
- #[serde(skip_serializing_if = "Option::is_none")]
- pub cursor: Option<String>,
- }
- #[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
- pub struct McpTool {
- pub name: String,
- #[serde(skip_serializing_if = "Option::is_none")]
- pub description: Option<String>,
- #[serde(rename = "inputSchema", skip_serializing_if = "Option::is_none")]
- pub input_schema: Option<JsonValue>,
- #[serde(skip_serializing_if = "Option::is_none")]
- pub annotations: Option<JsonValue>,
- #[serde(rename = "_meta", skip_serializing_if = "Option::is_none")]
- pub meta: Option<JsonValue>,
- }
- #[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
- #[serde(rename_all = "camelCase")]
- pub struct McpListToolsResult {
- pub tools: Vec<McpTool>,
- #[serde(skip_serializing_if = "Option::is_none")]
- pub next_cursor: Option<String>,
- }
- #[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
- #[serde(rename_all = "camelCase")]
- pub struct McpToolCallParams {
- pub name: String,
- #[serde(skip_serializing_if = "Option::is_none")]
- pub arguments: Option<JsonValue>,
- #[serde(rename = "_meta", skip_serializing_if = "Option::is_none")]
- pub meta: Option<JsonValue>,
- }
- #[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
- pub struct McpToolCallContent {
- #[serde(rename = "type")]
- pub kind: String,
- #[serde(flatten)]
- pub data: BTreeMap<String, JsonValue>,
- }
- #[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
- #[serde(rename_all = "camelCase")]
- pub struct McpToolCallResult {
- #[serde(default)]
- pub content: Vec<McpToolCallContent>,
- #[serde(default)]
- pub structured_content: Option<JsonValue>,
- #[serde(default)]
- pub is_error: Option<bool>,
- #[serde(rename = "_meta", skip_serializing_if = "Option::is_none")]
- pub meta: Option<JsonValue>,
- }
- #[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
- #[serde(rename_all = "camelCase")]
- pub struct McpListResourcesParams {
- #[serde(skip_serializing_if = "Option::is_none")]
- pub cursor: Option<String>,
- }
- #[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
- pub struct McpResource {
- pub uri: String,
- #[serde(skip_serializing_if = "Option::is_none")]
- pub name: Option<String>,
- #[serde(skip_serializing_if = "Option::is_none")]
- pub description: Option<String>,
- #[serde(rename = "mimeType", skip_serializing_if = "Option::is_none")]
- pub mime_type: Option<String>,
- #[serde(skip_serializing_if = "Option::is_none")]
- pub annotations: Option<JsonValue>,
- #[serde(rename = "_meta", skip_serializing_if = "Option::is_none")]
- pub meta: Option<JsonValue>,
- }
- #[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
- #[serde(rename_all = "camelCase")]
- pub struct McpListResourcesResult {
- pub resources: Vec<McpResource>,
- #[serde(skip_serializing_if = "Option::is_none")]
- pub next_cursor: Option<String>,
- }
- #[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
- #[serde(rename_all = "camelCase")]
- pub struct McpReadResourceParams {
- pub uri: String,
- }
- #[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
- pub struct McpResourceContents {
- pub uri: String,
- #[serde(rename = "mimeType", skip_serializing_if = "Option::is_none")]
- pub mime_type: Option<String>,
- #[serde(skip_serializing_if = "Option::is_none")]
- pub text: Option<String>,
- #[serde(skip_serializing_if = "Option::is_none")]
- pub blob: Option<String>,
- #[serde(rename = "_meta", skip_serializing_if = "Option::is_none")]
- pub meta: Option<JsonValue>,
- }
- #[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
- pub struct McpReadResourceResult {
- pub contents: Vec<McpResourceContents>,
- }
- #[derive(Debug, Clone, PartialEq)]
- pub struct ManagedMcpTool {
- pub server_name: String,
- pub qualified_name: String,
- pub raw_name: String,
- pub tool: McpTool,
- }
- #[derive(Debug, Clone, PartialEq, Eq)]
- pub struct UnsupportedMcpServer {
- pub server_name: String,
- pub transport: McpTransport,
- pub reason: String,
- }
- #[derive(Debug)]
- pub enum McpServerManagerError {
- Io(io::Error),
- Transport {
- server_name: String,
- method: &'static str,
- source: io::Error,
- },
- JsonRpc {
- server_name: String,
- method: &'static str,
- error: JsonRpcError,
- },
- InvalidResponse {
- server_name: String,
- method: &'static str,
- details: String,
- },
- Timeout {
- server_name: String,
- method: &'static str,
- timeout_ms: u64,
- },
- UnknownTool {
- qualified_name: String,
- },
- UnknownServer {
- server_name: String,
- },
- }
- impl std::fmt::Display for McpServerManagerError {
- fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
- match self {
- Self::Io(error) => write!(f, "{error}"),
- Self::Transport {
- server_name,
- method,
- source,
- } => write!(
- f,
- "MCP server `{server_name}` transport failed during {method}: {source}"
- ),
- Self::JsonRpc {
- server_name,
- method,
- error,
- } => write!(
- f,
- "MCP server `{server_name}` returned JSON-RPC error for {method}: {} ({})",
- error.message, error.code
- ),
- Self::InvalidResponse {
- server_name,
- method,
- details,
- } => write!(
- f,
- "MCP server `{server_name}` returned invalid response for {method}: {details}"
- ),
- Self::Timeout {
- server_name,
- method,
- timeout_ms,
- } => write!(
- f,
- "MCP server `{server_name}` timed out after {timeout_ms} ms while handling {method}"
- ),
- Self::UnknownTool { qualified_name } => {
- write!(f, "unknown MCP tool `{qualified_name}`")
- }
- Self::UnknownServer { server_name } => write!(f, "unknown MCP server `{server_name}`"),
- }
- }
- }
- impl std::error::Error for McpServerManagerError {
- fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
- match self {
- Self::Io(error) => Some(error),
- Self::Transport { source, .. } => Some(source),
- Self::JsonRpc { .. }
- | Self::InvalidResponse { .. }
- | Self::Timeout { .. }
- | Self::UnknownTool { .. }
- | Self::UnknownServer { .. } => None,
- }
- }
- }
- impl From<io::Error> for McpServerManagerError {
- fn from(value: io::Error) -> Self {
- Self::Io(value)
- }
- }
- #[derive(Debug, Clone, PartialEq, Eq)]
- struct ToolRoute {
- server_name: String,
- raw_name: String,
- }
- #[derive(Debug)]
- struct ManagedMcpServer {
- bootstrap: McpClientBootstrap,
- process: Option<McpStdioProcess>,
- initialized: bool,
- }
- impl ManagedMcpServer {
- fn new(bootstrap: McpClientBootstrap) -> Self {
- Self {
- bootstrap,
- process: None,
- initialized: false,
- }
- }
- }
- #[derive(Debug)]
- pub struct McpServerManager {
- servers: BTreeMap<String, ManagedMcpServer>,
- unsupported_servers: Vec<UnsupportedMcpServer>,
- tool_index: BTreeMap<String, ToolRoute>,
- next_request_id: u64,
- }
- impl McpServerManager {
- #[must_use]
- pub fn from_runtime_config(config: &RuntimeConfig) -> Self {
- Self::from_servers(config.mcp().servers())
- }
- #[must_use]
- pub fn from_servers(servers: &BTreeMap<String, ScopedMcpServerConfig>) -> Self {
- let mut managed_servers = BTreeMap::new();
- let mut unsupported_servers = Vec::new();
- for (server_name, server_config) in servers {
- if server_config.transport() == McpTransport::Stdio {
- let bootstrap = McpClientBootstrap::from_scoped_config(server_name, server_config);
- managed_servers.insert(server_name.clone(), ManagedMcpServer::new(bootstrap));
- } else {
- unsupported_servers.push(UnsupportedMcpServer {
- server_name: server_name.clone(),
- transport: server_config.transport(),
- reason: format!(
- "transport {:?} is not supported by McpServerManager",
- server_config.transport()
- ),
- });
- }
- }
- Self {
- servers: managed_servers,
- unsupported_servers,
- tool_index: BTreeMap::new(),
- next_request_id: 1,
- }
- }
- #[must_use]
- pub fn unsupported_servers(&self) -> &[UnsupportedMcpServer] {
- &self.unsupported_servers
- }
- pub async fn discover_tools(&mut self) -> Result<Vec<ManagedMcpTool>, McpServerManagerError> {
- let server_names = self.servers.keys().cloned().collect::<Vec<_>>();
- let mut discovered_tools = Vec::new();
- for server_name in server_names {
- let server_tools = self.discover_tools_for_server(&server_name).await?;
- self.clear_routes_for_server(&server_name);
- for tool in server_tools {
- self.tool_index.insert(
- tool.qualified_name.clone(),
- ToolRoute {
- server_name: tool.server_name.clone(),
- raw_name: tool.raw_name.clone(),
- },
- );
- discovered_tools.push(tool);
- }
- }
- Ok(discovered_tools)
- }
- pub async fn call_tool(
- &mut self,
- qualified_tool_name: &str,
- arguments: Option<JsonValue>,
- ) -> Result<JsonRpcResponse<McpToolCallResult>, McpServerManagerError> {
- let route = self
- .tool_index
- .get(qualified_tool_name)
- .cloned()
- .ok_or_else(|| McpServerManagerError::UnknownTool {
- qualified_name: qualified_tool_name.to_string(),
- })?;
- let timeout_ms = self.tool_call_timeout_ms(&route.server_name)?;
- self.ensure_server_ready(&route.server_name).await?;
- let request_id = self.take_request_id();
- let response = {
- let server = self.server_mut(&route.server_name)?;
- let process = server.process.as_mut().ok_or_else(|| {
- McpServerManagerError::InvalidResponse {
- server_name: route.server_name.clone(),
- method: "tools/call",
- details: "server process missing after initialization".to_string(),
- }
- })?;
- Self::run_process_request(
- &route.server_name,
- "tools/call",
- timeout_ms,
- process.call_tool(
- request_id,
- McpToolCallParams {
- name: route.raw_name,
- arguments,
- meta: None,
- },
- ),
- )
- .await
- };
- if let Err(error) = &response {
- if Self::should_reset_server(error) {
- self.reset_server(&route.server_name).await?;
- }
- }
- response
- }
- pub async fn shutdown(&mut self) -> Result<(), McpServerManagerError> {
- let server_names = self.servers.keys().cloned().collect::<Vec<_>>();
- for server_name in server_names {
- let server = self.server_mut(&server_name)?;
- if let Some(process) = server.process.as_mut() {
- process.shutdown().await?;
- }
- server.process = None;
- server.initialized = false;
- }
- Ok(())
- }
- fn clear_routes_for_server(&mut self, server_name: &str) {
- self.tool_index
- .retain(|_, route| route.server_name != server_name);
- }
- fn server_mut(
- &mut self,
- server_name: &str,
- ) -> Result<&mut ManagedMcpServer, McpServerManagerError> {
- self.servers
- .get_mut(server_name)
- .ok_or_else(|| McpServerManagerError::UnknownServer {
- server_name: server_name.to_string(),
- })
- }
- fn take_request_id(&mut self) -> JsonRpcId {
- let id = self.next_request_id;
- self.next_request_id = self.next_request_id.saturating_add(1);
- JsonRpcId::Number(id)
- }
- fn tool_call_timeout_ms(&self, server_name: &str) -> Result<u64, McpServerManagerError> {
- let server = self
- .servers
- .get(server_name)
- .ok_or_else(|| McpServerManagerError::UnknownServer {
- server_name: server_name.to_string(),
- })?;
- match &server.bootstrap.transport {
- McpClientTransport::Stdio(transport) => Ok(transport.resolved_tool_call_timeout_ms()),
- other => Err(McpServerManagerError::InvalidResponse {
- server_name: server_name.to_string(),
- method: "tools/call",
- details: format!("unsupported MCP transport for stdio manager: {other:?}"),
- }),
- }
- }
- fn server_process_exited(&mut self, server_name: &str) -> Result<bool, McpServerManagerError> {
- let server = self.server_mut(server_name)?;
- match server.process.as_mut() {
- Some(process) => Ok(process.has_exited()?),
- None => Ok(false),
- }
- }
- async fn discover_tools_for_server(
- &mut self,
- server_name: &str,
- ) -> Result<Vec<ManagedMcpTool>, McpServerManagerError> {
- let mut attempts = 0;
- loop {
- match self.discover_tools_for_server_once(server_name).await {
- Ok(tools) => return Ok(tools),
- Err(error) if attempts == 0 && Self::is_retryable_error(&error) => {
- self.reset_server(server_name).await?;
- attempts += 1;
- }
- Err(error) => {
- if Self::should_reset_server(&error) {
- self.reset_server(server_name).await?;
- }
- return Err(error);
- }
- }
- }
- }
- async fn discover_tools_for_server_once(
- &mut self,
- server_name: &str,
- ) -> Result<Vec<ManagedMcpTool>, McpServerManagerError> {
- self.ensure_server_ready(server_name).await?;
- let mut discovered_tools = Vec::new();
- let mut cursor = None;
- loop {
- let request_id = self.take_request_id();
- let response = {
- let server = self.server_mut(server_name)?;
- let process = server.process.as_mut().ok_or_else(|| {
- McpServerManagerError::InvalidResponse {
- server_name: server_name.to_string(),
- method: "tools/list",
- details: "server process missing after initialization".to_string(),
- }
- })?;
- Self::run_process_request(
- server_name,
- "tools/list",
- MCP_LIST_TOOLS_TIMEOUT_MS,
- process.list_tools(
- request_id,
- Some(McpListToolsParams {
- cursor: cursor.clone(),
- }),
- ),
- )
- .await?
- };
- if let Some(error) = response.error {
- return Err(McpServerManagerError::JsonRpc {
- server_name: server_name.to_string(),
- method: "tools/list",
- error,
- });
- }
- let result =
- response
- .result
- .ok_or_else(|| McpServerManagerError::InvalidResponse {
- server_name: server_name.to_string(),
- method: "tools/list",
- details: "missing result payload".to_string(),
- })?;
- for tool in result.tools {
- let qualified_name = mcp_tool_name(server_name, &tool.name);
- discovered_tools.push(ManagedMcpTool {
- server_name: server_name.to_string(),
- qualified_name,
- raw_name: tool.name.clone(),
- tool,
- });
- }
- match result.next_cursor {
- Some(next_cursor) => cursor = Some(next_cursor),
- None => break,
- }
- }
- Ok(discovered_tools)
- }
- async fn reset_server(&mut self, server_name: &str) -> Result<(), McpServerManagerError> {
- let mut process = {
- let server = self.server_mut(server_name)?;
- server.initialized = false;
- server.process.take()
- };
- if let Some(process) = process.as_mut() {
- let _ = process.shutdown().await;
- }
- Ok(())
- }
- fn is_retryable_error(error: &McpServerManagerError) -> bool {
- matches!(
- error,
- McpServerManagerError::Transport { .. } | McpServerManagerError::Timeout { .. }
- )
- }
- fn should_reset_server(error: &McpServerManagerError) -> bool {
- matches!(
- error,
- McpServerManagerError::Transport { .. }
- | McpServerManagerError::Timeout { .. }
- | McpServerManagerError::InvalidResponse { .. }
- )
- }
- async fn run_process_request<T, F>(
- server_name: &str,
- method: &'static str,
- timeout_ms: u64,
- future: F,
- ) -> Result<T, McpServerManagerError>
- where
- F: Future<Output = io::Result<T>>,
- {
- match timeout(Duration::from_millis(timeout_ms), future).await {
- Ok(Ok(value)) => Ok(value),
- Ok(Err(error)) if error.kind() == io::ErrorKind::InvalidData => {
- Err(McpServerManagerError::InvalidResponse {
- server_name: server_name.to_string(),
- method,
- details: error.to_string(),
- })
- }
- Ok(Err(source)) => Err(McpServerManagerError::Transport {
- server_name: server_name.to_string(),
- method,
- source,
- }),
- Err(_) => Err(McpServerManagerError::Timeout {
- server_name: server_name.to_string(),
- method,
- timeout_ms,
- }),
- }
- }
- async fn ensure_server_ready(
- &mut self,
- server_name: &str,
- ) -> Result<(), McpServerManagerError> {
- if self.server_process_exited(server_name)? {
- self.reset_server(server_name).await?;
- }
- let mut attempts = 0;
- loop {
- let needs_spawn = self
- .servers
- .get(server_name)
- .map(|server| server.process.is_none())
- .ok_or_else(|| McpServerManagerError::UnknownServer {
- server_name: server_name.to_string(),
- })?;
- if needs_spawn {
- let server = self.server_mut(server_name)?;
- server.process = Some(spawn_mcp_stdio_process(&server.bootstrap)?);
- server.initialized = false;
- }
- let needs_initialize = self
- .servers
- .get(server_name)
- .map(|server| !server.initialized)
- .ok_or_else(|| McpServerManagerError::UnknownServer {
- server_name: server_name.to_string(),
- })?;
- if !needs_initialize {
- return Ok(());
- }
- let request_id = self.take_request_id();
- let response = {
- let server = self.server_mut(server_name)?;
- let process = server.process.as_mut().ok_or_else(|| {
- McpServerManagerError::InvalidResponse {
- server_name: server_name.to_string(),
- method: "initialize",
- details: "server process missing before initialize".to_string(),
- }
- })?;
- Self::run_process_request(
- server_name,
- "initialize",
- MCP_INITIALIZE_TIMEOUT_MS,
- process.initialize(request_id, default_initialize_params()),
- )
- .await
- };
- let response = match response {
- Ok(response) => response,
- Err(error) if attempts == 0 && Self::is_retryable_error(&error) => {
- self.reset_server(server_name).await?;
- attempts += 1;
- continue;
- }
- Err(error) => {
- if Self::should_reset_server(&error) {
- self.reset_server(server_name).await?;
- }
- return Err(error);
- }
- };
- if let Some(error) = response.error {
- return Err(McpServerManagerError::JsonRpc {
- server_name: server_name.to_string(),
- method: "initialize",
- error,
- });
- }
- if response.result.is_none() {
- let error = McpServerManagerError::InvalidResponse {
- server_name: server_name.to_string(),
- method: "initialize",
- details: "missing result payload".to_string(),
- };
- self.reset_server(server_name).await?;
- return Err(error);
- }
- let server = self.server_mut(server_name)?;
- server.initialized = true;
- return Ok(());
- }
- }
- }
- #[derive(Debug)]
- pub struct McpStdioProcess {
- child: Child,
- stdin: ChildStdin,
- stdout: BufReader<ChildStdout>,
- }
- impl McpStdioProcess {
- pub fn spawn(transport: &McpStdioTransport) -> io::Result<Self> {
- let mut command = Command::new(&transport.command);
- command
- .args(&transport.args)
- .stdin(Stdio::piped())
- .stdout(Stdio::piped())
- .stderr(Stdio::inherit());
- apply_env(&mut command, &transport.env);
- let mut child = command.spawn()?;
- let stdin = child
- .stdin
- .take()
- .ok_or_else(|| io::Error::other("stdio MCP process missing stdin pipe"))?;
- let stdout = child
- .stdout
- .take()
- .ok_or_else(|| io::Error::other("stdio MCP process missing stdout pipe"))?;
- Ok(Self {
- child,
- stdin,
- stdout: BufReader::new(stdout),
- })
- }
- pub async fn write_all(&mut self, bytes: &[u8]) -> io::Result<()> {
- self.stdin.write_all(bytes).await
- }
- pub async fn flush(&mut self) -> io::Result<()> {
- self.stdin.flush().await
- }
- pub async fn write_line(&mut self, line: &str) -> io::Result<()> {
- self.write_all(line.as_bytes()).await?;
- self.write_all(b"\n").await?;
- self.flush().await
- }
- pub async fn read_line(&mut self) -> io::Result<String> {
- let mut line = String::new();
- let bytes_read = self.stdout.read_line(&mut line).await?;
- if bytes_read == 0 {
- return Err(io::Error::new(
- io::ErrorKind::UnexpectedEof,
- "MCP stdio stream closed while reading line",
- ));
- }
- Ok(line)
- }
- pub async fn read_available(&mut self) -> io::Result<Vec<u8>> {
- let mut buffer = vec![0_u8; 4096];
- let read = self.stdout.read(&mut buffer).await?;
- buffer.truncate(read);
- Ok(buffer)
- }
- pub async fn write_frame(&mut self, payload: &[u8]) -> io::Result<()> {
- let encoded = encode_frame(payload);
- self.write_all(&encoded).await?;
- self.flush().await
- }
- pub async fn read_frame(&mut self) -> io::Result<Vec<u8>> {
- let mut content_length = None;
- loop {
- let mut line = String::new();
- let bytes_read = self.stdout.read_line(&mut line).await?;
- if bytes_read == 0 {
- return Err(io::Error::new(
- io::ErrorKind::UnexpectedEof,
- "MCP stdio stream closed while reading headers",
- ));
- }
- if line == "\r\n" {
- break;
- }
- let header = line.trim_end_matches(['\r', '\n']);
- if let Some((name, value)) = header.split_once(':') {
- if name.trim().eq_ignore_ascii_case("Content-Length") {
- let parsed = value
- .trim()
- .parse::<usize>()
- .map_err(|error| io::Error::new(io::ErrorKind::InvalidData, error))?;
- content_length = Some(parsed);
- }
- }
- }
- let content_length = content_length.ok_or_else(|| {
- io::Error::new(io::ErrorKind::InvalidData, "missing Content-Length header")
- })?;
- let mut payload = vec![0_u8; content_length];
- self.stdout.read_exact(&mut payload).await?;
- Ok(payload)
- }
- pub async fn write_jsonrpc_message<T: Serialize>(&mut self, message: &T) -> io::Result<()> {
- let body = serde_json::to_vec(message)
- .map_err(|error| io::Error::new(io::ErrorKind::InvalidData, error))?;
- self.write_frame(&body).await
- }
- pub async fn read_jsonrpc_message<T: DeserializeOwned>(&mut self) -> io::Result<T> {
- let payload = self.read_frame().await?;
- serde_json::from_slice(&payload)
- .map_err(|error| io::Error::new(io::ErrorKind::InvalidData, error))
- }
- pub async fn send_request<T: Serialize>(
- &mut self,
- request: &JsonRpcRequest<T>,
- ) -> io::Result<()> {
- self.write_jsonrpc_message(request).await
- }
- pub async fn read_response<T: DeserializeOwned>(&mut self) -> io::Result<JsonRpcResponse<T>> {
- self.read_jsonrpc_message().await
- }
- pub async fn request<TParams: Serialize, TResult: DeserializeOwned>(
- &mut self,
- id: JsonRpcId,
- method: impl Into<String>,
- params: Option<TParams>,
- ) -> io::Result<JsonRpcResponse<TResult>> {
- let method = method.into();
- let request = JsonRpcRequest::new(id.clone(), method.clone(), params);
- self.send_request(&request).await?;
- let response = self.read_response().await?;
- if response.jsonrpc != "2.0" {
- return Err(io::Error::new(
- io::ErrorKind::InvalidData,
- format!(
- "MCP response for {method} used unsupported jsonrpc version `{}`",
- response.jsonrpc
- ),
- ));
- }
- if response.id != id {
- return Err(io::Error::new(
- io::ErrorKind::InvalidData,
- format!(
- "MCP response for {method} used mismatched id: expected {id:?}, got {:?}",
- response.id
- ),
- ));
- }
- Ok(response)
- }
- pub async fn initialize(
- &mut self,
- id: JsonRpcId,
- params: McpInitializeParams,
- ) -> io::Result<JsonRpcResponse<McpInitializeResult>> {
- self.request(id, "initialize", Some(params)).await
- }
- pub async fn list_tools(
- &mut self,
- id: JsonRpcId,
- params: Option<McpListToolsParams>,
- ) -> io::Result<JsonRpcResponse<McpListToolsResult>> {
- self.request(id, "tools/list", params).await
- }
- pub async fn call_tool(
- &mut self,
- id: JsonRpcId,
- params: McpToolCallParams,
- ) -> io::Result<JsonRpcResponse<McpToolCallResult>> {
- self.request(id, "tools/call", Some(params)).await
- }
- pub async fn list_resources(
- &mut self,
- id: JsonRpcId,
- params: Option<McpListResourcesParams>,
- ) -> io::Result<JsonRpcResponse<McpListResourcesResult>> {
- self.request(id, "resources/list", params).await
- }
- pub async fn read_resource(
- &mut self,
- id: JsonRpcId,
- params: McpReadResourceParams,
- ) -> io::Result<JsonRpcResponse<McpReadResourceResult>> {
- self.request(id, "resources/read", Some(params)).await
- }
- pub async fn terminate(&mut self) -> io::Result<()> {
- self.child.kill().await
- }
- pub async fn wait(&mut self) -> io::Result<std::process::ExitStatus> {
- self.child.wait().await
- }
- pub fn has_exited(&mut self) -> io::Result<bool> {
- Ok(self.child.try_wait()?.is_some())
- }
- async fn shutdown(&mut self) -> io::Result<()> {
- if self.child.try_wait()?.is_none() {
- match self.child.kill().await {
- Ok(()) => {}
- Err(error) if error.kind() == io::ErrorKind::InvalidInput => {}
- Err(error) => return Err(error),
- }
- }
- let _ = self.child.wait().await?;
- Ok(())
- }
- }
- pub fn spawn_mcp_stdio_process(bootstrap: &McpClientBootstrap) -> io::Result<McpStdioProcess> {
- match &bootstrap.transport {
- McpClientTransport::Stdio(transport) => McpStdioProcess::spawn(transport),
- other => Err(io::Error::new(
- io::ErrorKind::InvalidInput,
- format!(
- "MCP bootstrap transport for {} is not stdio: {other:?}",
- bootstrap.server_name
- ),
- )),
- }
- }
- fn apply_env(command: &mut Command, env: &BTreeMap<String, String>) {
- for (key, value) in env {
- command.env(key, value);
- }
- }
- fn encode_frame(payload: &[u8]) -> Vec<u8> {
- let header = format!("Content-Length: {}\r\n\r\n", payload.len());
- let mut framed = header.into_bytes();
- framed.extend_from_slice(payload);
- framed
- }
- fn default_initialize_params() -> McpInitializeParams {
- McpInitializeParams {
- protocol_version: "2025-03-26".to_string(),
- capabilities: JsonValue::Object(serde_json::Map::new()),
- client_info: McpInitializeClientInfo {
- name: "runtime".to_string(),
- version: env!("CARGO_PKG_VERSION").to_string(),
- },
- }
- }
- #[cfg(test)]
- mod tests {
- use std::collections::BTreeMap;
- use std::fs;
- use std::io::ErrorKind;
- use std::os::unix::fs::PermissionsExt;
- use std::path::{Path, PathBuf};
- use std::sync::atomic::{AtomicU64, Ordering};
- use std::time::{SystemTime, UNIX_EPOCH};
- use serde_json::json;
- use tokio::runtime::Builder;
- use crate::config::{
- ConfigSource, McpRemoteServerConfig, McpSdkServerConfig, McpServerConfig,
- McpStdioServerConfig, McpWebSocketServerConfig, ScopedMcpServerConfig,
- };
- use crate::mcp::mcp_tool_name;
- use crate::mcp_client::McpClientBootstrap;
- use super::{
- spawn_mcp_stdio_process, JsonRpcId, JsonRpcRequest, JsonRpcResponse,
- McpInitializeClientInfo, McpInitializeParams, McpInitializeResult, McpInitializeServerInfo,
- McpListToolsResult, McpReadResourceParams, McpReadResourceResult, McpServerManager,
- McpServerManagerError, McpStdioProcess, McpTool, McpToolCallParams,
- };
- fn temp_dir() -> PathBuf {
- static NEXT_TEMP_DIR_ID: AtomicU64 = AtomicU64::new(0);
- let nanos = SystemTime::now()
- .duration_since(UNIX_EPOCH)
- .expect("time should be after epoch")
- .as_nanos();
- let unique_id = NEXT_TEMP_DIR_ID.fetch_add(1, Ordering::Relaxed);
- std::env::temp_dir().join(format!("runtime-mcp-stdio-{nanos}-{unique_id}"))
- }
- fn write_echo_script() -> PathBuf {
- let root = temp_dir();
- fs::create_dir_all(&root).expect("temp dir");
- let script_path = root.join("echo-mcp.sh");
- fs::write(
- &script_path,
- "#!/bin/sh\nprintf 'READY:%s\\n' \"$MCP_TEST_TOKEN\"\nIFS= read -r line\nprintf 'ECHO:%s\\n' \"$line\"\n",
- )
- .expect("write script");
- let mut permissions = fs::metadata(&script_path).expect("metadata").permissions();
- permissions.set_mode(0o755);
- fs::set_permissions(&script_path, permissions).expect("chmod");
- script_path
- }
- fn write_jsonrpc_script() -> PathBuf {
- let root = temp_dir();
- fs::create_dir_all(&root).expect("temp dir");
- let script_path = root.join("jsonrpc-mcp.py");
- let script = [
- "#!/usr/bin/env python3",
- "import json, os, sys",
- "LOWERCASE_CONTENT_LENGTH = os.environ.get('MCP_LOWERCASE_CONTENT_LENGTH') == '1'",
- "MISMATCHED_RESPONSE_ID = os.environ.get('MCP_MISMATCHED_RESPONSE_ID') == '1'",
- "header = b''",
- r"while not header.endswith(b'\r\n\r\n'):",
- " chunk = sys.stdin.buffer.read(1)",
- " if not chunk:",
- " raise SystemExit(1)",
- " header += chunk",
- "length = 0",
- r"for line in header.decode().split('\r\n'):",
- r" if line.lower().startswith('content-length:'):",
- r" length = int(line.split(':', 1)[1].strip())",
- "payload = sys.stdin.buffer.read(length)",
- "request = json.loads(payload.decode())",
- r"assert request['jsonrpc'] == '2.0'",
- r"assert request['method'] == 'initialize'",
- "response_id = 'wrong-id' if MISMATCHED_RESPONSE_ID else request['id']",
- "header_name = 'content-length' if LOWERCASE_CONTENT_LENGTH else 'Content-Length'",
- r"response = json.dumps({",
- r" 'jsonrpc': '2.0',",
- r" 'id': response_id,",
- r" 'result': {",
- r" 'protocolVersion': request['params']['protocolVersion'],",
- r" 'capabilities': {'tools': {}},",
- r" 'serverInfo': {'name': 'fake-mcp', 'version': '0.1.0'}",
- r" }",
- r"}).encode()",
- r"sys.stdout.buffer.write(f'{header_name}: {len(response)}\r\n\r\n'.encode() + response)",
- "sys.stdout.buffer.flush()",
- "",
- ]
- .join("\n");
- fs::write(&script_path, script).expect("write script");
- let mut permissions = fs::metadata(&script_path).expect("metadata").permissions();
- permissions.set_mode(0o755);
- fs::set_permissions(&script_path, permissions).expect("chmod");
- script_path
- }
- #[allow(clippy::too_many_lines)]
- fn write_mcp_server_script() -> PathBuf {
- let root = temp_dir();
- fs::create_dir_all(&root).expect("temp dir");
- let script_path = root.join("fake-mcp-server.py");
- let script = [
- "#!/usr/bin/env python3",
- "import json, os, sys, time",
- "TOOL_CALL_DELAY_MS = int(os.environ.get('MCP_TOOL_CALL_DELAY_MS', '0'))",
- "INVALID_TOOL_CALL_RESPONSE = os.environ.get('MCP_INVALID_TOOL_CALL_RESPONSE') == '1'",
- "",
- "def read_message():",
- " header = b''",
- r" while not header.endswith(b'\r\n\r\n'):",
- " chunk = sys.stdin.buffer.read(1)",
- " if not chunk:",
- " return None",
- " header += chunk",
- " length = 0",
- r" for line in header.decode().split('\r\n'):",
- r" if line.lower().startswith('content-length:'):",
- r" length = int(line.split(':', 1)[1].strip())",
- " payload = sys.stdin.buffer.read(length)",
- " return json.loads(payload.decode())",
- "",
- "def send_message(message):",
- " payload = json.dumps(message).encode()",
- r" sys.stdout.buffer.write(f'Content-Length: {len(payload)}\r\n\r\n'.encode() + payload)",
- " sys.stdout.buffer.flush()",
- "",
- "while True:",
- " request = read_message()",
- " if request is None:",
- " break",
- " method = request['method']",
- " if method == 'initialize':",
- " send_message({",
- " 'jsonrpc': '2.0',",
- " 'id': request['id'],",
- " 'result': {",
- " 'protocolVersion': request['params']['protocolVersion'],",
- " 'capabilities': {'tools': {}, 'resources': {}},",
- " 'serverInfo': {'name': 'fake-mcp', 'version': '0.2.0'}",
- " }",
- " })",
- " elif method == 'tools/list':",
- " send_message({",
- " 'jsonrpc': '2.0',",
- " 'id': request['id'],",
- " 'result': {",
- " 'tools': [",
- " {",
- " 'name': 'echo',",
- " 'description': 'Echoes text',",
- " 'inputSchema': {",
- " 'type': 'object',",
- " 'properties': {'text': {'type': 'string'}},",
- " 'required': ['text']",
- " }",
- " }",
- " ]",
- " }",
- " })",
- " elif method == 'tools/call':",
- " if INVALID_TOOL_CALL_RESPONSE:",
- " sys.stdout.buffer.write(b'Content-Length: 5\\r\\n\\r\\nnope!')",
- " sys.stdout.buffer.flush()",
- " continue",
- " if TOOL_CALL_DELAY_MS:",
- " time.sleep(TOOL_CALL_DELAY_MS / 1000)",
- " args = request['params'].get('arguments') or {}",
- " if request['params']['name'] == 'fail':",
- " send_message({",
- " 'jsonrpc': '2.0',",
- " 'id': request['id'],",
- " 'error': {'code': -32001, 'message': 'tool failed'},",
- " })",
- " else:",
- " text = args.get('text', '')",
- " send_message({",
- " 'jsonrpc': '2.0',",
- " 'id': request['id'],",
- " 'result': {",
- " 'content': [{'type': 'text', 'text': f'echo:{text}'}],",
- " 'structuredContent': {'echoed': text},",
- " 'isError': False",
- " }",
- " })",
- " elif method == 'resources/list':",
- " send_message({",
- " 'jsonrpc': '2.0',",
- " 'id': request['id'],",
- " 'result': {",
- " 'resources': [",
- " {",
- " 'uri': 'file://guide.txt',",
- " 'name': 'guide',",
- " 'description': 'Guide text',",
- " 'mimeType': 'text/plain'",
- " }",
- " ]",
- " }",
- " })",
- " elif method == 'resources/read':",
- " uri = request['params']['uri']",
- " send_message({",
- " 'jsonrpc': '2.0',",
- " 'id': request['id'],",
- " 'result': {",
- " 'contents': [",
- " {",
- " 'uri': uri,",
- " 'mimeType': 'text/plain',",
- " 'text': f'contents for {uri}'",
- " }",
- " ]",
- " }",
- " })",
- " else:",
- " send_message({",
- " 'jsonrpc': '2.0',",
- " 'id': request['id'],",
- " 'error': {'code': -32601, 'message': f'unknown method: {method}'},",
- " })",
- "",
- ]
- .join("\n");
- fs::write(&script_path, script).expect("write script");
- let mut permissions = fs::metadata(&script_path).expect("metadata").permissions();
- permissions.set_mode(0o755);
- fs::set_permissions(&script_path, permissions).expect("chmod");
- script_path
- }
- #[allow(clippy::too_many_lines)]
- fn write_manager_mcp_server_script() -> PathBuf {
- let root = temp_dir();
- fs::create_dir_all(&root).expect("temp dir");
- let script_path = root.join("manager-mcp-server.py");
- let script = [
- "#!/usr/bin/env python3",
- "import json, os, sys, time",
- "",
- "LABEL = os.environ.get('MCP_SERVER_LABEL', 'server')",
- "LOG_PATH = os.environ.get('MCP_LOG_PATH')",
- "EXIT_AFTER_TOOLS_LIST = os.environ.get('MCP_EXIT_AFTER_TOOLS_LIST') == '1'",
- "FAIL_ONCE_MODE = os.environ.get('MCP_FAIL_ONCE_MODE')",
- "FAIL_ONCE_MARKER = os.environ.get('MCP_FAIL_ONCE_MARKER')",
- "initialize_count = 0",
- "",
- "def log(method):",
- " if LOG_PATH:",
- " with open(LOG_PATH, 'a', encoding='utf-8') as handle:",
- " handle.write(f'{method}\\n')",
- "",
- "def should_fail_once():",
- " if not FAIL_ONCE_MODE or not FAIL_ONCE_MARKER:",
- " return False",
- " if os.path.exists(FAIL_ONCE_MARKER):",
- " return False",
- " with open(FAIL_ONCE_MARKER, 'w', encoding='utf-8') as handle:",
- " handle.write(FAIL_ONCE_MODE)",
- " return True",
- "",
- "def read_message():",
- " header = b''",
- r" while not header.endswith(b'\r\n\r\n'):",
- " chunk = sys.stdin.buffer.read(1)",
- " if not chunk:",
- " return None",
- " header += chunk",
- " length = 0",
- r" for line in header.decode().split('\r\n'):",
- r" if line.lower().startswith('content-length:'):",
- r" length = int(line.split(':', 1)[1].strip())",
- " payload = sys.stdin.buffer.read(length)",
- " return json.loads(payload.decode())",
- "",
- "def send_message(message):",
- " payload = json.dumps(message).encode()",
- r" sys.stdout.buffer.write(f'Content-Length: {len(payload)}\r\n\r\n'.encode() + payload)",
- " sys.stdout.buffer.flush()",
- "",
- "while True:",
- " request = read_message()",
- " if request is None:",
- " break",
- " method = request['method']",
- " log(method)",
- " if method == 'initialize':",
- " if FAIL_ONCE_MODE == 'initialize_hang' and should_fail_once():",
- " log('initialize-hang')",
- " while True:",
- " time.sleep(1)",
- " initialize_count += 1",
- " send_message({",
- " 'jsonrpc': '2.0',",
- " 'id': request['id'],",
- " 'result': {",
- " 'protocolVersion': request['params']['protocolVersion'],",
- " 'capabilities': {'tools': {}},",
- " 'serverInfo': {'name': LABEL, 'version': '1.0.0'}",
- " }",
- " })",
- " elif method == 'tools/list':",
- " send_message({",
- " 'jsonrpc': '2.0',",
- " 'id': request['id'],",
- " 'result': {",
- " 'tools': [",
- " {",
- " 'name': 'echo',",
- " 'description': f'Echo tool for {LABEL}',",
- " 'inputSchema': {",
- " 'type': 'object',",
- " 'properties': {'text': {'type': 'string'}},",
- " 'required': ['text']",
- " }",
- " }",
- " ]",
- " }",
- " })",
- " if EXIT_AFTER_TOOLS_LIST:",
- " raise SystemExit(0)",
- " elif method == 'tools/call':",
- " if FAIL_ONCE_MODE == 'tool_call_disconnect' and should_fail_once():",
- " log('tools/call-disconnect')",
- " raise SystemExit(0)",
- " args = request['params'].get('arguments') or {}",
- " text = args.get('text', '')",
- " send_message({",
- " 'jsonrpc': '2.0',",
- " 'id': request['id'],",
- " 'result': {",
- " 'content': [{'type': 'text', 'text': f'{LABEL}:{text}'}],",
- " 'structuredContent': {",
- " 'server': LABEL,",
- " 'echoed': text,",
- " 'initializeCount': initialize_count",
- " },",
- " 'isError': False",
- " }",
- " })",
- " else:",
- " send_message({",
- " 'jsonrpc': '2.0',",
- " 'id': request['id'],",
- " 'error': {'code': -32601, 'message': f'unknown method: {method}'},",
- " })",
- "",
- ]
- .join("\n");
- fs::write(&script_path, script).expect("write script");
- let mut permissions = fs::metadata(&script_path).expect("metadata").permissions();
- permissions.set_mode(0o755);
- fs::set_permissions(&script_path, permissions).expect("chmod");
- script_path
- }
- fn sample_bootstrap(script_path: &Path) -> McpClientBootstrap {
- let config = ScopedMcpServerConfig {
- scope: ConfigSource::Local,
- config: McpServerConfig::Stdio(McpStdioServerConfig {
- command: "/bin/sh".to_string(),
- args: vec![script_path.to_string_lossy().into_owned()],
- env: BTreeMap::from([("MCP_TEST_TOKEN".to_string(), "secret-value".to_string())]),
- tool_call_timeout_ms: None,
- }),
- };
- McpClientBootstrap::from_scoped_config("stdio server", &config)
- }
- fn script_transport(script_path: &Path) -> crate::mcp_client::McpStdioTransport {
- script_transport_with_env(script_path, BTreeMap::new())
- }
- fn script_transport_with_env(
- script_path: &Path,
- env: BTreeMap<String, String>,
- ) -> crate::mcp_client::McpStdioTransport {
- crate::mcp_client::McpStdioTransport {
- command: "python3".to_string(),
- args: vec![script_path.to_string_lossy().into_owned()],
- env,
- tool_call_timeout_ms: None,
- }
- }
- fn cleanup_script(script_path: &Path) {
- if let Err(error) = fs::remove_file(script_path) {
- assert_eq!(
- error.kind(),
- std::io::ErrorKind::NotFound,
- "cleanup script: {error}"
- );
- }
- if let Err(error) = fs::remove_dir_all(script_path.parent().expect("script parent")) {
- assert_eq!(
- error.kind(),
- std::io::ErrorKind::NotFound,
- "cleanup dir: {error}"
- );
- }
- }
- fn manager_server_config(
- script_path: &Path,
- label: &str,
- log_path: &Path,
- ) -> ScopedMcpServerConfig {
- manager_server_config_with_env(script_path, label, log_path, BTreeMap::new())
- }
- fn manager_server_config_with_env(
- script_path: &Path,
- label: &str,
- log_path: &Path,
- extra_env: BTreeMap<String, String>,
- ) -> ScopedMcpServerConfig {
- let mut env = BTreeMap::from([
- ("MCP_SERVER_LABEL".to_string(), label.to_string()),
- (
- "MCP_LOG_PATH".to_string(),
- log_path.to_string_lossy().into_owned(),
- ),
- ]);
- env.extend(extra_env);
- ScopedMcpServerConfig {
- scope: ConfigSource::Local,
- config: McpServerConfig::Stdio(McpStdioServerConfig {
- command: "python3".to_string(),
- args: vec![script_path.to_string_lossy().into_owned()],
- env,
- tool_call_timeout_ms: None,
- }),
- }
- }
- #[test]
- fn spawns_stdio_process_and_round_trips_io() {
- let runtime = Builder::new_current_thread()
- .enable_all()
- .build()
- .expect("runtime");
- runtime.block_on(async {
- let script_path = write_echo_script();
- let bootstrap = sample_bootstrap(&script_path);
- let mut process = spawn_mcp_stdio_process(&bootstrap).expect("spawn stdio process");
- let ready = process.read_line().await.expect("read ready");
- assert_eq!(ready, "READY:secret-value\n");
- process
- .write_line("ping from client")
- .await
- .expect("write line");
- let echoed = process.read_line().await.expect("read echo");
- assert_eq!(echoed, "ECHO:ping from client\n");
- let status = process.wait().await.expect("wait for exit");
- assert!(status.success());
- cleanup_script(&script_path);
- });
- }
- #[test]
- fn rejects_non_stdio_bootstrap() {
- let config = ScopedMcpServerConfig {
- scope: ConfigSource::Local,
- config: McpServerConfig::Sdk(crate::config::McpSdkServerConfig {
- name: "sdk-server".to_string(),
- }),
- };
- let bootstrap = McpClientBootstrap::from_scoped_config("sdk server", &config);
- let error = spawn_mcp_stdio_process(&bootstrap).expect_err("non-stdio should fail");
- assert_eq!(error.kind(), ErrorKind::InvalidInput);
- }
- #[test]
- fn round_trips_initialize_request_and_response_over_stdio_frames() {
- let runtime = Builder::new_current_thread()
- .enable_all()
- .build()
- .expect("runtime");
- runtime.block_on(async {
- let script_path = write_jsonrpc_script();
- let transport = script_transport(&script_path);
- let mut process = McpStdioProcess::spawn(&transport).expect("spawn transport directly");
- let response = process
- .initialize(
- JsonRpcId::Number(1),
- McpInitializeParams {
- protocol_version: "2025-03-26".to_string(),
- capabilities: json!({"roots": {}}),
- client_info: McpInitializeClientInfo {
- name: "runtime-tests".to_string(),
- version: "0.1.0".to_string(),
- },
- },
- )
- .await
- .expect("initialize roundtrip");
- assert_eq!(response.id, JsonRpcId::Number(1));
- assert_eq!(response.error, None);
- assert_eq!(
- response.result,
- Some(McpInitializeResult {
- protocol_version: "2025-03-26".to_string(),
- capabilities: json!({"tools": {}}),
- server_info: McpInitializeServerInfo {
- name: "fake-mcp".to_string(),
- version: "0.1.0".to_string(),
- },
- })
- );
- let status = process.wait().await.expect("wait for exit");
- assert!(status.success());
- cleanup_script(&script_path);
- });
- }
- #[test]
- fn write_jsonrpc_request_emits_content_length_frame() {
- let runtime = Builder::new_current_thread()
- .enable_all()
- .build()
- .expect("runtime");
- runtime.block_on(async {
- let script_path = write_jsonrpc_script();
- let transport = script_transport(&script_path);
- let mut process = McpStdioProcess::spawn(&transport).expect("spawn transport directly");
- let request = JsonRpcRequest::new(
- JsonRpcId::Number(7),
- "initialize",
- Some(json!({
- "protocolVersion": "2025-03-26",
- "capabilities": {},
- "clientInfo": {"name": "runtime-tests", "version": "0.1.0"}
- })),
- );
- process.send_request(&request).await.expect("send request");
- let response: JsonRpcResponse<serde_json::Value> =
- process.read_response().await.expect("read response");
- assert_eq!(response.id, JsonRpcId::Number(7));
- assert_eq!(response.jsonrpc, "2.0");
- let status = process.wait().await.expect("wait for exit");
- assert!(status.success());
- cleanup_script(&script_path);
- });
- }
- #[test]
- fn given_lowercase_content_length_when_initialize_then_response_parses() {
- let runtime = Builder::new_current_thread()
- .enable_all()
- .build()
- .expect("runtime");
- runtime.block_on(async {
- let script_path = write_jsonrpc_script();
- let transport = script_transport_with_env(
- &script_path,
- BTreeMap::from([(
- "MCP_LOWERCASE_CONTENT_LENGTH".to_string(),
- "1".to_string(),
- )]),
- );
- let mut process = McpStdioProcess::spawn(&transport).expect("spawn transport directly");
- let response = process
- .initialize(
- JsonRpcId::Number(8),
- McpInitializeParams {
- protocol_version: "2025-03-26".to_string(),
- capabilities: json!({"roots": {}}),
- client_info: McpInitializeClientInfo {
- name: "runtime-tests".to_string(),
- version: "0.1.0".to_string(),
- },
- },
- )
- .await
- .expect("initialize roundtrip");
- assert_eq!(response.id, JsonRpcId::Number(8));
- assert_eq!(response.error, None);
- assert!(response.result.is_some());
- let status = process.wait().await.expect("wait for exit");
- assert!(status.success());
- cleanup_script(&script_path);
- });
- }
- #[test]
- fn given_mismatched_response_id_when_initialize_then_invalid_data_is_returned() {
- let runtime = Builder::new_current_thread()
- .enable_all()
- .build()
- .expect("runtime");
- runtime.block_on(async {
- let script_path = write_jsonrpc_script();
- let transport = script_transport_with_env(
- &script_path,
- BTreeMap::from([(
- "MCP_MISMATCHED_RESPONSE_ID".to_string(),
- "1".to_string(),
- )]),
- );
- let mut process = McpStdioProcess::spawn(&transport).expect("spawn transport directly");
- let error = process
- .initialize(
- JsonRpcId::Number(9),
- McpInitializeParams {
- protocol_version: "2025-03-26".to_string(),
- capabilities: json!({"roots": {}}),
- client_info: McpInitializeClientInfo {
- name: "runtime-tests".to_string(),
- version: "0.1.0".to_string(),
- },
- },
- )
- .await
- .expect_err("mismatched response id should fail");
- assert_eq!(error.kind(), ErrorKind::InvalidData);
- assert!(error.to_string().contains("mismatched id"));
- let status = process.wait().await.expect("wait for exit");
- assert!(status.success());
- cleanup_script(&script_path);
- });
- }
- #[test]
- fn direct_spawn_uses_transport_env() {
- let runtime = Builder::new_current_thread()
- .enable_all()
- .build()
- .expect("runtime");
- runtime.block_on(async {
- let script_path = write_echo_script();
- let transport = crate::mcp_client::McpStdioTransport {
- command: "/bin/sh".to_string(),
- args: vec![script_path.to_string_lossy().into_owned()],
- env: BTreeMap::from([("MCP_TEST_TOKEN".to_string(), "direct-secret".to_string())]),
- tool_call_timeout_ms: None,
- };
- let mut process = McpStdioProcess::spawn(&transport).expect("spawn transport directly");
- let ready = process.read_available().await.expect("read ready");
- assert_eq!(String::from_utf8_lossy(&ready), "READY:direct-secret\n");
- process.terminate().await.expect("terminate child");
- let _ = process.wait().await.expect("wait after kill");
- cleanup_script(&script_path);
- });
- }
- #[test]
- fn lists_tools_calls_tool_and_reads_resources_over_jsonrpc() {
- let runtime = Builder::new_current_thread()
- .enable_all()
- .build()
- .expect("runtime");
- runtime.block_on(async {
- let script_path = write_mcp_server_script();
- let transport = script_transport(&script_path);
- let mut process = McpStdioProcess::spawn(&transport).expect("spawn fake mcp server");
- let tools = process
- .list_tools(JsonRpcId::Number(2), None)
- .await
- .expect("list tools");
- assert_eq!(tools.error, None);
- assert_eq!(tools.id, JsonRpcId::Number(2));
- assert_eq!(
- tools.result,
- Some(McpListToolsResult {
- tools: vec![McpTool {
- name: "echo".to_string(),
- description: Some("Echoes text".to_string()),
- input_schema: Some(json!({
- "type": "object",
- "properties": {"text": {"type": "string"}},
- "required": ["text"]
- })),
- annotations: None,
- meta: None,
- }],
- next_cursor: None,
- })
- );
- let call = process
- .call_tool(
- JsonRpcId::String("call-1".to_string()),
- McpToolCallParams {
- name: "echo".to_string(),
- arguments: Some(json!({"text": "hello"})),
- meta: None,
- },
- )
- .await
- .expect("call tool");
- assert_eq!(call.error, None);
- let call_result = call.result.expect("tool result");
- assert_eq!(call_result.is_error, Some(false));
- assert_eq!(
- call_result.structured_content,
- Some(json!({"echoed": "hello"}))
- );
- assert_eq!(call_result.content.len(), 1);
- assert_eq!(call_result.content[0].kind, "text");
- assert_eq!(
- call_result.content[0].data.get("text"),
- Some(&json!("echo:hello"))
- );
- let resources = process
- .list_resources(JsonRpcId::Number(3), None)
- .await
- .expect("list resources");
- let resources_result = resources.result.expect("resources result");
- assert_eq!(resources_result.resources.len(), 1);
- assert_eq!(resources_result.resources[0].uri, "file://guide.txt");
- assert_eq!(
- resources_result.resources[0].mime_type.as_deref(),
- Some("text/plain")
- );
- let read = process
- .read_resource(
- JsonRpcId::Number(4),
- McpReadResourceParams {
- uri: "file://guide.txt".to_string(),
- },
- )
- .await
- .expect("read resource");
- assert_eq!(
- read.result,
- Some(McpReadResourceResult {
- contents: vec![super::McpResourceContents {
- uri: "file://guide.txt".to_string(),
- mime_type: Some("text/plain".to_string()),
- text: Some("contents for file://guide.txt".to_string()),
- blob: None,
- meta: None,
- }],
- })
- );
- process.terminate().await.expect("terminate child");
- let _ = process.wait().await.expect("wait after kill");
- cleanup_script(&script_path);
- });
- }
- #[test]
- fn surfaces_jsonrpc_errors_from_tool_calls() {
- let runtime = Builder::new_current_thread()
- .enable_all()
- .build()
- .expect("runtime");
- runtime.block_on(async {
- let script_path = write_mcp_server_script();
- let transport = script_transport(&script_path);
- let mut process = McpStdioProcess::spawn(&transport).expect("spawn fake mcp server");
- let response = process
- .call_tool(
- JsonRpcId::Number(9),
- McpToolCallParams {
- name: "fail".to_string(),
- arguments: None,
- meta: None,
- },
- )
- .await
- .expect("call tool with error response");
- assert_eq!(response.id, JsonRpcId::Number(9));
- assert!(response.result.is_none());
- assert_eq!(response.error.as_ref().map(|e| e.code), Some(-32001));
- assert_eq!(
- response.error.as_ref().map(|e| e.message.as_str()),
- Some("tool failed")
- );
- process.terminate().await.expect("terminate child");
- let _ = process.wait().await.expect("wait after kill");
- cleanup_script(&script_path);
- });
- }
- #[test]
- fn manager_discovers_tools_from_stdio_config() {
- let runtime = Builder::new_current_thread()
- .enable_all()
- .build()
- .expect("runtime");
- runtime.block_on(async {
- let script_path = write_manager_mcp_server_script();
- let root = script_path.parent().expect("script parent");
- let log_path = root.join("alpha.log");
- let servers = BTreeMap::from([(
- "alpha".to_string(),
- manager_server_config(&script_path, "alpha", &log_path),
- )]);
- let mut manager = McpServerManager::from_servers(&servers);
- let tools = manager.discover_tools().await.expect("discover tools");
- assert_eq!(tools.len(), 1);
- assert_eq!(tools[0].server_name, "alpha");
- assert_eq!(tools[0].raw_name, "echo");
- assert_eq!(tools[0].qualified_name, mcp_tool_name("alpha", "echo"));
- assert_eq!(tools[0].tool.name, "echo");
- assert!(manager.unsupported_servers().is_empty());
- manager.shutdown().await.expect("shutdown");
- cleanup_script(&script_path);
- });
- }
- #[test]
- fn manager_routes_tool_calls_to_correct_server() {
- let runtime = Builder::new_current_thread()
- .enable_all()
- .build()
- .expect("runtime");
- runtime.block_on(async {
- let script_path = write_manager_mcp_server_script();
- let root = script_path.parent().expect("script parent");
- let alpha_log = root.join("alpha.log");
- let beta_log = root.join("beta.log");
- let servers = BTreeMap::from([
- (
- "alpha".to_string(),
- manager_server_config(&script_path, "alpha", &alpha_log),
- ),
- (
- "beta".to_string(),
- manager_server_config(&script_path, "beta", &beta_log),
- ),
- ]);
- let mut manager = McpServerManager::from_servers(&servers);
- let tools = manager.discover_tools().await.expect("discover tools");
- assert_eq!(tools.len(), 2);
- let alpha = manager
- .call_tool(
- &mcp_tool_name("alpha", "echo"),
- Some(json!({"text": "hello"})),
- )
- .await
- .expect("call alpha tool");
- let beta = manager
- .call_tool(
- &mcp_tool_name("beta", "echo"),
- Some(json!({"text": "world"})),
- )
- .await
- .expect("call beta tool");
- assert_eq!(
- alpha
- .result
- .as_ref()
- .and_then(|result| result.structured_content.as_ref())
- .and_then(|value| value.get("server")),
- Some(&json!("alpha"))
- );
- assert_eq!(
- beta.result
- .as_ref()
- .and_then(|result| result.structured_content.as_ref())
- .and_then(|value| value.get("server")),
- Some(&json!("beta"))
- );
- manager.shutdown().await.expect("shutdown");
- cleanup_script(&script_path);
- });
- }
- #[test]
- fn manager_times_out_slow_tool_calls() {
- let runtime = Builder::new_current_thread()
- .enable_all()
- .build()
- .expect("runtime");
- runtime.block_on(async {
- let script_path = write_mcp_server_script();
- let root = script_path.parent().expect("script parent");
- let log_path = root.join("timeout.log");
- let servers = BTreeMap::from([(
- "slow".to_string(),
- ScopedMcpServerConfig {
- scope: ConfigSource::Local,
- config: McpServerConfig::Stdio(McpStdioServerConfig {
- command: "python3".to_string(),
- args: vec![script_path.to_string_lossy().into_owned()],
- env: BTreeMap::from([(
- "MCP_TOOL_CALL_DELAY_MS".to_string(),
- "200".to_string(),
- )]),
- tool_call_timeout_ms: Some(25),
- }),
- },
- )]);
- let mut manager = McpServerManager::from_servers(&servers);
- manager.discover_tools().await.expect("discover tools");
- let error = manager
- .call_tool(&mcp_tool_name("slow", "echo"), Some(json!({"text": "slow"})))
- .await
- .expect_err("slow tool call should time out");
- match error {
- McpServerManagerError::Timeout {
- server_name,
- method,
- timeout_ms,
- } => {
- assert_eq!(server_name, "slow");
- assert_eq!(method, "tools/call");
- assert_eq!(timeout_ms, 25);
- }
- other => panic!("expected timeout error, got {other:?}"),
- }
- manager.shutdown().await.expect("shutdown");
- cleanup_script(&script_path);
- let _ = fs::remove_file(log_path);
- });
- }
- #[test]
- fn manager_surfaces_parse_errors_from_tool_calls() {
- let runtime = Builder::new_current_thread()
- .enable_all()
- .build()
- .expect("runtime");
- runtime.block_on(async {
- let script_path = write_mcp_server_script();
- let servers = BTreeMap::from([(
- "broken".to_string(),
- ScopedMcpServerConfig {
- scope: ConfigSource::Local,
- config: McpServerConfig::Stdio(McpStdioServerConfig {
- command: "python3".to_string(),
- args: vec![script_path.to_string_lossy().into_owned()],
- env: BTreeMap::from([(
- "MCP_INVALID_TOOL_CALL_RESPONSE".to_string(),
- "1".to_string(),
- )]),
- tool_call_timeout_ms: Some(1_000),
- }),
- },
- )]);
- let mut manager = McpServerManager::from_servers(&servers);
- manager.discover_tools().await.expect("discover tools");
- let error = manager
- .call_tool(
- &mcp_tool_name("broken", "echo"),
- Some(json!({"text": "invalid-json"})),
- )
- .await
- .expect_err("invalid json should fail");
- match error {
- McpServerManagerError::InvalidResponse {
- server_name,
- method,
- details,
- } => {
- assert_eq!(server_name, "broken");
- assert_eq!(method, "tools/call");
- assert!(details.contains("expected ident") || details.contains("expected value"));
- }
- other => panic!("expected invalid response error, got {other:?}"),
- }
- manager.shutdown().await.expect("shutdown");
- cleanup_script(&script_path);
- });
- }
- #[test]
- fn given_child_exits_after_discovery_when_calling_twice_then_second_call_succeeds_after_reset() {
- let runtime = Builder::new_current_thread()
- .enable_all()
- .build()
- .expect("runtime");
- runtime.block_on(async {
- let script_path = write_manager_mcp_server_script();
- let root = script_path.parent().expect("script parent");
- let log_path = root.join("dropping.log");
- let servers = BTreeMap::from([(
- "alpha".to_string(),
- manager_server_config_with_env(
- &script_path,
- "alpha",
- &log_path,
- BTreeMap::from([(
- "MCP_EXIT_AFTER_TOOLS_LIST".to_string(),
- "1".to_string(),
- )]),
- ),
- )]);
- let mut manager = McpServerManager::from_servers(&servers);
- manager.discover_tools().await.expect("discover tools");
- let first_error = manager
- .call_tool(
- &mcp_tool_name("alpha", "echo"),
- Some(json!({"text": "reconnect"})),
- )
- .await
- .expect_err("first call should fail after transport drops");
- match first_error {
- McpServerManagerError::Transport {
- server_name,
- method,
- source,
- } => {
- assert_eq!(server_name, "alpha");
- assert_eq!(method, "tools/call");
- assert_eq!(source.kind(), ErrorKind::UnexpectedEof);
- }
- other => panic!("expected transport error, got {other:?}"),
- }
- let response = manager
- .call_tool(
- &mcp_tool_name("alpha", "echo"),
- Some(json!({"text": "reconnect"})),
- )
- .await
- .expect("second tool call should succeed after reset");
- assert_eq!(
- response
- .result
- .as_ref()
- .and_then(|result| result.structured_content.as_ref())
- .and_then(|value| value.get("server")),
- Some(&json!("alpha"))
- );
- let log = fs::read_to_string(&log_path).expect("read log");
- assert_eq!(
- log.lines().collect::<Vec<_>>(),
- vec!["initialize", "tools/list", "initialize", "tools/call"]
- );
- manager.shutdown().await.expect("shutdown");
- cleanup_script(&script_path);
- });
- }
- #[test]
- fn given_initialize_hangs_once_when_discover_tools_then_manager_retries_and_succeeds() {
- let runtime = Builder::new_current_thread()
- .enable_all()
- .build()
- .expect("runtime");
- runtime.block_on(async {
- let script_path = write_manager_mcp_server_script();
- let root = script_path.parent().expect("script parent");
- let log_path = root.join("initialize-hang.log");
- let marker_path = root.join("initialize-hang.marker");
- let servers = BTreeMap::from([(
- "alpha".to_string(),
- manager_server_config_with_env(
- &script_path,
- "alpha",
- &log_path,
- BTreeMap::from([
- (
- "MCP_FAIL_ONCE_MODE".to_string(),
- "initialize_hang".to_string(),
- ),
- (
- "MCP_FAIL_ONCE_MARKER".to_string(),
- marker_path.to_string_lossy().into_owned(),
- ),
- ]),
- ),
- )]);
- let mut manager = McpServerManager::from_servers(&servers);
- let tools = manager.discover_tools().await.expect("discover tools after retry");
- assert_eq!(tools.len(), 1);
- assert_eq!(tools[0].qualified_name, mcp_tool_name("alpha", "echo"));
- let log = fs::read_to_string(&log_path).expect("read log");
- assert_eq!(
- log.lines().collect::<Vec<_>>(),
- vec!["initialize", "initialize-hang", "initialize", "tools/list"]
- );
- manager.shutdown().await.expect("shutdown");
- cleanup_script(&script_path);
- });
- }
- #[test]
- fn given_tool_call_disconnects_once_when_calling_twice_then_manager_resets_and_next_call_succeeds() {
- let runtime = Builder::new_current_thread()
- .enable_all()
- .build()
- .expect("runtime");
- runtime.block_on(async {
- let script_path = write_manager_mcp_server_script();
- let root = script_path.parent().expect("script parent");
- let log_path = root.join("tool-call-disconnect.log");
- let marker_path = root.join("tool-call-disconnect.marker");
- let servers = BTreeMap::from([(
- "alpha".to_string(),
- manager_server_config_with_env(
- &script_path,
- "alpha",
- &log_path,
- BTreeMap::from([
- (
- "MCP_FAIL_ONCE_MODE".to_string(),
- "tool_call_disconnect".to_string(),
- ),
- (
- "MCP_FAIL_ONCE_MARKER".to_string(),
- marker_path.to_string_lossy().into_owned(),
- ),
- ]),
- ),
- )]);
- let mut manager = McpServerManager::from_servers(&servers);
- manager.discover_tools().await.expect("discover tools");
- let first_error = manager
- .call_tool(&mcp_tool_name("alpha", "echo"), Some(json!({"text": "first"})))
- .await
- .expect_err("first tool call should fail when transport drops");
- match first_error {
- McpServerManagerError::Transport {
- server_name,
- method,
- source,
- } => {
- assert_eq!(server_name, "alpha");
- assert_eq!(method, "tools/call");
- assert_eq!(source.kind(), ErrorKind::UnexpectedEof);
- }
- other => panic!("expected transport error, got {other:?}"),
- }
- let response = manager
- .call_tool(&mcp_tool_name("alpha", "echo"), Some(json!({"text": "second"})))
- .await
- .expect("second tool call should succeed after reset");
- assert_eq!(
- response
- .result
- .as_ref()
- .and_then(|result| result.structured_content.as_ref())
- .and_then(|value| value.get("echoed")),
- Some(&json!("second"))
- );
- let log = fs::read_to_string(&log_path).expect("read log");
- assert_eq!(
- log.lines().collect::<Vec<_>>(),
- vec![
- "initialize",
- "tools/list",
- "tools/call",
- "tools/call-disconnect",
- "initialize",
- "tools/call",
- ]
- );
- manager.shutdown().await.expect("shutdown");
- cleanup_script(&script_path);
- });
- }
- #[test]
- fn manager_records_unsupported_non_stdio_servers_without_panicking() {
- let servers = BTreeMap::from([
- (
- "http".to_string(),
- ScopedMcpServerConfig {
- scope: ConfigSource::Local,
- config: McpServerConfig::Http(McpRemoteServerConfig {
- url: "https://example.test/mcp".to_string(),
- headers: BTreeMap::new(),
- headers_helper: None,
- oauth: None,
- }),
- },
- ),
- (
- "sdk".to_string(),
- ScopedMcpServerConfig {
- scope: ConfigSource::Local,
- config: McpServerConfig::Sdk(McpSdkServerConfig {
- name: "sdk-server".to_string(),
- }),
- },
- ),
- (
- "ws".to_string(),
- ScopedMcpServerConfig {
- scope: ConfigSource::Local,
- config: McpServerConfig::Ws(McpWebSocketServerConfig {
- url: "wss://example.test/mcp".to_string(),
- headers: BTreeMap::new(),
- headers_helper: None,
- }),
- },
- ),
- ]);
- let manager = McpServerManager::from_servers(&servers);
- let unsupported = manager.unsupported_servers();
- assert_eq!(unsupported.len(), 3);
- assert_eq!(unsupported[0].server_name, "http");
- assert_eq!(unsupported[1].server_name, "sdk");
- assert_eq!(unsupported[2].server_name, "ws");
- }
- #[test]
- fn manager_shutdown_terminates_spawned_children_and_is_idempotent() {
- let runtime = Builder::new_current_thread()
- .enable_all()
- .build()
- .expect("runtime");
- runtime.block_on(async {
- let script_path = write_manager_mcp_server_script();
- let root = script_path.parent().expect("script parent");
- let log_path = root.join("alpha.log");
- let servers = BTreeMap::from([(
- "alpha".to_string(),
- manager_server_config(&script_path, "alpha", &log_path),
- )]);
- let mut manager = McpServerManager::from_servers(&servers);
- manager.discover_tools().await.expect("discover tools");
- manager.shutdown().await.expect("first shutdown");
- manager.shutdown().await.expect("second shutdown");
- cleanup_script(&script_path);
- });
- }
- #[test]
- fn manager_reuses_spawned_server_between_discovery_and_call() {
- let runtime = Builder::new_current_thread()
- .enable_all()
- .build()
- .expect("runtime");
- runtime.block_on(async {
- let script_path = write_manager_mcp_server_script();
- let root = script_path.parent().expect("script parent");
- let log_path = root.join("alpha.log");
- let servers = BTreeMap::from([(
- "alpha".to_string(),
- manager_server_config(&script_path, "alpha", &log_path),
- )]);
- let mut manager = McpServerManager::from_servers(&servers);
- manager.discover_tools().await.expect("discover tools");
- let response = manager
- .call_tool(
- &mcp_tool_name("alpha", "echo"),
- Some(json!({"text": "reuse"})),
- )
- .await
- .expect("call tool");
- assert_eq!(
- response
- .result
- .as_ref()
- .and_then(|result| result.structured_content.as_ref())
- .and_then(|value| value.get("initializeCount")),
- Some(&json!(1))
- );
- let log = fs::read_to_string(&log_path).expect("read log");
- assert_eq!(log.lines().filter(|line| *line == "initialize").count(), 1);
- assert_eq!(
- log.lines().collect::<Vec<_>>(),
- vec!["initialize", "tools/list", "tools/call"]
- );
- manager.shutdown().await.expect("shutdown");
- cleanup_script(&script_path);
- });
- }
- #[test]
- fn manager_reports_unknown_qualified_tool_name() {
- let runtime = Builder::new_current_thread()
- .enable_all()
- .build()
- .expect("runtime");
- runtime.block_on(async {
- let script_path = write_manager_mcp_server_script();
- let root = script_path.parent().expect("script parent");
- let log_path = root.join("alpha.log");
- let servers = BTreeMap::from([(
- "alpha".to_string(),
- manager_server_config(&script_path, "alpha", &log_path),
- )]);
- let mut manager = McpServerManager::from_servers(&servers);
- let error = manager
- .call_tool(
- &mcp_tool_name("alpha", "missing"),
- Some(json!({"text": "nope"})),
- )
- .await
- .expect_err("unknown qualified tool should fail");
- match error {
- McpServerManagerError::UnknownTool { qualified_name } => {
- assert_eq!(qualified_name, mcp_tool_name("alpha", "missing"));
- }
- other => panic!("expected unknown tool error, got {other:?}"),
- }
- cleanup_script(&script_path);
- });
- }
- }
|