mcp_stdio.rs 86 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270370470570670770870971071171271371471571671771871972072172272372472572672772872973073173273373473573673773873974074174274374474574674774874975075175275375475575675775875976076176276376476576676776876977077177277377477577677777877978078178278378478578678778878979079179279379479579679779879980080180280380480580680780880981081181281381481581681781881982082182282382482582682782882983083183283383483583683783883984084184284384484584684784884985085185285385485585685785885986086186286386486586686786886987087187287387487587687787887988088188288388488588688788888989089189289389489589689789889990090190290390490590690790890991091191291391491591691791891992092192292392492592692792892993093193293393493593693793893994094194294394494594694794894995095195295395495595695795895996096196296396496596696796896997097197297397497597697797897998098198298398498598698798898999099199299399499599699799899910001001100210031004100510061007100810091010101110121013101410151016101710181019102010211022102310241025102610271028102910301031103210331034103510361037103810391040104110421043104410451046104710481049105010511052105310541055105610571058105910601061106210631064106510661067106810691070107110721073107410751076107710781079108010811082108310841085108610871088108910901091109210931094109510961097109810991100110111021103110411051106110711081109111011111112111311141115111611171118111911201121112211231124112511261127112811291130113111321133113411351136113711381139114011411142114311441145114611471148114911501151115211531154115511561157115811591160116111621163116411651166116711681169117011711172117311741175117611771178117911801181118211831184118511861187118811891190119111921193119411951196119711981199120012011202120312041205120612071208120912101211121212131214121512161217121812191220122112221223122412251226122712281229123012311232123312341235123612371238123912401241124212431244124512461247124812491250125112521253125412551256125712581259126012611262126312641265126612671268126912701271127212731274127512761277127812791280128112821283128412851286128712881289129012911292129312941295129612971298129913001301130213031304130513061307130813091310131113121313131413151316131713181319132013211322132313241325132613271328132913301331133213331334133513361337133813391340134113421343134413451346134713481349135013511352135313541355135613571358135913601361136213631364136513661367136813691370137113721373137413751376137713781379138013811382138313841385138613871388138913901391139213931394139513961397139813991400140114021403140414051406140714081409141014111412141314141415141614171418141914201421142214231424142514261427142814291430143114321433143414351436143714381439144014411442144314441445144614471448144914501451145214531454145514561457145814591460146114621463146414651466146714681469147014711472147314741475147614771478147914801481148214831484148514861487148814891490149114921493149414951496149714981499150015011502150315041505150615071508150915101511151215131514151515161517151815191520152115221523152415251526152715281529153015311532153315341535153615371538153915401541154215431544154515461547154815491550155115521553155415551556155715581559156015611562156315641565156615671568156915701571157215731574157515761577157815791580158115821583158415851586158715881589159015911592159315941595159615971598159916001601160216031604160516061607160816091610161116121613161416151616161716181619162016211622162316241625162616271628162916301631163216331634163516361637163816391640164116421643164416451646164716481649165016511652165316541655165616571658165916601661166216631664166516661667166816691670167116721673167416751676167716781679168016811682168316841685168616871688168916901691169216931694169516961697169816991700170117021703170417051706170717081709171017111712171317141715171617171718171917201721172217231724172517261727172817291730173117321733173417351736173717381739174017411742174317441745174617471748174917501751175217531754175517561757175817591760176117621763176417651766176717681769177017711772177317741775177617771778177917801781178217831784178517861787178817891790179117921793179417951796179717981799180018011802180318041805180618071808180918101811181218131814181518161817181818191820182118221823182418251826182718281829183018311832183318341835183618371838183918401841184218431844184518461847184818491850185118521853185418551856185718581859186018611862186318641865186618671868186918701871187218731874187518761877187818791880188118821883188418851886188718881889189018911892189318941895189618971898189919001901190219031904190519061907190819091910191119121913191419151916191719181919192019211922192319241925192619271928192919301931193219331934193519361937193819391940194119421943194419451946194719481949195019511952195319541955195619571958195919601961196219631964196519661967196819691970197119721973197419751976197719781979198019811982198319841985198619871988198919901991199219931994199519961997199819992000200120022003200420052006200720082009201020112012201320142015201620172018201920202021202220232024202520262027202820292030203120322033203420352036203720382039204020412042204320442045204620472048204920502051205220532054205520562057205820592060206120622063206420652066206720682069207020712072207320742075207620772078207920802081208220832084208520862087208820892090209120922093209420952096209720982099210021012102210321042105210621072108210921102111211221132114211521162117211821192120212121222123212421252126212721282129213021312132213321342135213621372138213921402141214221432144214521462147214821492150215121522153215421552156215721582159216021612162216321642165216621672168216921702171217221732174217521762177217821792180218121822183218421852186218721882189219021912192219321942195219621972198219922002201220222032204220522062207220822092210221122122213221422152216221722182219222022212222222322242225222622272228222922302231223222332234223522362237223822392240224122422243224422452246224722482249225022512252225322542255225622572258225922602261226222632264226522662267226822692270227122722273227422752276227722782279228022812282228322842285228622872288228922902291229222932294229522962297229822992300230123022303230423052306230723082309231023112312231323142315231623172318231923202321232223232324232523262327232823292330233123322333233423352336233723382339234023412342234323442345234623472348234923502351235223532354235523562357235823592360236123622363236423652366236723682369237023712372237323742375237623772378237923802381238223832384238523862387238823892390239123922393239423952396239723982399
  1. use std::collections::BTreeMap;
  2. use std::future::Future;
  3. use std::io;
  4. use std::process::Stdio;
  5. use std::time::Duration;
  6. use serde::de::DeserializeOwned;
  7. use serde::{Deserialize, Serialize};
  8. use serde_json::Value as JsonValue;
  9. use tokio::io::{AsyncBufReadExt, AsyncReadExt, AsyncWriteExt, BufReader};
  10. use tokio::process::{Child, ChildStdin, ChildStdout, Command};
  11. use tokio::time::timeout;
  12. use crate::config::{McpTransport, RuntimeConfig, ScopedMcpServerConfig};
  13. use crate::mcp::mcp_tool_name;
  14. use crate::mcp_client::{McpClientBootstrap, McpClientTransport, McpStdioTransport};
  15. #[cfg(test)]
  16. const MCP_INITIALIZE_TIMEOUT_MS: u64 = 200;
  17. #[cfg(not(test))]
  18. const MCP_INITIALIZE_TIMEOUT_MS: u64 = 10_000;
  19. #[cfg(test)]
  20. const MCP_LIST_TOOLS_TIMEOUT_MS: u64 = 300;
  21. #[cfg(not(test))]
  22. const MCP_LIST_TOOLS_TIMEOUT_MS: u64 = 30_000;
  23. #[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
  24. #[serde(untagged)]
  25. pub enum JsonRpcId {
  26. Number(u64),
  27. String(String),
  28. Null,
  29. }
  30. #[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
  31. pub struct JsonRpcRequest<T = JsonValue> {
  32. pub jsonrpc: String,
  33. pub id: JsonRpcId,
  34. pub method: String,
  35. #[serde(skip_serializing_if = "Option::is_none")]
  36. pub params: Option<T>,
  37. }
  38. impl<T> JsonRpcRequest<T> {
  39. #[must_use]
  40. pub fn new(id: JsonRpcId, method: impl Into<String>, params: Option<T>) -> Self {
  41. Self {
  42. jsonrpc: "2.0".to_string(),
  43. id,
  44. method: method.into(),
  45. params,
  46. }
  47. }
  48. }
  49. #[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
  50. pub struct JsonRpcError {
  51. pub code: i64,
  52. pub message: String,
  53. #[serde(skip_serializing_if = "Option::is_none")]
  54. pub data: Option<JsonValue>,
  55. }
  56. #[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
  57. pub struct JsonRpcResponse<T = JsonValue> {
  58. pub jsonrpc: String,
  59. pub id: JsonRpcId,
  60. #[serde(skip_serializing_if = "Option::is_none")]
  61. pub result: Option<T>,
  62. #[serde(skip_serializing_if = "Option::is_none")]
  63. pub error: Option<JsonRpcError>,
  64. }
  65. #[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
  66. #[serde(rename_all = "camelCase")]
  67. pub struct McpInitializeParams {
  68. pub protocol_version: String,
  69. pub capabilities: JsonValue,
  70. pub client_info: McpInitializeClientInfo,
  71. }
  72. #[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
  73. #[serde(rename_all = "camelCase")]
  74. pub struct McpInitializeClientInfo {
  75. pub name: String,
  76. pub version: String,
  77. }
  78. #[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
  79. #[serde(rename_all = "camelCase")]
  80. pub struct McpInitializeResult {
  81. pub protocol_version: String,
  82. pub capabilities: JsonValue,
  83. pub server_info: McpInitializeServerInfo,
  84. }
  85. #[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
  86. #[serde(rename_all = "camelCase")]
  87. pub struct McpInitializeServerInfo {
  88. pub name: String,
  89. pub version: String,
  90. }
  91. #[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
  92. #[serde(rename_all = "camelCase")]
  93. pub struct McpListToolsParams {
  94. #[serde(skip_serializing_if = "Option::is_none")]
  95. pub cursor: Option<String>,
  96. }
  97. #[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
  98. pub struct McpTool {
  99. pub name: String,
  100. #[serde(skip_serializing_if = "Option::is_none")]
  101. pub description: Option<String>,
  102. #[serde(rename = "inputSchema", skip_serializing_if = "Option::is_none")]
  103. pub input_schema: Option<JsonValue>,
  104. #[serde(skip_serializing_if = "Option::is_none")]
  105. pub annotations: Option<JsonValue>,
  106. #[serde(rename = "_meta", skip_serializing_if = "Option::is_none")]
  107. pub meta: Option<JsonValue>,
  108. }
  109. #[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
  110. #[serde(rename_all = "camelCase")]
  111. pub struct McpListToolsResult {
  112. pub tools: Vec<McpTool>,
  113. #[serde(skip_serializing_if = "Option::is_none")]
  114. pub next_cursor: Option<String>,
  115. }
  116. #[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
  117. #[serde(rename_all = "camelCase")]
  118. pub struct McpToolCallParams {
  119. pub name: String,
  120. #[serde(skip_serializing_if = "Option::is_none")]
  121. pub arguments: Option<JsonValue>,
  122. #[serde(rename = "_meta", skip_serializing_if = "Option::is_none")]
  123. pub meta: Option<JsonValue>,
  124. }
  125. #[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
  126. pub struct McpToolCallContent {
  127. #[serde(rename = "type")]
  128. pub kind: String,
  129. #[serde(flatten)]
  130. pub data: BTreeMap<String, JsonValue>,
  131. }
  132. #[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
  133. #[serde(rename_all = "camelCase")]
  134. pub struct McpToolCallResult {
  135. #[serde(default)]
  136. pub content: Vec<McpToolCallContent>,
  137. #[serde(default)]
  138. pub structured_content: Option<JsonValue>,
  139. #[serde(default)]
  140. pub is_error: Option<bool>,
  141. #[serde(rename = "_meta", skip_serializing_if = "Option::is_none")]
  142. pub meta: Option<JsonValue>,
  143. }
  144. #[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
  145. #[serde(rename_all = "camelCase")]
  146. pub struct McpListResourcesParams {
  147. #[serde(skip_serializing_if = "Option::is_none")]
  148. pub cursor: Option<String>,
  149. }
  150. #[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
  151. pub struct McpResource {
  152. pub uri: String,
  153. #[serde(skip_serializing_if = "Option::is_none")]
  154. pub name: Option<String>,
  155. #[serde(skip_serializing_if = "Option::is_none")]
  156. pub description: Option<String>,
  157. #[serde(rename = "mimeType", skip_serializing_if = "Option::is_none")]
  158. pub mime_type: Option<String>,
  159. #[serde(skip_serializing_if = "Option::is_none")]
  160. pub annotations: Option<JsonValue>,
  161. #[serde(rename = "_meta", skip_serializing_if = "Option::is_none")]
  162. pub meta: Option<JsonValue>,
  163. }
  164. #[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
  165. #[serde(rename_all = "camelCase")]
  166. pub struct McpListResourcesResult {
  167. pub resources: Vec<McpResource>,
  168. #[serde(skip_serializing_if = "Option::is_none")]
  169. pub next_cursor: Option<String>,
  170. }
  171. #[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
  172. #[serde(rename_all = "camelCase")]
  173. pub struct McpReadResourceParams {
  174. pub uri: String,
  175. }
  176. #[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
  177. pub struct McpResourceContents {
  178. pub uri: String,
  179. #[serde(rename = "mimeType", skip_serializing_if = "Option::is_none")]
  180. pub mime_type: Option<String>,
  181. #[serde(skip_serializing_if = "Option::is_none")]
  182. pub text: Option<String>,
  183. #[serde(skip_serializing_if = "Option::is_none")]
  184. pub blob: Option<String>,
  185. #[serde(rename = "_meta", skip_serializing_if = "Option::is_none")]
  186. pub meta: Option<JsonValue>,
  187. }
  188. #[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
  189. pub struct McpReadResourceResult {
  190. pub contents: Vec<McpResourceContents>,
  191. }
  192. #[derive(Debug, Clone, PartialEq)]
  193. pub struct ManagedMcpTool {
  194. pub server_name: String,
  195. pub qualified_name: String,
  196. pub raw_name: String,
  197. pub tool: McpTool,
  198. }
  199. #[derive(Debug, Clone, PartialEq, Eq)]
  200. pub struct UnsupportedMcpServer {
  201. pub server_name: String,
  202. pub transport: McpTransport,
  203. pub reason: String,
  204. }
  205. #[derive(Debug)]
  206. pub enum McpServerManagerError {
  207. Io(io::Error),
  208. Transport {
  209. server_name: String,
  210. method: &'static str,
  211. source: io::Error,
  212. },
  213. JsonRpc {
  214. server_name: String,
  215. method: &'static str,
  216. error: JsonRpcError,
  217. },
  218. InvalidResponse {
  219. server_name: String,
  220. method: &'static str,
  221. details: String,
  222. },
  223. Timeout {
  224. server_name: String,
  225. method: &'static str,
  226. timeout_ms: u64,
  227. },
  228. UnknownTool {
  229. qualified_name: String,
  230. },
  231. UnknownServer {
  232. server_name: String,
  233. },
  234. }
  235. impl std::fmt::Display for McpServerManagerError {
  236. fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
  237. match self {
  238. Self::Io(error) => write!(f, "{error}"),
  239. Self::Transport {
  240. server_name,
  241. method,
  242. source,
  243. } => write!(
  244. f,
  245. "MCP server `{server_name}` transport failed during {method}: {source}"
  246. ),
  247. Self::JsonRpc {
  248. server_name,
  249. method,
  250. error,
  251. } => write!(
  252. f,
  253. "MCP server `{server_name}` returned JSON-RPC error for {method}: {} ({})",
  254. error.message, error.code
  255. ),
  256. Self::InvalidResponse {
  257. server_name,
  258. method,
  259. details,
  260. } => write!(
  261. f,
  262. "MCP server `{server_name}` returned invalid response for {method}: {details}"
  263. ),
  264. Self::Timeout {
  265. server_name,
  266. method,
  267. timeout_ms,
  268. } => write!(
  269. f,
  270. "MCP server `{server_name}` timed out after {timeout_ms} ms while handling {method}"
  271. ),
  272. Self::UnknownTool { qualified_name } => {
  273. write!(f, "unknown MCP tool `{qualified_name}`")
  274. }
  275. Self::UnknownServer { server_name } => write!(f, "unknown MCP server `{server_name}`"),
  276. }
  277. }
  278. }
  279. impl std::error::Error for McpServerManagerError {
  280. fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
  281. match self {
  282. Self::Io(error) => Some(error),
  283. Self::Transport { source, .. } => Some(source),
  284. Self::JsonRpc { .. }
  285. | Self::InvalidResponse { .. }
  286. | Self::Timeout { .. }
  287. | Self::UnknownTool { .. }
  288. | Self::UnknownServer { .. } => None,
  289. }
  290. }
  291. }
  292. impl From<io::Error> for McpServerManagerError {
  293. fn from(value: io::Error) -> Self {
  294. Self::Io(value)
  295. }
  296. }
  297. #[derive(Debug, Clone, PartialEq, Eq)]
  298. struct ToolRoute {
  299. server_name: String,
  300. raw_name: String,
  301. }
  302. #[derive(Debug)]
  303. struct ManagedMcpServer {
  304. bootstrap: McpClientBootstrap,
  305. process: Option<McpStdioProcess>,
  306. initialized: bool,
  307. }
  308. impl ManagedMcpServer {
  309. fn new(bootstrap: McpClientBootstrap) -> Self {
  310. Self {
  311. bootstrap,
  312. process: None,
  313. initialized: false,
  314. }
  315. }
  316. }
  317. #[derive(Debug)]
  318. pub struct McpServerManager {
  319. servers: BTreeMap<String, ManagedMcpServer>,
  320. unsupported_servers: Vec<UnsupportedMcpServer>,
  321. tool_index: BTreeMap<String, ToolRoute>,
  322. next_request_id: u64,
  323. }
  324. impl McpServerManager {
  325. #[must_use]
  326. pub fn from_runtime_config(config: &RuntimeConfig) -> Self {
  327. Self::from_servers(config.mcp().servers())
  328. }
  329. #[must_use]
  330. pub fn from_servers(servers: &BTreeMap<String, ScopedMcpServerConfig>) -> Self {
  331. let mut managed_servers = BTreeMap::new();
  332. let mut unsupported_servers = Vec::new();
  333. for (server_name, server_config) in servers {
  334. if server_config.transport() == McpTransport::Stdio {
  335. let bootstrap = McpClientBootstrap::from_scoped_config(server_name, server_config);
  336. managed_servers.insert(server_name.clone(), ManagedMcpServer::new(bootstrap));
  337. } else {
  338. unsupported_servers.push(UnsupportedMcpServer {
  339. server_name: server_name.clone(),
  340. transport: server_config.transport(),
  341. reason: format!(
  342. "transport {:?} is not supported by McpServerManager",
  343. server_config.transport()
  344. ),
  345. });
  346. }
  347. }
  348. Self {
  349. servers: managed_servers,
  350. unsupported_servers,
  351. tool_index: BTreeMap::new(),
  352. next_request_id: 1,
  353. }
  354. }
  355. #[must_use]
  356. pub fn unsupported_servers(&self) -> &[UnsupportedMcpServer] {
  357. &self.unsupported_servers
  358. }
  359. pub async fn discover_tools(&mut self) -> Result<Vec<ManagedMcpTool>, McpServerManagerError> {
  360. let server_names = self.servers.keys().cloned().collect::<Vec<_>>();
  361. let mut discovered_tools = Vec::new();
  362. for server_name in server_names {
  363. let server_tools = self.discover_tools_for_server(&server_name).await?;
  364. self.clear_routes_for_server(&server_name);
  365. for tool in server_tools {
  366. self.tool_index.insert(
  367. tool.qualified_name.clone(),
  368. ToolRoute {
  369. server_name: tool.server_name.clone(),
  370. raw_name: tool.raw_name.clone(),
  371. },
  372. );
  373. discovered_tools.push(tool);
  374. }
  375. }
  376. Ok(discovered_tools)
  377. }
  378. pub async fn call_tool(
  379. &mut self,
  380. qualified_tool_name: &str,
  381. arguments: Option<JsonValue>,
  382. ) -> Result<JsonRpcResponse<McpToolCallResult>, McpServerManagerError> {
  383. let route = self
  384. .tool_index
  385. .get(qualified_tool_name)
  386. .cloned()
  387. .ok_or_else(|| McpServerManagerError::UnknownTool {
  388. qualified_name: qualified_tool_name.to_string(),
  389. })?;
  390. let timeout_ms = self.tool_call_timeout_ms(&route.server_name)?;
  391. self.ensure_server_ready(&route.server_name).await?;
  392. let request_id = self.take_request_id();
  393. let response = {
  394. let server = self.server_mut(&route.server_name)?;
  395. let process = server.process.as_mut().ok_or_else(|| {
  396. McpServerManagerError::InvalidResponse {
  397. server_name: route.server_name.clone(),
  398. method: "tools/call",
  399. details: "server process missing after initialization".to_string(),
  400. }
  401. })?;
  402. Self::run_process_request(
  403. &route.server_name,
  404. "tools/call",
  405. timeout_ms,
  406. process.call_tool(
  407. request_id,
  408. McpToolCallParams {
  409. name: route.raw_name,
  410. arguments,
  411. meta: None,
  412. },
  413. ),
  414. )
  415. .await
  416. };
  417. if let Err(error) = &response {
  418. if Self::should_reset_server(error) {
  419. self.reset_server(&route.server_name).await?;
  420. }
  421. }
  422. response
  423. }
  424. pub async fn shutdown(&mut self) -> Result<(), McpServerManagerError> {
  425. let server_names = self.servers.keys().cloned().collect::<Vec<_>>();
  426. for server_name in server_names {
  427. let server = self.server_mut(&server_name)?;
  428. if let Some(process) = server.process.as_mut() {
  429. process.shutdown().await?;
  430. }
  431. server.process = None;
  432. server.initialized = false;
  433. }
  434. Ok(())
  435. }
  436. fn clear_routes_for_server(&mut self, server_name: &str) {
  437. self.tool_index
  438. .retain(|_, route| route.server_name != server_name);
  439. }
  440. fn server_mut(
  441. &mut self,
  442. server_name: &str,
  443. ) -> Result<&mut ManagedMcpServer, McpServerManagerError> {
  444. self.servers
  445. .get_mut(server_name)
  446. .ok_or_else(|| McpServerManagerError::UnknownServer {
  447. server_name: server_name.to_string(),
  448. })
  449. }
  450. fn take_request_id(&mut self) -> JsonRpcId {
  451. let id = self.next_request_id;
  452. self.next_request_id = self.next_request_id.saturating_add(1);
  453. JsonRpcId::Number(id)
  454. }
  455. fn tool_call_timeout_ms(&self, server_name: &str) -> Result<u64, McpServerManagerError> {
  456. let server = self
  457. .servers
  458. .get(server_name)
  459. .ok_or_else(|| McpServerManagerError::UnknownServer {
  460. server_name: server_name.to_string(),
  461. })?;
  462. match &server.bootstrap.transport {
  463. McpClientTransport::Stdio(transport) => Ok(transport.resolved_tool_call_timeout_ms()),
  464. other => Err(McpServerManagerError::InvalidResponse {
  465. server_name: server_name.to_string(),
  466. method: "tools/call",
  467. details: format!("unsupported MCP transport for stdio manager: {other:?}"),
  468. }),
  469. }
  470. }
  471. fn server_process_exited(&mut self, server_name: &str) -> Result<bool, McpServerManagerError> {
  472. let server = self.server_mut(server_name)?;
  473. match server.process.as_mut() {
  474. Some(process) => Ok(process.has_exited()?),
  475. None => Ok(false),
  476. }
  477. }
  478. async fn discover_tools_for_server(
  479. &mut self,
  480. server_name: &str,
  481. ) -> Result<Vec<ManagedMcpTool>, McpServerManagerError> {
  482. let mut attempts = 0;
  483. loop {
  484. match self.discover_tools_for_server_once(server_name).await {
  485. Ok(tools) => return Ok(tools),
  486. Err(error) if attempts == 0 && Self::is_retryable_error(&error) => {
  487. self.reset_server(server_name).await?;
  488. attempts += 1;
  489. }
  490. Err(error) => {
  491. if Self::should_reset_server(&error) {
  492. self.reset_server(server_name).await?;
  493. }
  494. return Err(error);
  495. }
  496. }
  497. }
  498. }
  499. async fn discover_tools_for_server_once(
  500. &mut self,
  501. server_name: &str,
  502. ) -> Result<Vec<ManagedMcpTool>, McpServerManagerError> {
  503. self.ensure_server_ready(server_name).await?;
  504. let mut discovered_tools = Vec::new();
  505. let mut cursor = None;
  506. loop {
  507. let request_id = self.take_request_id();
  508. let response = {
  509. let server = self.server_mut(server_name)?;
  510. let process = server.process.as_mut().ok_or_else(|| {
  511. McpServerManagerError::InvalidResponse {
  512. server_name: server_name.to_string(),
  513. method: "tools/list",
  514. details: "server process missing after initialization".to_string(),
  515. }
  516. })?;
  517. Self::run_process_request(
  518. server_name,
  519. "tools/list",
  520. MCP_LIST_TOOLS_TIMEOUT_MS,
  521. process.list_tools(
  522. request_id,
  523. Some(McpListToolsParams {
  524. cursor: cursor.clone(),
  525. }),
  526. ),
  527. )
  528. .await?
  529. };
  530. if let Some(error) = response.error {
  531. return Err(McpServerManagerError::JsonRpc {
  532. server_name: server_name.to_string(),
  533. method: "tools/list",
  534. error,
  535. });
  536. }
  537. let result =
  538. response
  539. .result
  540. .ok_or_else(|| McpServerManagerError::InvalidResponse {
  541. server_name: server_name.to_string(),
  542. method: "tools/list",
  543. details: "missing result payload".to_string(),
  544. })?;
  545. for tool in result.tools {
  546. let qualified_name = mcp_tool_name(server_name, &tool.name);
  547. discovered_tools.push(ManagedMcpTool {
  548. server_name: server_name.to_string(),
  549. qualified_name,
  550. raw_name: tool.name.clone(),
  551. tool,
  552. });
  553. }
  554. match result.next_cursor {
  555. Some(next_cursor) => cursor = Some(next_cursor),
  556. None => break,
  557. }
  558. }
  559. Ok(discovered_tools)
  560. }
  561. async fn reset_server(&mut self, server_name: &str) -> Result<(), McpServerManagerError> {
  562. let mut process = {
  563. let server = self.server_mut(server_name)?;
  564. server.initialized = false;
  565. server.process.take()
  566. };
  567. if let Some(process) = process.as_mut() {
  568. let _ = process.shutdown().await;
  569. }
  570. Ok(())
  571. }
  572. fn is_retryable_error(error: &McpServerManagerError) -> bool {
  573. matches!(
  574. error,
  575. McpServerManagerError::Transport { .. } | McpServerManagerError::Timeout { .. }
  576. )
  577. }
  578. fn should_reset_server(error: &McpServerManagerError) -> bool {
  579. matches!(
  580. error,
  581. McpServerManagerError::Transport { .. }
  582. | McpServerManagerError::Timeout { .. }
  583. | McpServerManagerError::InvalidResponse { .. }
  584. )
  585. }
  586. async fn run_process_request<T, F>(
  587. server_name: &str,
  588. method: &'static str,
  589. timeout_ms: u64,
  590. future: F,
  591. ) -> Result<T, McpServerManagerError>
  592. where
  593. F: Future<Output = io::Result<T>>,
  594. {
  595. match timeout(Duration::from_millis(timeout_ms), future).await {
  596. Ok(Ok(value)) => Ok(value),
  597. Ok(Err(error)) if error.kind() == io::ErrorKind::InvalidData => {
  598. Err(McpServerManagerError::InvalidResponse {
  599. server_name: server_name.to_string(),
  600. method,
  601. details: error.to_string(),
  602. })
  603. }
  604. Ok(Err(source)) => Err(McpServerManagerError::Transport {
  605. server_name: server_name.to_string(),
  606. method,
  607. source,
  608. }),
  609. Err(_) => Err(McpServerManagerError::Timeout {
  610. server_name: server_name.to_string(),
  611. method,
  612. timeout_ms,
  613. }),
  614. }
  615. }
  616. async fn ensure_server_ready(
  617. &mut self,
  618. server_name: &str,
  619. ) -> Result<(), McpServerManagerError> {
  620. if self.server_process_exited(server_name)? {
  621. self.reset_server(server_name).await?;
  622. }
  623. let mut attempts = 0;
  624. loop {
  625. let needs_spawn = self
  626. .servers
  627. .get(server_name)
  628. .map(|server| server.process.is_none())
  629. .ok_or_else(|| McpServerManagerError::UnknownServer {
  630. server_name: server_name.to_string(),
  631. })?;
  632. if needs_spawn {
  633. let server = self.server_mut(server_name)?;
  634. server.process = Some(spawn_mcp_stdio_process(&server.bootstrap)?);
  635. server.initialized = false;
  636. }
  637. let needs_initialize = self
  638. .servers
  639. .get(server_name)
  640. .map(|server| !server.initialized)
  641. .ok_or_else(|| McpServerManagerError::UnknownServer {
  642. server_name: server_name.to_string(),
  643. })?;
  644. if !needs_initialize {
  645. return Ok(());
  646. }
  647. let request_id = self.take_request_id();
  648. let response = {
  649. let server = self.server_mut(server_name)?;
  650. let process = server.process.as_mut().ok_or_else(|| {
  651. McpServerManagerError::InvalidResponse {
  652. server_name: server_name.to_string(),
  653. method: "initialize",
  654. details: "server process missing before initialize".to_string(),
  655. }
  656. })?;
  657. Self::run_process_request(
  658. server_name,
  659. "initialize",
  660. MCP_INITIALIZE_TIMEOUT_MS,
  661. process.initialize(request_id, default_initialize_params()),
  662. )
  663. .await
  664. };
  665. let response = match response {
  666. Ok(response) => response,
  667. Err(error) if attempts == 0 && Self::is_retryable_error(&error) => {
  668. self.reset_server(server_name).await?;
  669. attempts += 1;
  670. continue;
  671. }
  672. Err(error) => {
  673. if Self::should_reset_server(&error) {
  674. self.reset_server(server_name).await?;
  675. }
  676. return Err(error);
  677. }
  678. };
  679. if let Some(error) = response.error {
  680. return Err(McpServerManagerError::JsonRpc {
  681. server_name: server_name.to_string(),
  682. method: "initialize",
  683. error,
  684. });
  685. }
  686. if response.result.is_none() {
  687. let error = McpServerManagerError::InvalidResponse {
  688. server_name: server_name.to_string(),
  689. method: "initialize",
  690. details: "missing result payload".to_string(),
  691. };
  692. self.reset_server(server_name).await?;
  693. return Err(error);
  694. }
  695. let server = self.server_mut(server_name)?;
  696. server.initialized = true;
  697. return Ok(());
  698. }
  699. }
  700. }
  701. #[derive(Debug)]
  702. pub struct McpStdioProcess {
  703. child: Child,
  704. stdin: ChildStdin,
  705. stdout: BufReader<ChildStdout>,
  706. }
  707. impl McpStdioProcess {
  708. pub fn spawn(transport: &McpStdioTransport) -> io::Result<Self> {
  709. let mut command = Command::new(&transport.command);
  710. command
  711. .args(&transport.args)
  712. .stdin(Stdio::piped())
  713. .stdout(Stdio::piped())
  714. .stderr(Stdio::inherit());
  715. apply_env(&mut command, &transport.env);
  716. let mut child = command.spawn()?;
  717. let stdin = child
  718. .stdin
  719. .take()
  720. .ok_or_else(|| io::Error::other("stdio MCP process missing stdin pipe"))?;
  721. let stdout = child
  722. .stdout
  723. .take()
  724. .ok_or_else(|| io::Error::other("stdio MCP process missing stdout pipe"))?;
  725. Ok(Self {
  726. child,
  727. stdin,
  728. stdout: BufReader::new(stdout),
  729. })
  730. }
  731. pub async fn write_all(&mut self, bytes: &[u8]) -> io::Result<()> {
  732. self.stdin.write_all(bytes).await
  733. }
  734. pub async fn flush(&mut self) -> io::Result<()> {
  735. self.stdin.flush().await
  736. }
  737. pub async fn write_line(&mut self, line: &str) -> io::Result<()> {
  738. self.write_all(line.as_bytes()).await?;
  739. self.write_all(b"\n").await?;
  740. self.flush().await
  741. }
  742. pub async fn read_line(&mut self) -> io::Result<String> {
  743. let mut line = String::new();
  744. let bytes_read = self.stdout.read_line(&mut line).await?;
  745. if bytes_read == 0 {
  746. return Err(io::Error::new(
  747. io::ErrorKind::UnexpectedEof,
  748. "MCP stdio stream closed while reading line",
  749. ));
  750. }
  751. Ok(line)
  752. }
  753. pub async fn read_available(&mut self) -> io::Result<Vec<u8>> {
  754. let mut buffer = vec![0_u8; 4096];
  755. let read = self.stdout.read(&mut buffer).await?;
  756. buffer.truncate(read);
  757. Ok(buffer)
  758. }
  759. pub async fn write_frame(&mut self, payload: &[u8]) -> io::Result<()> {
  760. let encoded = encode_frame(payload);
  761. self.write_all(&encoded).await?;
  762. self.flush().await
  763. }
  764. pub async fn read_frame(&mut self) -> io::Result<Vec<u8>> {
  765. let mut content_length = None;
  766. loop {
  767. let mut line = String::new();
  768. let bytes_read = self.stdout.read_line(&mut line).await?;
  769. if bytes_read == 0 {
  770. return Err(io::Error::new(
  771. io::ErrorKind::UnexpectedEof,
  772. "MCP stdio stream closed while reading headers",
  773. ));
  774. }
  775. if line == "\r\n" {
  776. break;
  777. }
  778. let header = line.trim_end_matches(['\r', '\n']);
  779. if let Some((name, value)) = header.split_once(':') {
  780. if name.trim().eq_ignore_ascii_case("Content-Length") {
  781. let parsed = value
  782. .trim()
  783. .parse::<usize>()
  784. .map_err(|error| io::Error::new(io::ErrorKind::InvalidData, error))?;
  785. content_length = Some(parsed);
  786. }
  787. }
  788. }
  789. let content_length = content_length.ok_or_else(|| {
  790. io::Error::new(io::ErrorKind::InvalidData, "missing Content-Length header")
  791. })?;
  792. let mut payload = vec![0_u8; content_length];
  793. self.stdout.read_exact(&mut payload).await?;
  794. Ok(payload)
  795. }
  796. pub async fn write_jsonrpc_message<T: Serialize>(&mut self, message: &T) -> io::Result<()> {
  797. let body = serde_json::to_vec(message)
  798. .map_err(|error| io::Error::new(io::ErrorKind::InvalidData, error))?;
  799. self.write_frame(&body).await
  800. }
  801. pub async fn read_jsonrpc_message<T: DeserializeOwned>(&mut self) -> io::Result<T> {
  802. let payload = self.read_frame().await?;
  803. serde_json::from_slice(&payload)
  804. .map_err(|error| io::Error::new(io::ErrorKind::InvalidData, error))
  805. }
  806. pub async fn send_request<T: Serialize>(
  807. &mut self,
  808. request: &JsonRpcRequest<T>,
  809. ) -> io::Result<()> {
  810. self.write_jsonrpc_message(request).await
  811. }
  812. pub async fn read_response<T: DeserializeOwned>(&mut self) -> io::Result<JsonRpcResponse<T>> {
  813. self.read_jsonrpc_message().await
  814. }
  815. pub async fn request<TParams: Serialize, TResult: DeserializeOwned>(
  816. &mut self,
  817. id: JsonRpcId,
  818. method: impl Into<String>,
  819. params: Option<TParams>,
  820. ) -> io::Result<JsonRpcResponse<TResult>> {
  821. let method = method.into();
  822. let request = JsonRpcRequest::new(id.clone(), method.clone(), params);
  823. self.send_request(&request).await?;
  824. let response = self.read_response().await?;
  825. if response.jsonrpc != "2.0" {
  826. return Err(io::Error::new(
  827. io::ErrorKind::InvalidData,
  828. format!(
  829. "MCP response for {method} used unsupported jsonrpc version `{}`",
  830. response.jsonrpc
  831. ),
  832. ));
  833. }
  834. if response.id != id {
  835. return Err(io::Error::new(
  836. io::ErrorKind::InvalidData,
  837. format!(
  838. "MCP response for {method} used mismatched id: expected {id:?}, got {:?}",
  839. response.id
  840. ),
  841. ));
  842. }
  843. Ok(response)
  844. }
  845. pub async fn initialize(
  846. &mut self,
  847. id: JsonRpcId,
  848. params: McpInitializeParams,
  849. ) -> io::Result<JsonRpcResponse<McpInitializeResult>> {
  850. self.request(id, "initialize", Some(params)).await
  851. }
  852. pub async fn list_tools(
  853. &mut self,
  854. id: JsonRpcId,
  855. params: Option<McpListToolsParams>,
  856. ) -> io::Result<JsonRpcResponse<McpListToolsResult>> {
  857. self.request(id, "tools/list", params).await
  858. }
  859. pub async fn call_tool(
  860. &mut self,
  861. id: JsonRpcId,
  862. params: McpToolCallParams,
  863. ) -> io::Result<JsonRpcResponse<McpToolCallResult>> {
  864. self.request(id, "tools/call", Some(params)).await
  865. }
  866. pub async fn list_resources(
  867. &mut self,
  868. id: JsonRpcId,
  869. params: Option<McpListResourcesParams>,
  870. ) -> io::Result<JsonRpcResponse<McpListResourcesResult>> {
  871. self.request(id, "resources/list", params).await
  872. }
  873. pub async fn read_resource(
  874. &mut self,
  875. id: JsonRpcId,
  876. params: McpReadResourceParams,
  877. ) -> io::Result<JsonRpcResponse<McpReadResourceResult>> {
  878. self.request(id, "resources/read", Some(params)).await
  879. }
  880. pub async fn terminate(&mut self) -> io::Result<()> {
  881. self.child.kill().await
  882. }
  883. pub async fn wait(&mut self) -> io::Result<std::process::ExitStatus> {
  884. self.child.wait().await
  885. }
  886. pub fn has_exited(&mut self) -> io::Result<bool> {
  887. Ok(self.child.try_wait()?.is_some())
  888. }
  889. async fn shutdown(&mut self) -> io::Result<()> {
  890. if self.child.try_wait()?.is_none() {
  891. match self.child.kill().await {
  892. Ok(()) => {}
  893. Err(error) if error.kind() == io::ErrorKind::InvalidInput => {}
  894. Err(error) => return Err(error),
  895. }
  896. }
  897. let _ = self.child.wait().await?;
  898. Ok(())
  899. }
  900. }
  901. pub fn spawn_mcp_stdio_process(bootstrap: &McpClientBootstrap) -> io::Result<McpStdioProcess> {
  902. match &bootstrap.transport {
  903. McpClientTransport::Stdio(transport) => McpStdioProcess::spawn(transport),
  904. other => Err(io::Error::new(
  905. io::ErrorKind::InvalidInput,
  906. format!(
  907. "MCP bootstrap transport for {} is not stdio: {other:?}",
  908. bootstrap.server_name
  909. ),
  910. )),
  911. }
  912. }
  913. fn apply_env(command: &mut Command, env: &BTreeMap<String, String>) {
  914. for (key, value) in env {
  915. command.env(key, value);
  916. }
  917. }
  918. fn encode_frame(payload: &[u8]) -> Vec<u8> {
  919. let header = format!("Content-Length: {}\r\n\r\n", payload.len());
  920. let mut framed = header.into_bytes();
  921. framed.extend_from_slice(payload);
  922. framed
  923. }
  924. fn default_initialize_params() -> McpInitializeParams {
  925. McpInitializeParams {
  926. protocol_version: "2025-03-26".to_string(),
  927. capabilities: JsonValue::Object(serde_json::Map::new()),
  928. client_info: McpInitializeClientInfo {
  929. name: "runtime".to_string(),
  930. version: env!("CARGO_PKG_VERSION").to_string(),
  931. },
  932. }
  933. }
  934. #[cfg(test)]
  935. mod tests {
  936. use std::collections::BTreeMap;
  937. use std::fs;
  938. use std::io::ErrorKind;
  939. use std::os::unix::fs::PermissionsExt;
  940. use std::path::{Path, PathBuf};
  941. use std::sync::atomic::{AtomicU64, Ordering};
  942. use std::time::{SystemTime, UNIX_EPOCH};
  943. use serde_json::json;
  944. use tokio::runtime::Builder;
  945. use crate::config::{
  946. ConfigSource, McpRemoteServerConfig, McpSdkServerConfig, McpServerConfig,
  947. McpStdioServerConfig, McpWebSocketServerConfig, ScopedMcpServerConfig,
  948. };
  949. use crate::mcp::mcp_tool_name;
  950. use crate::mcp_client::McpClientBootstrap;
  951. use super::{
  952. spawn_mcp_stdio_process, JsonRpcId, JsonRpcRequest, JsonRpcResponse,
  953. McpInitializeClientInfo, McpInitializeParams, McpInitializeResult, McpInitializeServerInfo,
  954. McpListToolsResult, McpReadResourceParams, McpReadResourceResult, McpServerManager,
  955. McpServerManagerError, McpStdioProcess, McpTool, McpToolCallParams,
  956. };
  957. fn temp_dir() -> PathBuf {
  958. static NEXT_TEMP_DIR_ID: AtomicU64 = AtomicU64::new(0);
  959. let nanos = SystemTime::now()
  960. .duration_since(UNIX_EPOCH)
  961. .expect("time should be after epoch")
  962. .as_nanos();
  963. let unique_id = NEXT_TEMP_DIR_ID.fetch_add(1, Ordering::Relaxed);
  964. std::env::temp_dir().join(format!("runtime-mcp-stdio-{nanos}-{unique_id}"))
  965. }
  966. fn write_echo_script() -> PathBuf {
  967. let root = temp_dir();
  968. fs::create_dir_all(&root).expect("temp dir");
  969. let script_path = root.join("echo-mcp.sh");
  970. fs::write(
  971. &script_path,
  972. "#!/bin/sh\nprintf 'READY:%s\\n' \"$MCP_TEST_TOKEN\"\nIFS= read -r line\nprintf 'ECHO:%s\\n' \"$line\"\n",
  973. )
  974. .expect("write script");
  975. let mut permissions = fs::metadata(&script_path).expect("metadata").permissions();
  976. permissions.set_mode(0o755);
  977. fs::set_permissions(&script_path, permissions).expect("chmod");
  978. script_path
  979. }
  980. fn write_jsonrpc_script() -> PathBuf {
  981. let root = temp_dir();
  982. fs::create_dir_all(&root).expect("temp dir");
  983. let script_path = root.join("jsonrpc-mcp.py");
  984. let script = [
  985. "#!/usr/bin/env python3",
  986. "import json, os, sys",
  987. "LOWERCASE_CONTENT_LENGTH = os.environ.get('MCP_LOWERCASE_CONTENT_LENGTH') == '1'",
  988. "MISMATCHED_RESPONSE_ID = os.environ.get('MCP_MISMATCHED_RESPONSE_ID') == '1'",
  989. "header = b''",
  990. r"while not header.endswith(b'\r\n\r\n'):",
  991. " chunk = sys.stdin.buffer.read(1)",
  992. " if not chunk:",
  993. " raise SystemExit(1)",
  994. " header += chunk",
  995. "length = 0",
  996. r"for line in header.decode().split('\r\n'):",
  997. r" if line.lower().startswith('content-length:'):",
  998. r" length = int(line.split(':', 1)[1].strip())",
  999. "payload = sys.stdin.buffer.read(length)",
  1000. "request = json.loads(payload.decode())",
  1001. r"assert request['jsonrpc'] == '2.0'",
  1002. r"assert request['method'] == 'initialize'",
  1003. "response_id = 'wrong-id' if MISMATCHED_RESPONSE_ID else request['id']",
  1004. "header_name = 'content-length' if LOWERCASE_CONTENT_LENGTH else 'Content-Length'",
  1005. r"response = json.dumps({",
  1006. r" 'jsonrpc': '2.0',",
  1007. r" 'id': response_id,",
  1008. r" 'result': {",
  1009. r" 'protocolVersion': request['params']['protocolVersion'],",
  1010. r" 'capabilities': {'tools': {}},",
  1011. r" 'serverInfo': {'name': 'fake-mcp', 'version': '0.1.0'}",
  1012. r" }",
  1013. r"}).encode()",
  1014. r"sys.stdout.buffer.write(f'{header_name}: {len(response)}\r\n\r\n'.encode() + response)",
  1015. "sys.stdout.buffer.flush()",
  1016. "",
  1017. ]
  1018. .join("\n");
  1019. fs::write(&script_path, script).expect("write script");
  1020. let mut permissions = fs::metadata(&script_path).expect("metadata").permissions();
  1021. permissions.set_mode(0o755);
  1022. fs::set_permissions(&script_path, permissions).expect("chmod");
  1023. script_path
  1024. }
  1025. #[allow(clippy::too_many_lines)]
  1026. fn write_mcp_server_script() -> PathBuf {
  1027. let root = temp_dir();
  1028. fs::create_dir_all(&root).expect("temp dir");
  1029. let script_path = root.join("fake-mcp-server.py");
  1030. let script = [
  1031. "#!/usr/bin/env python3",
  1032. "import json, os, sys, time",
  1033. "TOOL_CALL_DELAY_MS = int(os.environ.get('MCP_TOOL_CALL_DELAY_MS', '0'))",
  1034. "INVALID_TOOL_CALL_RESPONSE = os.environ.get('MCP_INVALID_TOOL_CALL_RESPONSE') == '1'",
  1035. "",
  1036. "def read_message():",
  1037. " header = b''",
  1038. r" while not header.endswith(b'\r\n\r\n'):",
  1039. " chunk = sys.stdin.buffer.read(1)",
  1040. " if not chunk:",
  1041. " return None",
  1042. " header += chunk",
  1043. " length = 0",
  1044. r" for line in header.decode().split('\r\n'):",
  1045. r" if line.lower().startswith('content-length:'):",
  1046. r" length = int(line.split(':', 1)[1].strip())",
  1047. " payload = sys.stdin.buffer.read(length)",
  1048. " return json.loads(payload.decode())",
  1049. "",
  1050. "def send_message(message):",
  1051. " payload = json.dumps(message).encode()",
  1052. r" sys.stdout.buffer.write(f'Content-Length: {len(payload)}\r\n\r\n'.encode() + payload)",
  1053. " sys.stdout.buffer.flush()",
  1054. "",
  1055. "while True:",
  1056. " request = read_message()",
  1057. " if request is None:",
  1058. " break",
  1059. " method = request['method']",
  1060. " if method == 'initialize':",
  1061. " send_message({",
  1062. " 'jsonrpc': '2.0',",
  1063. " 'id': request['id'],",
  1064. " 'result': {",
  1065. " 'protocolVersion': request['params']['protocolVersion'],",
  1066. " 'capabilities': {'tools': {}, 'resources': {}},",
  1067. " 'serverInfo': {'name': 'fake-mcp', 'version': '0.2.0'}",
  1068. " }",
  1069. " })",
  1070. " elif method == 'tools/list':",
  1071. " send_message({",
  1072. " 'jsonrpc': '2.0',",
  1073. " 'id': request['id'],",
  1074. " 'result': {",
  1075. " 'tools': [",
  1076. " {",
  1077. " 'name': 'echo',",
  1078. " 'description': 'Echoes text',",
  1079. " 'inputSchema': {",
  1080. " 'type': 'object',",
  1081. " 'properties': {'text': {'type': 'string'}},",
  1082. " 'required': ['text']",
  1083. " }",
  1084. " }",
  1085. " ]",
  1086. " }",
  1087. " })",
  1088. " elif method == 'tools/call':",
  1089. " if INVALID_TOOL_CALL_RESPONSE:",
  1090. " sys.stdout.buffer.write(b'Content-Length: 5\\r\\n\\r\\nnope!')",
  1091. " sys.stdout.buffer.flush()",
  1092. " continue",
  1093. " if TOOL_CALL_DELAY_MS:",
  1094. " time.sleep(TOOL_CALL_DELAY_MS / 1000)",
  1095. " args = request['params'].get('arguments') or {}",
  1096. " if request['params']['name'] == 'fail':",
  1097. " send_message({",
  1098. " 'jsonrpc': '2.0',",
  1099. " 'id': request['id'],",
  1100. " 'error': {'code': -32001, 'message': 'tool failed'},",
  1101. " })",
  1102. " else:",
  1103. " text = args.get('text', '')",
  1104. " send_message({",
  1105. " 'jsonrpc': '2.0',",
  1106. " 'id': request['id'],",
  1107. " 'result': {",
  1108. " 'content': [{'type': 'text', 'text': f'echo:{text}'}],",
  1109. " 'structuredContent': {'echoed': text},",
  1110. " 'isError': False",
  1111. " }",
  1112. " })",
  1113. " elif method == 'resources/list':",
  1114. " send_message({",
  1115. " 'jsonrpc': '2.0',",
  1116. " 'id': request['id'],",
  1117. " 'result': {",
  1118. " 'resources': [",
  1119. " {",
  1120. " 'uri': 'file://guide.txt',",
  1121. " 'name': 'guide',",
  1122. " 'description': 'Guide text',",
  1123. " 'mimeType': 'text/plain'",
  1124. " }",
  1125. " ]",
  1126. " }",
  1127. " })",
  1128. " elif method == 'resources/read':",
  1129. " uri = request['params']['uri']",
  1130. " send_message({",
  1131. " 'jsonrpc': '2.0',",
  1132. " 'id': request['id'],",
  1133. " 'result': {",
  1134. " 'contents': [",
  1135. " {",
  1136. " 'uri': uri,",
  1137. " 'mimeType': 'text/plain',",
  1138. " 'text': f'contents for {uri}'",
  1139. " }",
  1140. " ]",
  1141. " }",
  1142. " })",
  1143. " else:",
  1144. " send_message({",
  1145. " 'jsonrpc': '2.0',",
  1146. " 'id': request['id'],",
  1147. " 'error': {'code': -32601, 'message': f'unknown method: {method}'},",
  1148. " })",
  1149. "",
  1150. ]
  1151. .join("\n");
  1152. fs::write(&script_path, script).expect("write script");
  1153. let mut permissions = fs::metadata(&script_path).expect("metadata").permissions();
  1154. permissions.set_mode(0o755);
  1155. fs::set_permissions(&script_path, permissions).expect("chmod");
  1156. script_path
  1157. }
  1158. #[allow(clippy::too_many_lines)]
  1159. fn write_manager_mcp_server_script() -> PathBuf {
  1160. let root = temp_dir();
  1161. fs::create_dir_all(&root).expect("temp dir");
  1162. let script_path = root.join("manager-mcp-server.py");
  1163. let script = [
  1164. "#!/usr/bin/env python3",
  1165. "import json, os, sys, time",
  1166. "",
  1167. "LABEL = os.environ.get('MCP_SERVER_LABEL', 'server')",
  1168. "LOG_PATH = os.environ.get('MCP_LOG_PATH')",
  1169. "EXIT_AFTER_TOOLS_LIST = os.environ.get('MCP_EXIT_AFTER_TOOLS_LIST') == '1'",
  1170. "FAIL_ONCE_MODE = os.environ.get('MCP_FAIL_ONCE_MODE')",
  1171. "FAIL_ONCE_MARKER = os.environ.get('MCP_FAIL_ONCE_MARKER')",
  1172. "initialize_count = 0",
  1173. "",
  1174. "def log(method):",
  1175. " if LOG_PATH:",
  1176. " with open(LOG_PATH, 'a', encoding='utf-8') as handle:",
  1177. " handle.write(f'{method}\\n')",
  1178. "",
  1179. "def should_fail_once():",
  1180. " if not FAIL_ONCE_MODE or not FAIL_ONCE_MARKER:",
  1181. " return False",
  1182. " if os.path.exists(FAIL_ONCE_MARKER):",
  1183. " return False",
  1184. " with open(FAIL_ONCE_MARKER, 'w', encoding='utf-8') as handle:",
  1185. " handle.write(FAIL_ONCE_MODE)",
  1186. " return True",
  1187. "",
  1188. "def read_message():",
  1189. " header = b''",
  1190. r" while not header.endswith(b'\r\n\r\n'):",
  1191. " chunk = sys.stdin.buffer.read(1)",
  1192. " if not chunk:",
  1193. " return None",
  1194. " header += chunk",
  1195. " length = 0",
  1196. r" for line in header.decode().split('\r\n'):",
  1197. r" if line.lower().startswith('content-length:'):",
  1198. r" length = int(line.split(':', 1)[1].strip())",
  1199. " payload = sys.stdin.buffer.read(length)",
  1200. " return json.loads(payload.decode())",
  1201. "",
  1202. "def send_message(message):",
  1203. " payload = json.dumps(message).encode()",
  1204. r" sys.stdout.buffer.write(f'Content-Length: {len(payload)}\r\n\r\n'.encode() + payload)",
  1205. " sys.stdout.buffer.flush()",
  1206. "",
  1207. "while True:",
  1208. " request = read_message()",
  1209. " if request is None:",
  1210. " break",
  1211. " method = request['method']",
  1212. " log(method)",
  1213. " if method == 'initialize':",
  1214. " if FAIL_ONCE_MODE == 'initialize_hang' and should_fail_once():",
  1215. " log('initialize-hang')",
  1216. " while True:",
  1217. " time.sleep(1)",
  1218. " initialize_count += 1",
  1219. " send_message({",
  1220. " 'jsonrpc': '2.0',",
  1221. " 'id': request['id'],",
  1222. " 'result': {",
  1223. " 'protocolVersion': request['params']['protocolVersion'],",
  1224. " 'capabilities': {'tools': {}},",
  1225. " 'serverInfo': {'name': LABEL, 'version': '1.0.0'}",
  1226. " }",
  1227. " })",
  1228. " elif method == 'tools/list':",
  1229. " send_message({",
  1230. " 'jsonrpc': '2.0',",
  1231. " 'id': request['id'],",
  1232. " 'result': {",
  1233. " 'tools': [",
  1234. " {",
  1235. " 'name': 'echo',",
  1236. " 'description': f'Echo tool for {LABEL}',",
  1237. " 'inputSchema': {",
  1238. " 'type': 'object',",
  1239. " 'properties': {'text': {'type': 'string'}},",
  1240. " 'required': ['text']",
  1241. " }",
  1242. " }",
  1243. " ]",
  1244. " }",
  1245. " })",
  1246. " if EXIT_AFTER_TOOLS_LIST:",
  1247. " raise SystemExit(0)",
  1248. " elif method == 'tools/call':",
  1249. " if FAIL_ONCE_MODE == 'tool_call_disconnect' and should_fail_once():",
  1250. " log('tools/call-disconnect')",
  1251. " raise SystemExit(0)",
  1252. " args = request['params'].get('arguments') or {}",
  1253. " text = args.get('text', '')",
  1254. " send_message({",
  1255. " 'jsonrpc': '2.0',",
  1256. " 'id': request['id'],",
  1257. " 'result': {",
  1258. " 'content': [{'type': 'text', 'text': f'{LABEL}:{text}'}],",
  1259. " 'structuredContent': {",
  1260. " 'server': LABEL,",
  1261. " 'echoed': text,",
  1262. " 'initializeCount': initialize_count",
  1263. " },",
  1264. " 'isError': False",
  1265. " }",
  1266. " })",
  1267. " else:",
  1268. " send_message({",
  1269. " 'jsonrpc': '2.0',",
  1270. " 'id': request['id'],",
  1271. " 'error': {'code': -32601, 'message': f'unknown method: {method}'},",
  1272. " })",
  1273. "",
  1274. ]
  1275. .join("\n");
  1276. fs::write(&script_path, script).expect("write script");
  1277. let mut permissions = fs::metadata(&script_path).expect("metadata").permissions();
  1278. permissions.set_mode(0o755);
  1279. fs::set_permissions(&script_path, permissions).expect("chmod");
  1280. script_path
  1281. }
  1282. fn sample_bootstrap(script_path: &Path) -> McpClientBootstrap {
  1283. let config = ScopedMcpServerConfig {
  1284. scope: ConfigSource::Local,
  1285. config: McpServerConfig::Stdio(McpStdioServerConfig {
  1286. command: "/bin/sh".to_string(),
  1287. args: vec![script_path.to_string_lossy().into_owned()],
  1288. env: BTreeMap::from([("MCP_TEST_TOKEN".to_string(), "secret-value".to_string())]),
  1289. tool_call_timeout_ms: None,
  1290. }),
  1291. };
  1292. McpClientBootstrap::from_scoped_config("stdio server", &config)
  1293. }
  1294. fn script_transport(script_path: &Path) -> crate::mcp_client::McpStdioTransport {
  1295. script_transport_with_env(script_path, BTreeMap::new())
  1296. }
  1297. fn script_transport_with_env(
  1298. script_path: &Path,
  1299. env: BTreeMap<String, String>,
  1300. ) -> crate::mcp_client::McpStdioTransport {
  1301. crate::mcp_client::McpStdioTransport {
  1302. command: "python3".to_string(),
  1303. args: vec![script_path.to_string_lossy().into_owned()],
  1304. env,
  1305. tool_call_timeout_ms: None,
  1306. }
  1307. }
  1308. fn cleanup_script(script_path: &Path) {
  1309. if let Err(error) = fs::remove_file(script_path) {
  1310. assert_eq!(
  1311. error.kind(),
  1312. std::io::ErrorKind::NotFound,
  1313. "cleanup script: {error}"
  1314. );
  1315. }
  1316. if let Err(error) = fs::remove_dir_all(script_path.parent().expect("script parent")) {
  1317. assert_eq!(
  1318. error.kind(),
  1319. std::io::ErrorKind::NotFound,
  1320. "cleanup dir: {error}"
  1321. );
  1322. }
  1323. }
  1324. fn manager_server_config(
  1325. script_path: &Path,
  1326. label: &str,
  1327. log_path: &Path,
  1328. ) -> ScopedMcpServerConfig {
  1329. manager_server_config_with_env(script_path, label, log_path, BTreeMap::new())
  1330. }
  1331. fn manager_server_config_with_env(
  1332. script_path: &Path,
  1333. label: &str,
  1334. log_path: &Path,
  1335. extra_env: BTreeMap<String, String>,
  1336. ) -> ScopedMcpServerConfig {
  1337. let mut env = BTreeMap::from([
  1338. ("MCP_SERVER_LABEL".to_string(), label.to_string()),
  1339. (
  1340. "MCP_LOG_PATH".to_string(),
  1341. log_path.to_string_lossy().into_owned(),
  1342. ),
  1343. ]);
  1344. env.extend(extra_env);
  1345. ScopedMcpServerConfig {
  1346. scope: ConfigSource::Local,
  1347. config: McpServerConfig::Stdio(McpStdioServerConfig {
  1348. command: "python3".to_string(),
  1349. args: vec![script_path.to_string_lossy().into_owned()],
  1350. env,
  1351. tool_call_timeout_ms: None,
  1352. }),
  1353. }
  1354. }
  1355. #[test]
  1356. fn spawns_stdio_process_and_round_trips_io() {
  1357. let runtime = Builder::new_current_thread()
  1358. .enable_all()
  1359. .build()
  1360. .expect("runtime");
  1361. runtime.block_on(async {
  1362. let script_path = write_echo_script();
  1363. let bootstrap = sample_bootstrap(&script_path);
  1364. let mut process = spawn_mcp_stdio_process(&bootstrap).expect("spawn stdio process");
  1365. let ready = process.read_line().await.expect("read ready");
  1366. assert_eq!(ready, "READY:secret-value\n");
  1367. process
  1368. .write_line("ping from client")
  1369. .await
  1370. .expect("write line");
  1371. let echoed = process.read_line().await.expect("read echo");
  1372. assert_eq!(echoed, "ECHO:ping from client\n");
  1373. let status = process.wait().await.expect("wait for exit");
  1374. assert!(status.success());
  1375. cleanup_script(&script_path);
  1376. });
  1377. }
  1378. #[test]
  1379. fn rejects_non_stdio_bootstrap() {
  1380. let config = ScopedMcpServerConfig {
  1381. scope: ConfigSource::Local,
  1382. config: McpServerConfig::Sdk(crate::config::McpSdkServerConfig {
  1383. name: "sdk-server".to_string(),
  1384. }),
  1385. };
  1386. let bootstrap = McpClientBootstrap::from_scoped_config("sdk server", &config);
  1387. let error = spawn_mcp_stdio_process(&bootstrap).expect_err("non-stdio should fail");
  1388. assert_eq!(error.kind(), ErrorKind::InvalidInput);
  1389. }
  1390. #[test]
  1391. fn round_trips_initialize_request_and_response_over_stdio_frames() {
  1392. let runtime = Builder::new_current_thread()
  1393. .enable_all()
  1394. .build()
  1395. .expect("runtime");
  1396. runtime.block_on(async {
  1397. let script_path = write_jsonrpc_script();
  1398. let transport = script_transport(&script_path);
  1399. let mut process = McpStdioProcess::spawn(&transport).expect("spawn transport directly");
  1400. let response = process
  1401. .initialize(
  1402. JsonRpcId::Number(1),
  1403. McpInitializeParams {
  1404. protocol_version: "2025-03-26".to_string(),
  1405. capabilities: json!({"roots": {}}),
  1406. client_info: McpInitializeClientInfo {
  1407. name: "runtime-tests".to_string(),
  1408. version: "0.1.0".to_string(),
  1409. },
  1410. },
  1411. )
  1412. .await
  1413. .expect("initialize roundtrip");
  1414. assert_eq!(response.id, JsonRpcId::Number(1));
  1415. assert_eq!(response.error, None);
  1416. assert_eq!(
  1417. response.result,
  1418. Some(McpInitializeResult {
  1419. protocol_version: "2025-03-26".to_string(),
  1420. capabilities: json!({"tools": {}}),
  1421. server_info: McpInitializeServerInfo {
  1422. name: "fake-mcp".to_string(),
  1423. version: "0.1.0".to_string(),
  1424. },
  1425. })
  1426. );
  1427. let status = process.wait().await.expect("wait for exit");
  1428. assert!(status.success());
  1429. cleanup_script(&script_path);
  1430. });
  1431. }
  1432. #[test]
  1433. fn write_jsonrpc_request_emits_content_length_frame() {
  1434. let runtime = Builder::new_current_thread()
  1435. .enable_all()
  1436. .build()
  1437. .expect("runtime");
  1438. runtime.block_on(async {
  1439. let script_path = write_jsonrpc_script();
  1440. let transport = script_transport(&script_path);
  1441. let mut process = McpStdioProcess::spawn(&transport).expect("spawn transport directly");
  1442. let request = JsonRpcRequest::new(
  1443. JsonRpcId::Number(7),
  1444. "initialize",
  1445. Some(json!({
  1446. "protocolVersion": "2025-03-26",
  1447. "capabilities": {},
  1448. "clientInfo": {"name": "runtime-tests", "version": "0.1.0"}
  1449. })),
  1450. );
  1451. process.send_request(&request).await.expect("send request");
  1452. let response: JsonRpcResponse<serde_json::Value> =
  1453. process.read_response().await.expect("read response");
  1454. assert_eq!(response.id, JsonRpcId::Number(7));
  1455. assert_eq!(response.jsonrpc, "2.0");
  1456. let status = process.wait().await.expect("wait for exit");
  1457. assert!(status.success());
  1458. cleanup_script(&script_path);
  1459. });
  1460. }
  1461. #[test]
  1462. fn given_lowercase_content_length_when_initialize_then_response_parses() {
  1463. let runtime = Builder::new_current_thread()
  1464. .enable_all()
  1465. .build()
  1466. .expect("runtime");
  1467. runtime.block_on(async {
  1468. let script_path = write_jsonrpc_script();
  1469. let transport = script_transport_with_env(
  1470. &script_path,
  1471. BTreeMap::from([(
  1472. "MCP_LOWERCASE_CONTENT_LENGTH".to_string(),
  1473. "1".to_string(),
  1474. )]),
  1475. );
  1476. let mut process = McpStdioProcess::spawn(&transport).expect("spawn transport directly");
  1477. let response = process
  1478. .initialize(
  1479. JsonRpcId::Number(8),
  1480. McpInitializeParams {
  1481. protocol_version: "2025-03-26".to_string(),
  1482. capabilities: json!({"roots": {}}),
  1483. client_info: McpInitializeClientInfo {
  1484. name: "runtime-tests".to_string(),
  1485. version: "0.1.0".to_string(),
  1486. },
  1487. },
  1488. )
  1489. .await
  1490. .expect("initialize roundtrip");
  1491. assert_eq!(response.id, JsonRpcId::Number(8));
  1492. assert_eq!(response.error, None);
  1493. assert!(response.result.is_some());
  1494. let status = process.wait().await.expect("wait for exit");
  1495. assert!(status.success());
  1496. cleanup_script(&script_path);
  1497. });
  1498. }
  1499. #[test]
  1500. fn given_mismatched_response_id_when_initialize_then_invalid_data_is_returned() {
  1501. let runtime = Builder::new_current_thread()
  1502. .enable_all()
  1503. .build()
  1504. .expect("runtime");
  1505. runtime.block_on(async {
  1506. let script_path = write_jsonrpc_script();
  1507. let transport = script_transport_with_env(
  1508. &script_path,
  1509. BTreeMap::from([(
  1510. "MCP_MISMATCHED_RESPONSE_ID".to_string(),
  1511. "1".to_string(),
  1512. )]),
  1513. );
  1514. let mut process = McpStdioProcess::spawn(&transport).expect("spawn transport directly");
  1515. let error = process
  1516. .initialize(
  1517. JsonRpcId::Number(9),
  1518. McpInitializeParams {
  1519. protocol_version: "2025-03-26".to_string(),
  1520. capabilities: json!({"roots": {}}),
  1521. client_info: McpInitializeClientInfo {
  1522. name: "runtime-tests".to_string(),
  1523. version: "0.1.0".to_string(),
  1524. },
  1525. },
  1526. )
  1527. .await
  1528. .expect_err("mismatched response id should fail");
  1529. assert_eq!(error.kind(), ErrorKind::InvalidData);
  1530. assert!(error.to_string().contains("mismatched id"));
  1531. let status = process.wait().await.expect("wait for exit");
  1532. assert!(status.success());
  1533. cleanup_script(&script_path);
  1534. });
  1535. }
  1536. #[test]
  1537. fn direct_spawn_uses_transport_env() {
  1538. let runtime = Builder::new_current_thread()
  1539. .enable_all()
  1540. .build()
  1541. .expect("runtime");
  1542. runtime.block_on(async {
  1543. let script_path = write_echo_script();
  1544. let transport = crate::mcp_client::McpStdioTransport {
  1545. command: "/bin/sh".to_string(),
  1546. args: vec![script_path.to_string_lossy().into_owned()],
  1547. env: BTreeMap::from([("MCP_TEST_TOKEN".to_string(), "direct-secret".to_string())]),
  1548. tool_call_timeout_ms: None,
  1549. };
  1550. let mut process = McpStdioProcess::spawn(&transport).expect("spawn transport directly");
  1551. let ready = process.read_available().await.expect("read ready");
  1552. assert_eq!(String::from_utf8_lossy(&ready), "READY:direct-secret\n");
  1553. process.terminate().await.expect("terminate child");
  1554. let _ = process.wait().await.expect("wait after kill");
  1555. cleanup_script(&script_path);
  1556. });
  1557. }
  1558. #[test]
  1559. fn lists_tools_calls_tool_and_reads_resources_over_jsonrpc() {
  1560. let runtime = Builder::new_current_thread()
  1561. .enable_all()
  1562. .build()
  1563. .expect("runtime");
  1564. runtime.block_on(async {
  1565. let script_path = write_mcp_server_script();
  1566. let transport = script_transport(&script_path);
  1567. let mut process = McpStdioProcess::spawn(&transport).expect("spawn fake mcp server");
  1568. let tools = process
  1569. .list_tools(JsonRpcId::Number(2), None)
  1570. .await
  1571. .expect("list tools");
  1572. assert_eq!(tools.error, None);
  1573. assert_eq!(tools.id, JsonRpcId::Number(2));
  1574. assert_eq!(
  1575. tools.result,
  1576. Some(McpListToolsResult {
  1577. tools: vec![McpTool {
  1578. name: "echo".to_string(),
  1579. description: Some("Echoes text".to_string()),
  1580. input_schema: Some(json!({
  1581. "type": "object",
  1582. "properties": {"text": {"type": "string"}},
  1583. "required": ["text"]
  1584. })),
  1585. annotations: None,
  1586. meta: None,
  1587. }],
  1588. next_cursor: None,
  1589. })
  1590. );
  1591. let call = process
  1592. .call_tool(
  1593. JsonRpcId::String("call-1".to_string()),
  1594. McpToolCallParams {
  1595. name: "echo".to_string(),
  1596. arguments: Some(json!({"text": "hello"})),
  1597. meta: None,
  1598. },
  1599. )
  1600. .await
  1601. .expect("call tool");
  1602. assert_eq!(call.error, None);
  1603. let call_result = call.result.expect("tool result");
  1604. assert_eq!(call_result.is_error, Some(false));
  1605. assert_eq!(
  1606. call_result.structured_content,
  1607. Some(json!({"echoed": "hello"}))
  1608. );
  1609. assert_eq!(call_result.content.len(), 1);
  1610. assert_eq!(call_result.content[0].kind, "text");
  1611. assert_eq!(
  1612. call_result.content[0].data.get("text"),
  1613. Some(&json!("echo:hello"))
  1614. );
  1615. let resources = process
  1616. .list_resources(JsonRpcId::Number(3), None)
  1617. .await
  1618. .expect("list resources");
  1619. let resources_result = resources.result.expect("resources result");
  1620. assert_eq!(resources_result.resources.len(), 1);
  1621. assert_eq!(resources_result.resources[0].uri, "file://guide.txt");
  1622. assert_eq!(
  1623. resources_result.resources[0].mime_type.as_deref(),
  1624. Some("text/plain")
  1625. );
  1626. let read = process
  1627. .read_resource(
  1628. JsonRpcId::Number(4),
  1629. McpReadResourceParams {
  1630. uri: "file://guide.txt".to_string(),
  1631. },
  1632. )
  1633. .await
  1634. .expect("read resource");
  1635. assert_eq!(
  1636. read.result,
  1637. Some(McpReadResourceResult {
  1638. contents: vec![super::McpResourceContents {
  1639. uri: "file://guide.txt".to_string(),
  1640. mime_type: Some("text/plain".to_string()),
  1641. text: Some("contents for file://guide.txt".to_string()),
  1642. blob: None,
  1643. meta: None,
  1644. }],
  1645. })
  1646. );
  1647. process.terminate().await.expect("terminate child");
  1648. let _ = process.wait().await.expect("wait after kill");
  1649. cleanup_script(&script_path);
  1650. });
  1651. }
  1652. #[test]
  1653. fn surfaces_jsonrpc_errors_from_tool_calls() {
  1654. let runtime = Builder::new_current_thread()
  1655. .enable_all()
  1656. .build()
  1657. .expect("runtime");
  1658. runtime.block_on(async {
  1659. let script_path = write_mcp_server_script();
  1660. let transport = script_transport(&script_path);
  1661. let mut process = McpStdioProcess::spawn(&transport).expect("spawn fake mcp server");
  1662. let response = process
  1663. .call_tool(
  1664. JsonRpcId::Number(9),
  1665. McpToolCallParams {
  1666. name: "fail".to_string(),
  1667. arguments: None,
  1668. meta: None,
  1669. },
  1670. )
  1671. .await
  1672. .expect("call tool with error response");
  1673. assert_eq!(response.id, JsonRpcId::Number(9));
  1674. assert!(response.result.is_none());
  1675. assert_eq!(response.error.as_ref().map(|e| e.code), Some(-32001));
  1676. assert_eq!(
  1677. response.error.as_ref().map(|e| e.message.as_str()),
  1678. Some("tool failed")
  1679. );
  1680. process.terminate().await.expect("terminate child");
  1681. let _ = process.wait().await.expect("wait after kill");
  1682. cleanup_script(&script_path);
  1683. });
  1684. }
  1685. #[test]
  1686. fn manager_discovers_tools_from_stdio_config() {
  1687. let runtime = Builder::new_current_thread()
  1688. .enable_all()
  1689. .build()
  1690. .expect("runtime");
  1691. runtime.block_on(async {
  1692. let script_path = write_manager_mcp_server_script();
  1693. let root = script_path.parent().expect("script parent");
  1694. let log_path = root.join("alpha.log");
  1695. let servers = BTreeMap::from([(
  1696. "alpha".to_string(),
  1697. manager_server_config(&script_path, "alpha", &log_path),
  1698. )]);
  1699. let mut manager = McpServerManager::from_servers(&servers);
  1700. let tools = manager.discover_tools().await.expect("discover tools");
  1701. assert_eq!(tools.len(), 1);
  1702. assert_eq!(tools[0].server_name, "alpha");
  1703. assert_eq!(tools[0].raw_name, "echo");
  1704. assert_eq!(tools[0].qualified_name, mcp_tool_name("alpha", "echo"));
  1705. assert_eq!(tools[0].tool.name, "echo");
  1706. assert!(manager.unsupported_servers().is_empty());
  1707. manager.shutdown().await.expect("shutdown");
  1708. cleanup_script(&script_path);
  1709. });
  1710. }
  1711. #[test]
  1712. fn manager_routes_tool_calls_to_correct_server() {
  1713. let runtime = Builder::new_current_thread()
  1714. .enable_all()
  1715. .build()
  1716. .expect("runtime");
  1717. runtime.block_on(async {
  1718. let script_path = write_manager_mcp_server_script();
  1719. let root = script_path.parent().expect("script parent");
  1720. let alpha_log = root.join("alpha.log");
  1721. let beta_log = root.join("beta.log");
  1722. let servers = BTreeMap::from([
  1723. (
  1724. "alpha".to_string(),
  1725. manager_server_config(&script_path, "alpha", &alpha_log),
  1726. ),
  1727. (
  1728. "beta".to_string(),
  1729. manager_server_config(&script_path, "beta", &beta_log),
  1730. ),
  1731. ]);
  1732. let mut manager = McpServerManager::from_servers(&servers);
  1733. let tools = manager.discover_tools().await.expect("discover tools");
  1734. assert_eq!(tools.len(), 2);
  1735. let alpha = manager
  1736. .call_tool(
  1737. &mcp_tool_name("alpha", "echo"),
  1738. Some(json!({"text": "hello"})),
  1739. )
  1740. .await
  1741. .expect("call alpha tool");
  1742. let beta = manager
  1743. .call_tool(
  1744. &mcp_tool_name("beta", "echo"),
  1745. Some(json!({"text": "world"})),
  1746. )
  1747. .await
  1748. .expect("call beta tool");
  1749. assert_eq!(
  1750. alpha
  1751. .result
  1752. .as_ref()
  1753. .and_then(|result| result.structured_content.as_ref())
  1754. .and_then(|value| value.get("server")),
  1755. Some(&json!("alpha"))
  1756. );
  1757. assert_eq!(
  1758. beta.result
  1759. .as_ref()
  1760. .and_then(|result| result.structured_content.as_ref())
  1761. .and_then(|value| value.get("server")),
  1762. Some(&json!("beta"))
  1763. );
  1764. manager.shutdown().await.expect("shutdown");
  1765. cleanup_script(&script_path);
  1766. });
  1767. }
  1768. #[test]
  1769. fn manager_times_out_slow_tool_calls() {
  1770. let runtime = Builder::new_current_thread()
  1771. .enable_all()
  1772. .build()
  1773. .expect("runtime");
  1774. runtime.block_on(async {
  1775. let script_path = write_mcp_server_script();
  1776. let root = script_path.parent().expect("script parent");
  1777. let log_path = root.join("timeout.log");
  1778. let servers = BTreeMap::from([(
  1779. "slow".to_string(),
  1780. ScopedMcpServerConfig {
  1781. scope: ConfigSource::Local,
  1782. config: McpServerConfig::Stdio(McpStdioServerConfig {
  1783. command: "python3".to_string(),
  1784. args: vec![script_path.to_string_lossy().into_owned()],
  1785. env: BTreeMap::from([(
  1786. "MCP_TOOL_CALL_DELAY_MS".to_string(),
  1787. "200".to_string(),
  1788. )]),
  1789. tool_call_timeout_ms: Some(25),
  1790. }),
  1791. },
  1792. )]);
  1793. let mut manager = McpServerManager::from_servers(&servers);
  1794. manager.discover_tools().await.expect("discover tools");
  1795. let error = manager
  1796. .call_tool(&mcp_tool_name("slow", "echo"), Some(json!({"text": "slow"})))
  1797. .await
  1798. .expect_err("slow tool call should time out");
  1799. match error {
  1800. McpServerManagerError::Timeout {
  1801. server_name,
  1802. method,
  1803. timeout_ms,
  1804. } => {
  1805. assert_eq!(server_name, "slow");
  1806. assert_eq!(method, "tools/call");
  1807. assert_eq!(timeout_ms, 25);
  1808. }
  1809. other => panic!("expected timeout error, got {other:?}"),
  1810. }
  1811. manager.shutdown().await.expect("shutdown");
  1812. cleanup_script(&script_path);
  1813. let _ = fs::remove_file(log_path);
  1814. });
  1815. }
  1816. #[test]
  1817. fn manager_surfaces_parse_errors_from_tool_calls() {
  1818. let runtime = Builder::new_current_thread()
  1819. .enable_all()
  1820. .build()
  1821. .expect("runtime");
  1822. runtime.block_on(async {
  1823. let script_path = write_mcp_server_script();
  1824. let servers = BTreeMap::from([(
  1825. "broken".to_string(),
  1826. ScopedMcpServerConfig {
  1827. scope: ConfigSource::Local,
  1828. config: McpServerConfig::Stdio(McpStdioServerConfig {
  1829. command: "python3".to_string(),
  1830. args: vec![script_path.to_string_lossy().into_owned()],
  1831. env: BTreeMap::from([(
  1832. "MCP_INVALID_TOOL_CALL_RESPONSE".to_string(),
  1833. "1".to_string(),
  1834. )]),
  1835. tool_call_timeout_ms: Some(1_000),
  1836. }),
  1837. },
  1838. )]);
  1839. let mut manager = McpServerManager::from_servers(&servers);
  1840. manager.discover_tools().await.expect("discover tools");
  1841. let error = manager
  1842. .call_tool(
  1843. &mcp_tool_name("broken", "echo"),
  1844. Some(json!({"text": "invalid-json"})),
  1845. )
  1846. .await
  1847. .expect_err("invalid json should fail");
  1848. match error {
  1849. McpServerManagerError::InvalidResponse {
  1850. server_name,
  1851. method,
  1852. details,
  1853. } => {
  1854. assert_eq!(server_name, "broken");
  1855. assert_eq!(method, "tools/call");
  1856. assert!(details.contains("expected ident") || details.contains("expected value"));
  1857. }
  1858. other => panic!("expected invalid response error, got {other:?}"),
  1859. }
  1860. manager.shutdown().await.expect("shutdown");
  1861. cleanup_script(&script_path);
  1862. });
  1863. }
  1864. #[test]
  1865. fn given_child_exits_after_discovery_when_calling_twice_then_second_call_succeeds_after_reset() {
  1866. let runtime = Builder::new_current_thread()
  1867. .enable_all()
  1868. .build()
  1869. .expect("runtime");
  1870. runtime.block_on(async {
  1871. let script_path = write_manager_mcp_server_script();
  1872. let root = script_path.parent().expect("script parent");
  1873. let log_path = root.join("dropping.log");
  1874. let servers = BTreeMap::from([(
  1875. "alpha".to_string(),
  1876. manager_server_config_with_env(
  1877. &script_path,
  1878. "alpha",
  1879. &log_path,
  1880. BTreeMap::from([(
  1881. "MCP_EXIT_AFTER_TOOLS_LIST".to_string(),
  1882. "1".to_string(),
  1883. )]),
  1884. ),
  1885. )]);
  1886. let mut manager = McpServerManager::from_servers(&servers);
  1887. manager.discover_tools().await.expect("discover tools");
  1888. let first_error = manager
  1889. .call_tool(
  1890. &mcp_tool_name("alpha", "echo"),
  1891. Some(json!({"text": "reconnect"})),
  1892. )
  1893. .await
  1894. .expect_err("first call should fail after transport drops");
  1895. match first_error {
  1896. McpServerManagerError::Transport {
  1897. server_name,
  1898. method,
  1899. source,
  1900. } => {
  1901. assert_eq!(server_name, "alpha");
  1902. assert_eq!(method, "tools/call");
  1903. assert_eq!(source.kind(), ErrorKind::UnexpectedEof);
  1904. }
  1905. other => panic!("expected transport error, got {other:?}"),
  1906. }
  1907. let response = manager
  1908. .call_tool(
  1909. &mcp_tool_name("alpha", "echo"),
  1910. Some(json!({"text": "reconnect"})),
  1911. )
  1912. .await
  1913. .expect("second tool call should succeed after reset");
  1914. assert_eq!(
  1915. response
  1916. .result
  1917. .as_ref()
  1918. .and_then(|result| result.structured_content.as_ref())
  1919. .and_then(|value| value.get("server")),
  1920. Some(&json!("alpha"))
  1921. );
  1922. let log = fs::read_to_string(&log_path).expect("read log");
  1923. assert_eq!(
  1924. log.lines().collect::<Vec<_>>(),
  1925. vec!["initialize", "tools/list", "initialize", "tools/call"]
  1926. );
  1927. manager.shutdown().await.expect("shutdown");
  1928. cleanup_script(&script_path);
  1929. });
  1930. }
  1931. #[test]
  1932. fn given_initialize_hangs_once_when_discover_tools_then_manager_retries_and_succeeds() {
  1933. let runtime = Builder::new_current_thread()
  1934. .enable_all()
  1935. .build()
  1936. .expect("runtime");
  1937. runtime.block_on(async {
  1938. let script_path = write_manager_mcp_server_script();
  1939. let root = script_path.parent().expect("script parent");
  1940. let log_path = root.join("initialize-hang.log");
  1941. let marker_path = root.join("initialize-hang.marker");
  1942. let servers = BTreeMap::from([(
  1943. "alpha".to_string(),
  1944. manager_server_config_with_env(
  1945. &script_path,
  1946. "alpha",
  1947. &log_path,
  1948. BTreeMap::from([
  1949. (
  1950. "MCP_FAIL_ONCE_MODE".to_string(),
  1951. "initialize_hang".to_string(),
  1952. ),
  1953. (
  1954. "MCP_FAIL_ONCE_MARKER".to_string(),
  1955. marker_path.to_string_lossy().into_owned(),
  1956. ),
  1957. ]),
  1958. ),
  1959. )]);
  1960. let mut manager = McpServerManager::from_servers(&servers);
  1961. let tools = manager.discover_tools().await.expect("discover tools after retry");
  1962. assert_eq!(tools.len(), 1);
  1963. assert_eq!(tools[0].qualified_name, mcp_tool_name("alpha", "echo"));
  1964. let log = fs::read_to_string(&log_path).expect("read log");
  1965. assert_eq!(
  1966. log.lines().collect::<Vec<_>>(),
  1967. vec!["initialize", "initialize-hang", "initialize", "tools/list"]
  1968. );
  1969. manager.shutdown().await.expect("shutdown");
  1970. cleanup_script(&script_path);
  1971. });
  1972. }
  1973. #[test]
  1974. fn given_tool_call_disconnects_once_when_calling_twice_then_manager_resets_and_next_call_succeeds() {
  1975. let runtime = Builder::new_current_thread()
  1976. .enable_all()
  1977. .build()
  1978. .expect("runtime");
  1979. runtime.block_on(async {
  1980. let script_path = write_manager_mcp_server_script();
  1981. let root = script_path.parent().expect("script parent");
  1982. let log_path = root.join("tool-call-disconnect.log");
  1983. let marker_path = root.join("tool-call-disconnect.marker");
  1984. let servers = BTreeMap::from([(
  1985. "alpha".to_string(),
  1986. manager_server_config_with_env(
  1987. &script_path,
  1988. "alpha",
  1989. &log_path,
  1990. BTreeMap::from([
  1991. (
  1992. "MCP_FAIL_ONCE_MODE".to_string(),
  1993. "tool_call_disconnect".to_string(),
  1994. ),
  1995. (
  1996. "MCP_FAIL_ONCE_MARKER".to_string(),
  1997. marker_path.to_string_lossy().into_owned(),
  1998. ),
  1999. ]),
  2000. ),
  2001. )]);
  2002. let mut manager = McpServerManager::from_servers(&servers);
  2003. manager.discover_tools().await.expect("discover tools");
  2004. let first_error = manager
  2005. .call_tool(&mcp_tool_name("alpha", "echo"), Some(json!({"text": "first"})))
  2006. .await
  2007. .expect_err("first tool call should fail when transport drops");
  2008. match first_error {
  2009. McpServerManagerError::Transport {
  2010. server_name,
  2011. method,
  2012. source,
  2013. } => {
  2014. assert_eq!(server_name, "alpha");
  2015. assert_eq!(method, "tools/call");
  2016. assert_eq!(source.kind(), ErrorKind::UnexpectedEof);
  2017. }
  2018. other => panic!("expected transport error, got {other:?}"),
  2019. }
  2020. let response = manager
  2021. .call_tool(&mcp_tool_name("alpha", "echo"), Some(json!({"text": "second"})))
  2022. .await
  2023. .expect("second tool call should succeed after reset");
  2024. assert_eq!(
  2025. response
  2026. .result
  2027. .as_ref()
  2028. .and_then(|result| result.structured_content.as_ref())
  2029. .and_then(|value| value.get("echoed")),
  2030. Some(&json!("second"))
  2031. );
  2032. let log = fs::read_to_string(&log_path).expect("read log");
  2033. assert_eq!(
  2034. log.lines().collect::<Vec<_>>(),
  2035. vec![
  2036. "initialize",
  2037. "tools/list",
  2038. "tools/call",
  2039. "tools/call-disconnect",
  2040. "initialize",
  2041. "tools/call",
  2042. ]
  2043. );
  2044. manager.shutdown().await.expect("shutdown");
  2045. cleanup_script(&script_path);
  2046. });
  2047. }
  2048. #[test]
  2049. fn manager_records_unsupported_non_stdio_servers_without_panicking() {
  2050. let servers = BTreeMap::from([
  2051. (
  2052. "http".to_string(),
  2053. ScopedMcpServerConfig {
  2054. scope: ConfigSource::Local,
  2055. config: McpServerConfig::Http(McpRemoteServerConfig {
  2056. url: "https://example.test/mcp".to_string(),
  2057. headers: BTreeMap::new(),
  2058. headers_helper: None,
  2059. oauth: None,
  2060. }),
  2061. },
  2062. ),
  2063. (
  2064. "sdk".to_string(),
  2065. ScopedMcpServerConfig {
  2066. scope: ConfigSource::Local,
  2067. config: McpServerConfig::Sdk(McpSdkServerConfig {
  2068. name: "sdk-server".to_string(),
  2069. }),
  2070. },
  2071. ),
  2072. (
  2073. "ws".to_string(),
  2074. ScopedMcpServerConfig {
  2075. scope: ConfigSource::Local,
  2076. config: McpServerConfig::Ws(McpWebSocketServerConfig {
  2077. url: "wss://example.test/mcp".to_string(),
  2078. headers: BTreeMap::new(),
  2079. headers_helper: None,
  2080. }),
  2081. },
  2082. ),
  2083. ]);
  2084. let manager = McpServerManager::from_servers(&servers);
  2085. let unsupported = manager.unsupported_servers();
  2086. assert_eq!(unsupported.len(), 3);
  2087. assert_eq!(unsupported[0].server_name, "http");
  2088. assert_eq!(unsupported[1].server_name, "sdk");
  2089. assert_eq!(unsupported[2].server_name, "ws");
  2090. }
  2091. #[test]
  2092. fn manager_shutdown_terminates_spawned_children_and_is_idempotent() {
  2093. let runtime = Builder::new_current_thread()
  2094. .enable_all()
  2095. .build()
  2096. .expect("runtime");
  2097. runtime.block_on(async {
  2098. let script_path = write_manager_mcp_server_script();
  2099. let root = script_path.parent().expect("script parent");
  2100. let log_path = root.join("alpha.log");
  2101. let servers = BTreeMap::from([(
  2102. "alpha".to_string(),
  2103. manager_server_config(&script_path, "alpha", &log_path),
  2104. )]);
  2105. let mut manager = McpServerManager::from_servers(&servers);
  2106. manager.discover_tools().await.expect("discover tools");
  2107. manager.shutdown().await.expect("first shutdown");
  2108. manager.shutdown().await.expect("second shutdown");
  2109. cleanup_script(&script_path);
  2110. });
  2111. }
  2112. #[test]
  2113. fn manager_reuses_spawned_server_between_discovery_and_call() {
  2114. let runtime = Builder::new_current_thread()
  2115. .enable_all()
  2116. .build()
  2117. .expect("runtime");
  2118. runtime.block_on(async {
  2119. let script_path = write_manager_mcp_server_script();
  2120. let root = script_path.parent().expect("script parent");
  2121. let log_path = root.join("alpha.log");
  2122. let servers = BTreeMap::from([(
  2123. "alpha".to_string(),
  2124. manager_server_config(&script_path, "alpha", &log_path),
  2125. )]);
  2126. let mut manager = McpServerManager::from_servers(&servers);
  2127. manager.discover_tools().await.expect("discover tools");
  2128. let response = manager
  2129. .call_tool(
  2130. &mcp_tool_name("alpha", "echo"),
  2131. Some(json!({"text": "reuse"})),
  2132. )
  2133. .await
  2134. .expect("call tool");
  2135. assert_eq!(
  2136. response
  2137. .result
  2138. .as_ref()
  2139. .and_then(|result| result.structured_content.as_ref())
  2140. .and_then(|value| value.get("initializeCount")),
  2141. Some(&json!(1))
  2142. );
  2143. let log = fs::read_to_string(&log_path).expect("read log");
  2144. assert_eq!(log.lines().filter(|line| *line == "initialize").count(), 1);
  2145. assert_eq!(
  2146. log.lines().collect::<Vec<_>>(),
  2147. vec!["initialize", "tools/list", "tools/call"]
  2148. );
  2149. manager.shutdown().await.expect("shutdown");
  2150. cleanup_script(&script_path);
  2151. });
  2152. }
  2153. #[test]
  2154. fn manager_reports_unknown_qualified_tool_name() {
  2155. let runtime = Builder::new_current_thread()
  2156. .enable_all()
  2157. .build()
  2158. .expect("runtime");
  2159. runtime.block_on(async {
  2160. let script_path = write_manager_mcp_server_script();
  2161. let root = script_path.parent().expect("script parent");
  2162. let log_path = root.join("alpha.log");
  2163. let servers = BTreeMap::from([(
  2164. "alpha".to_string(),
  2165. manager_server_config(&script_path, "alpha", &log_path),
  2166. )]);
  2167. let mut manager = McpServerManager::from_servers(&servers);
  2168. let error = manager
  2169. .call_tool(
  2170. &mcp_tool_name("alpha", "missing"),
  2171. Some(json!({"text": "nope"})),
  2172. )
  2173. .await
  2174. .expect_err("unknown qualified tool should fail");
  2175. match error {
  2176. McpServerManagerError::UnknownTool { qualified_name } => {
  2177. assert_eq!(qualified_name, mcp_tool_name("alpha", "missing"));
  2178. }
  2179. other => panic!("expected unknown tool error, got {other:?}"),
  2180. }
  2181. cleanup_script(&script_path);
  2182. });
  2183. }
  2184. }