client_integration.rs 28 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790
  1. use std::collections::HashMap;
  2. use std::sync::Arc;
  3. use std::sync::{Mutex as StdMutex, OnceLock};
  4. use std::time::Duration;
  5. use api::{
  6. ApiClient, ApiError, AuthSource, ContentBlockDelta, ContentBlockDeltaEvent,
  7. ContentBlockStartEvent, InputContentBlock, InputMessage, MessageDeltaEvent, MessageRequest,
  8. OutputContentBlock, ProviderClient, StreamEvent, ToolChoice, ToolDefinition,
  9. };
  10. use serde_json::json;
  11. use telemetry::{ClientIdentity, MemoryTelemetrySink, SessionTracer, TelemetryEvent};
  12. use tokio::io::{AsyncReadExt, AsyncWriteExt};
  13. use tokio::net::TcpListener;
  14. use tokio::sync::Mutex;
  /// Acquires the process-wide lock taken by tests that mutate environment variables.
  ///
  /// The mutex is created lazily on first use and lives for the rest of the process.
  /// If a previous holder panicked and poisoned the lock, the guard is recovered
  /// instead of propagating the poison error.
  fn env_lock() -> std::sync::MutexGuard<'static, ()> {
      static LOCK: OnceLock<StdMutex<()>> = OnceLock::new();
      let mutex = LOCK.get_or_init(|| StdMutex::new(()));
      match mutex.lock() {
          Ok(guard) => guard,
          // The protected value is `()`, so a poisoned lock carries no corrupted state.
          Err(poisoned) => poisoned.into_inner(),
      }
  }
  21. #[tokio::test]
  22. async fn send_message_posts_json_and_parses_response() {
  23. let state = Arc::new(Mutex::new(Vec::<CapturedRequest>::new()));
  24. let body = concat!(
  25. "{",
  26. "\"id\":\"msg_test\",",
  27. "\"type\":\"message\",",
  28. "\"role\":\"assistant\",",
  29. "\"content\":[{\"type\":\"text\",\"text\":\"Hello from Claude\"}],",
  30. "\"model\":\"claude-3-7-sonnet-latest\",",
  31. "\"stop_reason\":\"end_turn\",",
  32. "\"stop_sequence\":null,",
  33. "\"usage\":{\"input_tokens\":12,\"output_tokens\":4},",
  34. "\"request_id\":\"req_body_123\"",
  35. "}"
  36. );
  37. let server = spawn_server(
  38. state.clone(),
  39. vec![http_response("200 OK", "application/json", body)],
  40. )
  41. .await;
  42. let client = ApiClient::new("test-key")
  43. .with_auth_token(Some("proxy-token".to_string()))
  44. .with_base_url(server.base_url());
  45. let response = client
  46. .send_message(&sample_request(false))
  47. .await
  48. .expect("request should succeed");
  49. assert_eq!(response.id, "msg_test");
  50. assert_eq!(response.total_tokens(), 16);
  51. assert_eq!(response.request_id.as_deref(), Some("req_body_123"));
  52. assert_eq!(response.usage.cache_creation_input_tokens, 0);
  53. assert_eq!(response.usage.cache_read_input_tokens, 0);
  54. assert_eq!(
  55. response.content,
  56. vec![OutputContentBlock::Text {
  57. text: "Hello from Claude".to_string(),
  58. }]
  59. );
  60. let captured = state.lock().await;
  61. let request = captured.first().expect("server should capture request");
  62. assert_eq!(request.method, "POST");
  63. assert_eq!(request.path, "/v1/messages");
  64. assert_eq!(
  65. request.headers.get("x-api-key").map(String::as_str),
  66. Some("test-key")
  67. );
  68. assert_eq!(
  69. request.headers.get("authorization").map(String::as_str),
  70. Some("Bearer proxy-token")
  71. );
  72. assert_eq!(
  73. request.headers.get("anthropic-version").map(String::as_str),
  74. Some("2023-06-01")
  75. );
  76. assert_eq!(
  77. request.headers.get("user-agent").map(String::as_str),
  78. Some("claude-code/0.1.0")
  79. );
  80. assert_eq!(
  81. request.headers.get("anthropic-beta").map(String::as_str),
  82. Some("claude-code-20250219,prompt-caching-scope-2026-01-05")
  83. );
  84. let body: serde_json::Value =
  85. serde_json::from_str(&request.body).expect("request body should be json");
  86. assert_eq!(
  87. body.get("model").and_then(serde_json::Value::as_str),
  88. Some("claude-3-7-sonnet-latest")
  89. );
  90. assert!(body.get("stream").is_none());
  91. assert_eq!(body["tools"][0]["name"], json!("get_weather"));
  92. assert_eq!(body["tool_choice"]["type"], json!("auto"));
  93. assert_eq!(
  94. body["betas"],
  95. json!(["claude-code-20250219", "prompt-caching-scope-2026-01-05"])
  96. );
  97. }
  98. #[tokio::test]
  99. async fn send_message_applies_request_profile_and_records_telemetry() {
  100. let state = Arc::new(Mutex::new(Vec::<CapturedRequest>::new()));
  101. let server = spawn_server(
  102. state.clone(),
  103. vec![http_response_with_headers(
  104. "200 OK",
  105. "application/json",
  106. concat!(
  107. "{",
  108. "\"id\":\"msg_profile\",",
  109. "\"type\":\"message\",",
  110. "\"role\":\"assistant\",",
  111. "\"content\":[{\"type\":\"text\",\"text\":\"ok\"}],",
  112. "\"model\":\"claude-3-7-sonnet-latest\",",
  113. "\"stop_reason\":\"end_turn\",",
  114. "\"stop_sequence\":null,",
  115. "\"usage\":{\"input_tokens\":1,\"cache_creation_input_tokens\":2,\"cache_read_input_tokens\":3,\"output_tokens\":1}",
  116. "}"
  117. ),
  118. &[("request-id", "req_profile_123")],
  119. )],
  120. )
  121. .await;
  122. let sink = Arc::new(MemoryTelemetrySink::default());
  123. let client = AnthropicClient::new("test-key")
  124. .with_base_url(server.base_url())
  125. .with_client_identity(ClientIdentity::new("claude-code", "9.9.9").with_runtime("rust-cli"))
  126. .with_beta("tools-2026-04-01")
  127. .with_extra_body_param("metadata", json!({"source": "clawd-code"}))
  128. .with_session_tracer(SessionTracer::new("session-telemetry", sink.clone()));
  129. let response = client
  130. .send_message(&sample_request(false))
  131. .await
  132. .expect("request should succeed");
  133. assert_eq!(response.request_id.as_deref(), Some("req_profile_123"));
  134. let captured = state.lock().await;
  135. let request = captured.first().expect("server should capture request");
  136. assert_eq!(
  137. request.headers.get("anthropic-beta").map(String::as_str),
  138. Some("claude-code-20250219,prompt-caching-scope-2026-01-05,tools-2026-04-01")
  139. );
  140. assert_eq!(
  141. request.headers.get("user-agent").map(String::as_str),
  142. Some("claude-code/9.9.9")
  143. );
  144. let body: serde_json::Value =
  145. serde_json::from_str(&request.body).expect("request body should be json");
  146. assert_eq!(body["metadata"]["source"], json!("clawd-code"));
  147. assert_eq!(
  148. body["betas"],
  149. json!([
  150. "claude-code-20250219",
  151. "prompt-caching-scope-2026-01-05",
  152. "tools-2026-04-01"
  153. ])
  154. );
  155. let events = sink.events();
  156. assert_eq!(events.len(), 6);
  157. assert!(matches!(
  158. &events[0],
  159. TelemetryEvent::HttpRequestStarted {
  160. session_id,
  161. attempt: 1,
  162. method,
  163. path,
  164. ..
  165. } if session_id == "session-telemetry" && method == "POST" && path == "/v1/messages"
  166. ));
  167. assert!(matches!(
  168. &events[1],
  169. TelemetryEvent::SessionTrace(trace) if trace.name == "http_request_started"
  170. ));
  171. assert!(matches!(
  172. &events[2],
  173. TelemetryEvent::HttpRequestSucceeded {
  174. request_id,
  175. status: 200,
  176. ..
  177. } if request_id.as_deref() == Some("req_profile_123")
  178. ));
  179. assert!(matches!(
  180. &events[3],
  181. TelemetryEvent::SessionTrace(trace) if trace.name == "http_request_succeeded"
  182. ));
  183. assert!(matches!(
  184. &events[4],
  185. TelemetryEvent::Analytics(event)
  186. if event.namespace == "api"
  187. && event.action == "message_usage"
  188. && event.properties.get("request_id") == Some(&json!("req_profile_123"))
  189. && event.properties.get("total_tokens") == Some(&json!(7))
  190. && event.properties.get("estimated_cost_usd") == Some(&json!("$0.0001"))
  191. ));
  192. assert!(matches!(
  193. &events[5],
  194. TelemetryEvent::SessionTrace(trace) if trace.name == "analytics"
  195. ));
  196. }
  197. #[tokio::test]
  198. async fn send_message_parses_prompt_cache_token_usage_from_response() {
  199. let state = Arc::new(Mutex::new(Vec::<CapturedRequest>::new()));
  200. let body = concat!(
  201. "{",
  202. "\"id\":\"msg_cache_tokens\",",
  203. "\"type\":\"message\",",
  204. "\"role\":\"assistant\",",
  205. "\"content\":[{\"type\":\"text\",\"text\":\"Cache tokens\"}],",
  206. "\"model\":\"claude-3-7-sonnet-latest\",",
  207. "\"stop_reason\":\"end_turn\",",
  208. "\"stop_sequence\":null,",
  209. "\"usage\":{\"input_tokens\":12,\"cache_creation_input_tokens\":321,\"cache_read_input_tokens\":654,\"output_tokens\":4}",
  210. "}"
  211. );
  212. let server = spawn_server(
  213. state,
  214. vec![http_response("200 OK", "application/json", body)],
  215. )
  216. .await;
  217. let client = AnthropicClient::new("test-key").with_base_url(server.base_url());
  218. let response = client
  219. .send_message(&sample_request(false))
  220. .await
  221. .expect("request should succeed");
  222. assert_eq!(response.usage.input_tokens, 12);
  223. assert_eq!(response.usage.cache_creation_input_tokens, 321);
  224. assert_eq!(response.usage.cache_read_input_tokens, 654);
  225. assert_eq!(response.usage.output_tokens, 4);
  226. }
  227. #[tokio::test]
  228. #[allow(clippy::await_holding_lock)]
  229. async fn stream_message_parses_sse_events_with_tool_use() {
  230. let _guard = env_lock();
  231. let temp_root = std::env::temp_dir().join(format!(
  232. "api-stream-cache-{}-{}",
  233. std::process::id(),
  234. std::time::SystemTime::now()
  235. .duration_since(std::time::UNIX_EPOCH)
  236. .expect("time")
  237. .as_nanos()
  238. ));
  239. std::env::set_var("CLAUDE_CONFIG_HOME", &temp_root);
  240. let state = Arc::new(Mutex::new(Vec::<CapturedRequest>::new()));
  241. let sse = concat!(
  242. "event: message_start\n",
  243. "data: {\"type\":\"message_start\",\"message\":{\"id\":\"msg_stream\",\"type\":\"message\",\"role\":\"assistant\",\"content\":[],\"model\":\"claude-3-7-sonnet-latest\",\"stop_reason\":null,\"stop_sequence\":null,\"usage\":{\"input_tokens\":8,\"cache_creation_input_tokens\":13,\"cache_read_input_tokens\":21,\"output_tokens\":0}}}\n\n",
  244. "event: content_block_start\n",
  245. "data: {\"type\":\"content_block_start\",\"index\":0,\"content_block\":{\"type\":\"tool_use\",\"id\":\"toolu_123\",\"name\":\"get_weather\",\"input\":{}}}\n\n",
  246. "event: content_block_delta\n",
  247. "data: {\"type\":\"content_block_delta\",\"index\":0,\"delta\":{\"type\":\"input_json_delta\",\"partial_json\":\"{\\\"city\\\":\\\"Paris\\\"}\"}}\n\n",
  248. "event: content_block_stop\n",
  249. "data: {\"type\":\"content_block_stop\",\"index\":0}\n\n",
  250. "event: message_delta\n",
  251. "data: {\"type\":\"message_delta\",\"delta\":{\"stop_reason\":\"tool_use\",\"stop_sequence\":null},\"usage\":{\"input_tokens\":8,\"cache_creation_input_tokens\":34,\"cache_read_input_tokens\":55,\"output_tokens\":1}}\n\n",
  252. "event: message_stop\n",
  253. "data: {\"type\":\"message_stop\"}\n\n",
  254. "data: [DONE]\n\n"
  255. );
  256. let server = spawn_server(
  257. state.clone(),
  258. vec![http_response_with_headers(
  259. "200 OK",
  260. "text/event-stream",
  261. sse,
  262. &[("request-id", "req_stream_456")],
  263. )],
  264. )
  265. .await;
  266. let client = ApiClient::new("test-key")
  267. .with_auth_token(Some("proxy-token".to_string()))
  268. .with_base_url(server.base_url())
  269. .with_prompt_cache(PromptCache::new("stream-session"));
  270. let mut stream = client
  271. .stream_message(&sample_request(false))
  272. .await
  273. .expect("stream should start");
  274. assert_eq!(stream.request_id(), Some("req_stream_456"));
  275. let mut events = Vec::new();
  276. while let Some(event) = stream
  277. .next_event()
  278. .await
  279. .expect("stream event should parse")
  280. {
  281. events.push(event);
  282. }
  283. assert_eq!(events.len(), 6);
  284. assert!(matches!(events[0], StreamEvent::MessageStart(_)));
  285. assert!(matches!(
  286. events[1],
  287. StreamEvent::ContentBlockStart(ContentBlockStartEvent {
  288. content_block: OutputContentBlock::ToolUse { .. },
  289. ..
  290. })
  291. ));
  292. assert!(matches!(
  293. events[2],
  294. StreamEvent::ContentBlockDelta(ContentBlockDeltaEvent {
  295. delta: ContentBlockDelta::InputJsonDelta { .. },
  296. ..
  297. })
  298. ));
  299. assert!(matches!(events[3], StreamEvent::ContentBlockStop(_)));
  300. assert!(matches!(
  301. events[4],
  302. StreamEvent::MessageDelta(MessageDeltaEvent { .. })
  303. ));
  304. assert!(matches!(events[5], StreamEvent::MessageStop(_)));
  305. match &events[1] {
  306. StreamEvent::ContentBlockStart(ContentBlockStartEvent {
  307. content_block: OutputContentBlock::ToolUse { name, input, .. },
  308. ..
  309. }) => {
  310. assert_eq!(name, "get_weather");
  311. assert_eq!(input, &json!({}));
  312. }
  313. other => panic!("expected tool_use block, got {other:?}"),
  314. }
  315. let captured = state.lock().await;
  316. let request = captured.first().expect("server should capture request");
  317. assert!(request.body.contains("\"stream\":true"));
  318. let cache_stats = client
  319. .prompt_cache_stats()
  320. .expect("prompt cache stats should exist");
  321. assert_eq!(cache_stats.tracked_requests, 1);
  322. assert_eq!(cache_stats.last_cache_creation_input_tokens, Some(34));
  323. assert_eq!(cache_stats.last_cache_read_input_tokens, Some(55));
  324. assert_eq!(
  325. cache_stats.last_cache_source.as_deref(),
  326. Some("api-response")
  327. );
  328. std::fs::remove_dir_all(temp_root).expect("cleanup temp root");
  329. std::env::remove_var("CLAUDE_CONFIG_HOME");
  330. }
  331. #[tokio::test]
  332. async fn retries_retryable_failures_before_succeeding() {
  333. let state = Arc::new(Mutex::new(Vec::<CapturedRequest>::new()));
  334. let server = spawn_server(
  335. state.clone(),
  336. vec![
  337. http_response(
  338. "429 Too Many Requests",
  339. "application/json",
  340. "{\"type\":\"error\",\"error\":{\"type\":\"rate_limit_error\",\"message\":\"slow down\"}}",
  341. ),
  342. http_response(
  343. "200 OK",
  344. "application/json",
  345. "{\"id\":\"msg_retry\",\"type\":\"message\",\"role\":\"assistant\",\"content\":[{\"type\":\"text\",\"text\":\"Recovered\"}],\"model\":\"claude-3-7-sonnet-latest\",\"stop_reason\":\"end_turn\",\"stop_sequence\":null,\"usage\":{\"input_tokens\":3,\"output_tokens\":2}}",
  346. ),
  347. ],
  348. )
  349. .await;
  350. let client = ApiClient::new("test-key")
  351. .with_base_url(server.base_url())
  352. .with_retry_policy(2, Duration::from_millis(1), Duration::from_millis(2));
  353. let response = client
  354. .send_message(&sample_request(false))
  355. .await
  356. .expect("retry should eventually succeed");
  357. assert_eq!(response.total_tokens(), 5);
  358. assert_eq!(state.lock().await.len(), 2);
  359. }
  360. #[tokio::test]
  361. async fn provider_client_dispatches_anthropic_requests() {
  362. let state = Arc::new(Mutex::new(Vec::<CapturedRequest>::new()));
  363. let server = spawn_server(
  364. state.clone(),
  365. vec![http_response(
  366. "200 OK",
  367. "application/json",
  368. "{\"id\":\"msg_provider\",\"type\":\"message\",\"role\":\"assistant\",\"content\":[{\"type\":\"text\",\"text\":\"Dispatched\"}],\"model\":\"claude-3-7-sonnet-latest\",\"stop_reason\":\"end_turn\",\"stop_sequence\":null,\"usage\":{\"input_tokens\":3,\"output_tokens\":2}}",
  369. )],
  370. )
  371. .await;
  372. let client = ProviderClient::from_model_with_anthropic_auth(
  373. "claude-sonnet-4-6",
  374. Some(AuthSource::ApiKey("test-key".to_string())),
  375. )
  376. .expect("anthropic provider client should be constructed");
  377. let client = match client {
  378. ProviderClient::Anthropic(client) => {
  379. ProviderClient::Anthropic(client.with_base_url(server.base_url()))
  380. }
  381. other => panic!("expected anthropic provider, got {other:?}"),
  382. };
  383. let response = client
  384. .send_message(&sample_request(false))
  385. .await
  386. .expect("provider-dispatched request should succeed");
  387. assert_eq!(response.total_tokens(), 5);
  388. let captured = state.lock().await;
  389. let request = captured.first().expect("server should capture request");
  390. assert_eq!(request.path, "/v1/messages");
  391. assert_eq!(
  392. request.headers.get("x-api-key").map(String::as_str),
  393. Some("test-key")
  394. );
  395. }
  396. #[tokio::test]
  397. async fn surfaces_retry_exhaustion_for_persistent_retryable_errors() {
  398. let state = Arc::new(Mutex::new(Vec::<CapturedRequest>::new()));
  399. let server = spawn_server(
  400. state.clone(),
  401. vec![
  402. http_response(
  403. "503 Service Unavailable",
  404. "application/json",
  405. "{\"type\":\"error\",\"error\":{\"type\":\"overloaded_error\",\"message\":\"busy\"}}",
  406. ),
  407. http_response(
  408. "503 Service Unavailable",
  409. "application/json",
  410. "{\"type\":\"error\",\"error\":{\"type\":\"overloaded_error\",\"message\":\"still busy\"}}",
  411. ),
  412. ],
  413. )
  414. .await;
  415. let client = ApiClient::new("test-key")
  416. .with_base_url(server.base_url())
  417. .with_retry_policy(1, Duration::from_millis(1), Duration::from_millis(2));
  418. let error = client
  419. .send_message(&sample_request(false))
  420. .await
  421. .expect_err("persistent 503 should fail");
  422. match error {
  423. ApiError::RetriesExhausted {
  424. attempts,
  425. last_error,
  426. } => {
  427. assert_eq!(attempts, 2);
  428. assert!(matches!(
  429. *last_error,
  430. ApiError::Api {
  431. status: reqwest::StatusCode::SERVICE_UNAVAILABLE,
  432. retryable: true,
  433. ..
  434. }
  435. ));
  436. }
  437. other => panic!("expected retries exhausted, got {other:?}"),
  438. }
  439. }
  440. #[tokio::test]
  441. #[allow(clippy::await_holding_lock)]
  442. async fn send_message_reuses_recent_completion_cache_entries() {
  443. let _guard = env_lock();
  444. let temp_root = std::env::temp_dir().join(format!(
  445. "api-prompt-cache-{}-{}",
  446. std::process::id(),
  447. std::time::SystemTime::now()
  448. .duration_since(std::time::UNIX_EPOCH)
  449. .expect("time")
  450. .as_nanos()
  451. ));
  452. std::env::set_var("CLAUDE_CONFIG_HOME", &temp_root);
  453. let state = Arc::new(Mutex::new(Vec::<CapturedRequest>::new()));
  454. let server = spawn_server(
  455. state.clone(),
  456. vec![http_response(
  457. "200 OK",
  458. "application/json",
  459. "{\"id\":\"msg_cached\",\"type\":\"message\",\"role\":\"assistant\",\"content\":[{\"type\":\"text\",\"text\":\"Cached once\"}],\"model\":\"claude-3-7-sonnet-latest\",\"stop_reason\":\"end_turn\",\"stop_sequence\":null,\"usage\":{\"input_tokens\":3,\"cache_creation_input_tokens\":5,\"cache_read_input_tokens\":4000,\"output_tokens\":2}}",
  460. )],
  461. )
  462. .await;
  463. let client = AnthropicClient::new("test-key")
  464. .with_base_url(server.base_url())
  465. .with_prompt_cache(PromptCache::new("integration-session"));
  466. let first = client
  467. .send_message(&sample_request(false))
  468. .await
  469. .expect("first request should succeed");
  470. let second = client
  471. .send_message(&sample_request(false))
  472. .await
  473. .expect("second request should reuse cache");
  474. assert_eq!(first.content, second.content);
  475. assert_eq!(state.lock().await.len(), 1);
  476. let cache_stats = client
  477. .prompt_cache_stats()
  478. .expect("prompt cache stats should exist");
  479. assert_eq!(cache_stats.completion_cache_hits, 1);
  480. assert_eq!(cache_stats.completion_cache_misses, 1);
  481. assert_eq!(cache_stats.completion_cache_writes, 1);
  482. std::fs::remove_dir_all(temp_root).expect("cleanup temp root");
  483. std::env::remove_var("CLAUDE_CONFIG_HOME");
  484. }
  485. #[tokio::test]
  486. #[allow(clippy::await_holding_lock)]
  487. async fn send_message_tracks_unexpected_prompt_cache_breaks() {
  488. let _guard = env_lock();
  489. let temp_root = std::env::temp_dir().join(format!(
  490. "api-prompt-break-{}-{}",
  491. std::process::id(),
  492. std::time::SystemTime::now()
  493. .duration_since(std::time::UNIX_EPOCH)
  494. .expect("time")
  495. .as_nanos()
  496. ));
  497. std::env::set_var("CLAUDE_CONFIG_HOME", &temp_root);
  498. let state = Arc::new(Mutex::new(Vec::<CapturedRequest>::new()));
  499. let server = spawn_server(
  500. state,
  501. vec![
  502. http_response(
  503. "200 OK",
  504. "application/json",
  505. "{\"id\":\"msg_one\",\"type\":\"message\",\"role\":\"assistant\",\"content\":[{\"type\":\"text\",\"text\":\"One\"}],\"model\":\"claude-3-7-sonnet-latest\",\"stop_reason\":\"end_turn\",\"stop_sequence\":null,\"usage\":{\"input_tokens\":3,\"cache_creation_input_tokens\":5,\"cache_read_input_tokens\":6000,\"output_tokens\":2}}",
  506. ),
  507. http_response(
  508. "200 OK",
  509. "application/json",
  510. "{\"id\":\"msg_two\",\"type\":\"message\",\"role\":\"assistant\",\"content\":[{\"type\":\"text\",\"text\":\"Two\"}],\"model\":\"claude-3-7-sonnet-latest\",\"stop_reason\":\"end_turn\",\"stop_sequence\":null,\"usage\":{\"input_tokens\":3,\"cache_creation_input_tokens\":0,\"cache_read_input_tokens\":1000,\"output_tokens\":2}}",
  511. ),
  512. ],
  513. )
  514. .await;
  515. let request = sample_request(false);
  516. let client = AnthropicClient::new("test-key")
  517. .with_base_url(server.base_url())
  518. .with_prompt_cache(PromptCache::with_config(api::PromptCacheConfig {
  519. session_id: "break-session".to_string(),
  520. completion_ttl: Duration::from_secs(0),
  521. ..api::PromptCacheConfig::default()
  522. }));
  523. client
  524. .send_message(&request)
  525. .await
  526. .expect("first response should succeed");
  527. client
  528. .send_message(&request)
  529. .await
  530. .expect("second response should succeed");
  531. let cache_stats = client
  532. .prompt_cache_stats()
  533. .expect("prompt cache stats should exist");
  534. assert_eq!(cache_stats.unexpected_cache_breaks, 1);
  535. assert_eq!(
  536. cache_stats.last_break_reason.as_deref(),
  537. Some("cache read tokens dropped while prompt fingerprint remained stable")
  538. );
  539. std::fs::remove_dir_all(temp_root).expect("cleanup temp root");
  540. std::env::remove_var("CLAUDE_CONFIG_HOME");
  541. }
  542. #[tokio::test]
  543. #[ignore = "requires ANTHROPIC_API_KEY and network access"]
  544. async fn live_stream_smoke_test() {
  545. let client = ApiClient::from_env().expect("ANTHROPIC_API_KEY must be set");
  546. let mut stream = client
  547. .stream_message(&MessageRequest {
  548. model: std::env::var("ANTHROPIC_MODEL")
  549. .unwrap_or_else(|_| "claude-3-7-sonnet-latest".to_string()),
  550. max_tokens: 32,
  551. messages: vec![InputMessage::user_text(
  552. "Reply with exactly: hello from rust",
  553. )],
  554. system: None,
  555. tools: None,
  556. tool_choice: None,
  557. stream: false,
  558. })
  559. .await
  560. .expect("live stream should start");
  561. while let Some(_event) = stream
  562. .next_event()
  563. .await
  564. .expect("live stream should yield events")
  565. {}
  566. }
  /// One HTTP request as recorded by the mock server started with `spawn_server`.
  #[derive(Clone, Debug, Eq, PartialEq)]
  struct CapturedRequest {
      /// First whitespace-separated token of the request line, e.g. `POST`.
      method: String,
      /// Second whitespace-separated token of the request line, e.g. `/v1/messages`.
      path: String,
      /// Request headers keyed by ASCII-lower-cased name, with trimmed values.
      headers: HashMap<String, String>,
      /// Request body decoded as UTF-8.
      body: String,
  }
  574. struct TestServer {
  575. base_url: String,
  576. join_handle: tokio::task::JoinHandle<()>,
  577. }
  578. impl TestServer {
  579. fn base_url(&self) -> String {
  580. self.base_url.clone()
  581. }
  582. }
  583. impl Drop for TestServer {
  584. fn drop(&mut self) {
  585. self.join_handle.abort();
  586. }
  587. }
  588. async fn spawn_server(
  589. state: Arc<Mutex<Vec<CapturedRequest>>>,
  590. responses: Vec<String>,
  591. ) -> TestServer {
  592. let listener = TcpListener::bind("127.0.0.1:0")
  593. .await
  594. .expect("listener should bind");
  595. let address = listener
  596. .local_addr()
  597. .expect("listener should have local addr");
  598. let join_handle = tokio::spawn(async move {
  599. for response in responses {
  600. let (mut socket, _) = listener.accept().await.expect("server should accept");
  601. let mut buffer = Vec::new();
  602. let mut header_end = None;
  603. loop {
  604. let mut chunk = [0_u8; 1024];
  605. let read = socket
  606. .read(&mut chunk)
  607. .await
  608. .expect("request read should succeed");
  609. if read == 0 {
  610. break;
  611. }
  612. buffer.extend_from_slice(&chunk[..read]);
  613. if let Some(position) = find_header_end(&buffer) {
  614. header_end = Some(position);
  615. break;
  616. }
  617. }
  618. let header_end = header_end.expect("request should include headers");
  619. let (header_bytes, remaining) = buffer.split_at(header_end);
  620. let header_text =
  621. String::from_utf8(header_bytes.to_vec()).expect("headers should be utf8");
  622. let mut lines = header_text.split("\r\n");
  623. let request_line = lines.next().expect("request line should exist");
  624. let mut parts = request_line.split_whitespace();
  625. let method = parts.next().expect("method should exist").to_string();
  626. let path = parts.next().expect("path should exist").to_string();
  627. let mut headers = HashMap::new();
  628. let mut content_length = 0_usize;
  629. for line in lines {
  630. if line.is_empty() {
  631. continue;
  632. }
  633. let (name, value) = line.split_once(':').expect("header should have colon");
  634. let value = value.trim().to_string();
  635. if name.eq_ignore_ascii_case("content-length") {
  636. content_length = value.parse().expect("content length should parse");
  637. }
  638. headers.insert(name.to_ascii_lowercase(), value);
  639. }
  640. let mut body = remaining[4..].to_vec();
  641. while body.len() < content_length {
  642. let mut chunk = vec![0_u8; content_length - body.len()];
  643. let read = socket
  644. .read(&mut chunk)
  645. .await
  646. .expect("body read should succeed");
  647. if read == 0 {
  648. break;
  649. }
  650. body.extend_from_slice(&chunk[..read]);
  651. }
  652. state.lock().await.push(CapturedRequest {
  653. method,
  654. path,
  655. headers,
  656. body: String::from_utf8(body).expect("body should be utf8"),
  657. });
  658. socket
  659. .write_all(response.as_bytes())
  660. .await
  661. .expect("response write should succeed");
  662. }
  663. });
  664. TestServer {
  665. base_url: format!("http://{address}"),
  666. join_handle,
  667. }
  668. }
  /// Returns the byte offset of the first `\r\n\r\n` sequence in `bytes`, or `None`
  /// if the sequence does not occur.
  fn find_header_end(bytes: &[u8]) -> Option<usize> {
      const TERMINATOR: &[u8] = b"\r\n\r\n";
      bytes
          .windows(TERMINATOR.len())
          .position(|candidate| candidate == TERMINATOR)
  }
  672. fn http_response(status: &str, content_type: &str, body: &str) -> String {
  673. http_response_with_headers(status, content_type, body, &[])
  674. }
  /// Builds a canned HTTP/1.1 response as a single string.
  ///
  /// The output is the status line, `content-type`, each `(name, value)` pair from
  /// `headers` in order, `content-length` (the byte length of `body`),
  /// `connection: close`, a blank line, and then `body`.
  fn http_response_with_headers(
      status: &str,
      content_type: &str,
      body: &str,
      headers: &[(&str, &str)],
  ) -> String {
      let extra_headers: String = headers
          .iter()
          .map(|(name, value)| format!("{name}: {value}\r\n"))
          .collect();
      // `str::len` counts bytes, which is what `content-length` requires.
      let content_length = body.len();
      format!(
          "HTTP/1.1 {status}\r\ncontent-type: {content_type}\r\n{extra_headers}content-length: {}\r\nconnection: close\r\n\r\n{body}",
          content_length
      )
  }
  691. fn sample_request(stream: bool) -> MessageRequest {
  692. MessageRequest {
  693. model: "claude-3-7-sonnet-latest".to_string(),
  694. max_tokens: 64,
  695. messages: vec![InputMessage {
  696. role: "user".to_string(),
  697. content: vec![
  698. InputContentBlock::Text {
  699. text: "Say hello".to_string(),
  700. },
  701. InputContentBlock::ToolResult {
  702. tool_use_id: "toolu_prev".to_string(),
  703. content: vec![api::ToolResultContentBlock::Json {
  704. value: json!({"forecast": "sunny"}),
  705. }],
  706. is_error: false,
  707. },
  708. ],
  709. }],
  710. system: Some("Use tools when needed".to_string()),
  711. tools: Some(vec![ToolDefinition {
  712. name: "get_weather".to_string(),
  713. description: Some("Fetches the weather".to_string()),
  714. input_schema: json!({
  715. "type": "object",
  716. "properties": {"city": {"type": "string"}},
  717. "required": ["city"]
  718. }),
  719. }]),
  720. tool_choice: Some(ToolChoice::Auto),
  721. stream,
  722. }
  723. }