lib.rs 16 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526
  1. use std::fmt::{Debug, Formatter};
  2. use std::fs::{File, OpenOptions};
  3. use std::io::Write;
  4. use std::path::{Path, PathBuf};
  5. use std::sync::atomic::{AtomicU64, Ordering};
  6. use std::sync::{Arc, Mutex};
  7. use std::time::{SystemTime, UNIX_EPOCH};
  8. use serde::{Deserialize, Serialize};
  9. use serde_json::{Map, Value};
  /// API version string used for the `anthropic-version` header unless overridden.
  pub const DEFAULT_ANTHROPIC_VERSION: &str = "2023-06-01";

  /// Application name used by the default client identity.
  pub const DEFAULT_APP_NAME: &str = "claude-code";

  /// Runtime label assigned to a newly constructed client identity.
  pub const DEFAULT_RUNTIME: &str = "rust";

  /// First beta flag enabled on a freshly built request profile.
  pub const DEFAULT_AGENTIC_BETA: &str = "claude-code-20250219";

  /// Second beta flag enabled on a freshly built request profile.
  pub const DEFAULT_PROMPT_CACHING_SCOPE_BETA: &str = "prompt-caching-scope-2026-01-05";
  15. #[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
  16. pub struct ClientIdentity {
  17. pub app_name: String,
  18. pub app_version: String,
  19. pub runtime: String,
  20. }
  21. impl ClientIdentity {
  22. #[must_use]
  23. pub fn new(app_name: impl Into<String>, app_version: impl Into<String>) -> Self {
  24. Self {
  25. app_name: app_name.into(),
  26. app_version: app_version.into(),
  27. runtime: DEFAULT_RUNTIME.to_string(),
  28. }
  29. }
  30. #[must_use]
  31. pub fn with_runtime(mut self, runtime: impl Into<String>) -> Self {
  32. self.runtime = runtime.into();
  33. self
  34. }
  35. #[must_use]
  36. pub fn user_agent(&self) -> String {
  37. format!("{}/{}", self.app_name, self.app_version)
  38. }
  39. }
  40. impl Default for ClientIdentity {
  41. fn default() -> Self {
  42. Self::new(DEFAULT_APP_NAME, env!("CARGO_PKG_VERSION"))
  43. }
  44. }
  45. #[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
  46. pub struct AnthropicRequestProfile {
  47. pub anthropic_version: String,
  48. pub client_identity: ClientIdentity,
  49. #[serde(default, skip_serializing_if = "Vec::is_empty")]
  50. pub betas: Vec<String>,
  51. #[serde(default, skip_serializing_if = "Map::is_empty")]
  52. pub extra_body: Map<String, Value>,
  53. }
  54. impl AnthropicRequestProfile {
  55. #[must_use]
  56. pub fn new(client_identity: ClientIdentity) -> Self {
  57. Self {
  58. anthropic_version: DEFAULT_ANTHROPIC_VERSION.to_string(),
  59. client_identity,
  60. betas: vec![
  61. DEFAULT_AGENTIC_BETA.to_string(),
  62. DEFAULT_PROMPT_CACHING_SCOPE_BETA.to_string(),
  63. ],
  64. extra_body: Map::new(),
  65. }
  66. }
  67. #[must_use]
  68. pub fn with_beta(mut self, beta: impl Into<String>) -> Self {
  69. let beta = beta.into();
  70. if !self.betas.contains(&beta) {
  71. self.betas.push(beta);
  72. }
  73. self
  74. }
  75. #[must_use]
  76. pub fn with_extra_body(mut self, key: impl Into<String>, value: Value) -> Self {
  77. self.extra_body.insert(key.into(), value);
  78. self
  79. }
  80. #[must_use]
  81. pub fn header_pairs(&self) -> Vec<(String, String)> {
  82. let mut headers = vec![
  83. (
  84. "anthropic-version".to_string(),
  85. self.anthropic_version.clone(),
  86. ),
  87. ("user-agent".to_string(), self.client_identity.user_agent()),
  88. ];
  89. if !self.betas.is_empty() {
  90. headers.push(("anthropic-beta".to_string(), self.betas.join(",")));
  91. }
  92. headers
  93. }
  94. pub fn render_json_body<T: Serialize>(&self, request: &T) -> Result<Value, serde_json::Error> {
  95. let mut body = serde_json::to_value(request)?;
  96. let object = body.as_object_mut().ok_or_else(|| {
  97. serde_json::Error::io(std::io::Error::new(
  98. std::io::ErrorKind::InvalidData,
  99. "request body must serialize to a JSON object",
  100. ))
  101. })?;
  102. for (key, value) in &self.extra_body {
  103. object.insert(key.clone(), value.clone());
  104. }
  105. if !self.betas.is_empty() {
  106. object.insert(
  107. "betas".to_string(),
  108. Value::Array(self.betas.iter().cloned().map(Value::String).collect()),
  109. );
  110. }
  111. Ok(body)
  112. }
  113. }
  114. impl Default for AnthropicRequestProfile {
  115. fn default() -> Self {
  116. Self::new(ClientIdentity::default())
  117. }
  118. }
  119. #[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
  120. pub struct AnalyticsEvent {
  121. pub namespace: String,
  122. pub action: String,
  123. #[serde(default, skip_serializing_if = "Map::is_empty")]
  124. pub properties: Map<String, Value>,
  125. }
  126. impl AnalyticsEvent {
  127. #[must_use]
  128. pub fn new(namespace: impl Into<String>, action: impl Into<String>) -> Self {
  129. Self {
  130. namespace: namespace.into(),
  131. action: action.into(),
  132. properties: Map::new(),
  133. }
  134. }
  135. #[must_use]
  136. pub fn with_property(mut self, key: impl Into<String>, value: Value) -> Self {
  137. self.properties.insert(key.into(), value);
  138. self
  139. }
  140. }
  141. #[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
  142. pub struct SessionTraceRecord {
  143. pub session_id: String,
  144. pub sequence: u64,
  145. pub name: String,
  146. pub timestamp_ms: u64,
  147. #[serde(default, skip_serializing_if = "Map::is_empty")]
  148. pub attributes: Map<String, Value>,
  149. }
  150. #[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
  151. #[serde(tag = "type", rename_all = "snake_case")]
  152. pub enum TelemetryEvent {
  153. HttpRequestStarted {
  154. session_id: String,
  155. attempt: u32,
  156. method: String,
  157. path: String,
  158. #[serde(default, skip_serializing_if = "Map::is_empty")]
  159. attributes: Map<String, Value>,
  160. },
  161. HttpRequestSucceeded {
  162. session_id: String,
  163. attempt: u32,
  164. method: String,
  165. path: String,
  166. status: u16,
  167. #[serde(default, skip_serializing_if = "Option::is_none")]
  168. request_id: Option<String>,
  169. #[serde(default, skip_serializing_if = "Map::is_empty")]
  170. attributes: Map<String, Value>,
  171. },
  172. HttpRequestFailed {
  173. session_id: String,
  174. attempt: u32,
  175. method: String,
  176. path: String,
  177. error: String,
  178. retryable: bool,
  179. #[serde(default, skip_serializing_if = "Map::is_empty")]
  180. attributes: Map<String, Value>,
  181. },
  182. Analytics(AnalyticsEvent),
  183. SessionTrace(SessionTraceRecord),
  184. }
  185. pub trait TelemetrySink: Send + Sync {
  186. fn record(&self, event: TelemetryEvent);
  187. }
  188. #[derive(Default)]
  189. pub struct MemoryTelemetrySink {
  190. events: Mutex<Vec<TelemetryEvent>>,
  191. }
  192. impl MemoryTelemetrySink {
  193. #[must_use]
  194. pub fn events(&self) -> Vec<TelemetryEvent> {
  195. self.events
  196. .lock()
  197. .unwrap_or_else(std::sync::PoisonError::into_inner)
  198. .clone()
  199. }
  200. }
  201. impl TelemetrySink for MemoryTelemetrySink {
  202. fn record(&self, event: TelemetryEvent) {
  203. self.events
  204. .lock()
  205. .unwrap_or_else(std::sync::PoisonError::into_inner)
  206. .push(event);
  207. }
  208. }
  209. pub struct JsonlTelemetrySink {
  210. path: PathBuf,
  211. file: Mutex<File>,
  212. }
  213. impl Debug for JsonlTelemetrySink {
  214. fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
  215. f.debug_struct("JsonlTelemetrySink")
  216. .field("path", &self.path)
  217. .finish_non_exhaustive()
  218. }
  219. }
  220. impl JsonlTelemetrySink {
  221. pub fn new(path: impl AsRef<Path>) -> Result<Self, std::io::Error> {
  222. let path = path.as_ref().to_path_buf();
  223. if let Some(parent) = path.parent() {
  224. std::fs::create_dir_all(parent)?;
  225. }
  226. let file = OpenOptions::new().create(true).append(true).open(&path)?;
  227. Ok(Self {
  228. path,
  229. file: Mutex::new(file),
  230. })
  231. }
  232. #[must_use]
  233. pub fn path(&self) -> &Path {
  234. &self.path
  235. }
  236. }
  237. impl TelemetrySink for JsonlTelemetrySink {
  238. fn record(&self, event: TelemetryEvent) {
  239. let Ok(line) = serde_json::to_string(&event) else {
  240. return;
  241. };
  242. let mut file = self
  243. .file
  244. .lock()
  245. .unwrap_or_else(std::sync::PoisonError::into_inner);
  246. let _ = writeln!(file, "{line}");
  247. let _ = file.flush();
  248. }
  249. }
  250. #[derive(Clone)]
  251. pub struct SessionTracer {
  252. session_id: String,
  253. sequence: Arc<AtomicU64>,
  254. sink: Arc<dyn TelemetrySink>,
  255. }
  256. impl Debug for SessionTracer {
  257. fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
  258. f.debug_struct("SessionTracer")
  259. .field("session_id", &self.session_id)
  260. .finish_non_exhaustive()
  261. }
  262. }
  263. impl SessionTracer {
  264. #[must_use]
  265. pub fn new(session_id: impl Into<String>, sink: Arc<dyn TelemetrySink>) -> Self {
  266. Self {
  267. session_id: session_id.into(),
  268. sequence: Arc::new(AtomicU64::new(0)),
  269. sink,
  270. }
  271. }
  272. #[must_use]
  273. pub fn session_id(&self) -> &str {
  274. &self.session_id
  275. }
  276. pub fn record(&self, name: impl Into<String>, attributes: Map<String, Value>) {
  277. let record = SessionTraceRecord {
  278. session_id: self.session_id.clone(),
  279. sequence: self.sequence.fetch_add(1, Ordering::Relaxed),
  280. name: name.into(),
  281. timestamp_ms: current_timestamp_ms(),
  282. attributes,
  283. };
  284. self.sink.record(TelemetryEvent::SessionTrace(record));
  285. }
  286. pub fn record_http_request_started(
  287. &self,
  288. attempt: u32,
  289. method: impl Into<String>,
  290. path: impl Into<String>,
  291. attributes: Map<String, Value>,
  292. ) {
  293. let method = method.into();
  294. let path = path.into();
  295. self.sink.record(TelemetryEvent::HttpRequestStarted {
  296. session_id: self.session_id.clone(),
  297. attempt,
  298. method: method.clone(),
  299. path: path.clone(),
  300. attributes: attributes.clone(),
  301. });
  302. self.record(
  303. "http_request_started",
  304. merge_trace_fields(method, path, attempt, attributes),
  305. );
  306. }
  307. pub fn record_http_request_succeeded(
  308. &self,
  309. attempt: u32,
  310. method: impl Into<String>,
  311. path: impl Into<String>,
  312. status: u16,
  313. request_id: Option<String>,
  314. attributes: Map<String, Value>,
  315. ) {
  316. let method = method.into();
  317. let path = path.into();
  318. self.sink.record(TelemetryEvent::HttpRequestSucceeded {
  319. session_id: self.session_id.clone(),
  320. attempt,
  321. method: method.clone(),
  322. path: path.clone(),
  323. status,
  324. request_id: request_id.clone(),
  325. attributes: attributes.clone(),
  326. });
  327. let mut trace_attributes = merge_trace_fields(method, path, attempt, attributes);
  328. trace_attributes.insert("status".to_string(), Value::from(status));
  329. if let Some(request_id) = request_id {
  330. trace_attributes.insert("request_id".to_string(), Value::String(request_id));
  331. }
  332. self.record("http_request_succeeded", trace_attributes);
  333. }
  334. pub fn record_http_request_failed(
  335. &self,
  336. attempt: u32,
  337. method: impl Into<String>,
  338. path: impl Into<String>,
  339. error: impl Into<String>,
  340. retryable: bool,
  341. attributes: Map<String, Value>,
  342. ) {
  343. let method = method.into();
  344. let path = path.into();
  345. let error = error.into();
  346. self.sink.record(TelemetryEvent::HttpRequestFailed {
  347. session_id: self.session_id.clone(),
  348. attempt,
  349. method: method.clone(),
  350. path: path.clone(),
  351. error: error.clone(),
  352. retryable,
  353. attributes: attributes.clone(),
  354. });
  355. let mut trace_attributes = merge_trace_fields(method, path, attempt, attributes);
  356. trace_attributes.insert("error".to_string(), Value::String(error));
  357. trace_attributes.insert("retryable".to_string(), Value::Bool(retryable));
  358. self.record("http_request_failed", trace_attributes);
  359. }
  360. pub fn record_analytics(&self, event: AnalyticsEvent) {
  361. let mut attributes = event.properties.clone();
  362. attributes.insert(
  363. "namespace".to_string(),
  364. Value::String(event.namespace.clone()),
  365. );
  366. attributes.insert("action".to_string(), Value::String(event.action.clone()));
  367. self.sink.record(TelemetryEvent::Analytics(event));
  368. self.record("analytics", attributes);
  369. }
  370. }
  371. fn merge_trace_fields(
  372. method: String,
  373. path: String,
  374. attempt: u32,
  375. mut attributes: Map<String, Value>,
  376. ) -> Map<String, Value> {
  377. attributes.insert("method".to_string(), Value::String(method));
  378. attributes.insert("path".to_string(), Value::String(path));
  379. attributes.insert("attempt".to_string(), Value::from(attempt));
  380. attributes
  381. }
  /// Current wall-clock time in milliseconds since the Unix epoch.
  ///
  /// Yields 0 if the system clock reads earlier than the epoch, and
  /// `u64::MAX` if the millisecond count does not fit in a `u64`.
  fn current_timestamp_ms() -> u64 {
      let since_epoch = SystemTime::now()
          .duration_since(UNIX_EPOCH)
          .unwrap_or_default();
      u64::try_from(since_epoch.as_millis()).unwrap_or(u64::MAX)
  }
  #[cfg(test)]
  mod tests {
      use super::*;

      /// Headers and rendered body must both reflect the default betas, the
      /// added beta, and the extra body field.
      #[test]
      fn request_profile_emits_headers_and_merges_body() {
          let identity = ClientIdentity::new("claude-code", "1.2.3").with_runtime("rust-cli");
          let profile = AnthropicRequestProfile::new(identity)
              .with_beta("tools-2026-04-01")
              .with_extra_body("metadata", serde_json::json!({"source": "test"}));

          let expected_headers = vec![
              (
                  "anthropic-version".to_string(),
                  DEFAULT_ANTHROPIC_VERSION.to_string(),
              ),
              ("user-agent".to_string(), "claude-code/1.2.3".to_string()),
              (
                  "anthropic-beta".to_string(),
                  "claude-code-20250219,prompt-caching-scope-2026-01-05,tools-2026-04-01"
                      .to_string(),
              ),
          ];
          assert_eq!(profile.header_pairs(), expected_headers);

          let request = serde_json::json!({"model": "claude-sonnet"});
          let body = profile
              .render_json_body(&request)
              .expect("body should serialize");

          let expected_source = Value::String("test".to_string());
          assert_eq!(body["metadata"]["source"], expected_source);

          let expected_betas = serde_json::json!([
              "claude-code-20250219",
              "prompt-caching-scope-2026-01-05",
              "tools-2026-04-01"
          ]);
          assert_eq!(body["betas"], expected_betas);
      }

      /// Each tracer call must emit the structured event followed by a trace
      /// record, with trace sequence numbers counting up from zero.
      #[test]
      fn session_tracer_records_structured_events_and_trace_sequence() {
          let sink = Arc::new(MemoryTelemetrySink::default());
          let tracer = SessionTracer::new("session-123", sink.clone());

          tracer.record_http_request_started(1, "POST", "/v1/messages", Map::new());
          let prompt_sent = AnalyticsEvent::new("cli", "prompt_sent")
              .with_property("model", Value::String("claude-opus".to_string()));
          tracer.record_analytics(prompt_sent);

          let events = sink.events();
          let started = &events[0];
          let started_trace = &events[1];
          let analytics = &events[2];
          let analytics_trace = &events[3];

          assert!(matches!(
              started,
              TelemetryEvent::HttpRequestStarted {
                  session_id,
                  attempt: 1,
                  method,
                  path,
                  ..
              } if session_id == "session-123" && method == "POST" && path == "/v1/messages"
          ));
          assert!(matches!(
              started_trace,
              TelemetryEvent::SessionTrace(SessionTraceRecord { sequence: 0, name, .. })
                  if name == "http_request_started"
          ));
          assert!(matches!(analytics, TelemetryEvent::Analytics(_)));
          assert!(matches!(
              analytics_trace,
              TelemetryEvent::SessionTrace(SessionTraceRecord { sequence: 1, name, .. })
                  if name == "analytics"
          ));
      }

      /// An event recorded through the JSONL sink must be readable back from
      /// the file on disk.
      #[test]
      fn jsonl_sink_persists_events() {
          let file_name = format!("telemetry-jsonl-{}.log", current_timestamp_ms());
          let path = std::env::temp_dir().join(file_name);
          let sink = JsonlTelemetrySink::new(&path).expect("sink should create file");

          let event = AnalyticsEvent::new("cli", "turn_completed")
              .with_property("ok", Value::Bool(true));
          sink.record(TelemetryEvent::Analytics(event));

          let contents =
              std::fs::read_to_string(&path).expect("telemetry log should be readable");
          assert!(contents.contains("\"type\":\"analytics\""));
          assert!(contents.contains("\"action\":\"turn_completed\""));

          // Best-effort cleanup; a leftover temp file is not a test failure.
          let _ = std::fs::remove_file(path);
      }
  }