lib.rs 15 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509
  1. use std::fmt::{Debug, Formatter};
  2. use std::fs::{File, OpenOptions};
  3. use std::io::Write;
  4. use std::path::{Path, PathBuf};
  5. use std::sync::atomic::{AtomicU64, Ordering};
  6. use std::sync::{Arc, Mutex};
  7. use std::time::{SystemTime, UNIX_EPOCH};
  8. use serde::{Deserialize, Serialize};
  9. use serde_json::{Map, Value};
  10. pub const DEFAULT_ANTHROPIC_VERSION: &str = "2023-06-01";
  11. pub const DEFAULT_APP_NAME: &str = "clawd-code";
  12. pub const DEFAULT_RUNTIME: &str = "rust";
  13. #[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
  14. pub struct ClientIdentity {
  15. pub app_name: String,
  16. pub app_version: String,
  17. pub runtime: String,
  18. }
  19. impl ClientIdentity {
  20. #[must_use]
  21. pub fn new(app_name: impl Into<String>, app_version: impl Into<String>) -> Self {
  22. Self {
  23. app_name: app_name.into(),
  24. app_version: app_version.into(),
  25. runtime: DEFAULT_RUNTIME.to_string(),
  26. }
  27. }
  28. #[must_use]
  29. pub fn with_runtime(mut self, runtime: impl Into<String>) -> Self {
  30. self.runtime = runtime.into();
  31. self
  32. }
  33. #[must_use]
  34. pub fn user_agent(&self) -> String {
  35. format!("{}/{} ({})", self.app_name, self.app_version, self.runtime)
  36. }
  37. }
  38. impl Default for ClientIdentity {
  39. fn default() -> Self {
  40. Self::new(DEFAULT_APP_NAME, env!("CARGO_PKG_VERSION"))
  41. }
  42. }
  43. #[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
  44. pub struct AnthropicRequestProfile {
  45. pub anthropic_version: String,
  46. pub client_identity: ClientIdentity,
  47. #[serde(default, skip_serializing_if = "Vec::is_empty")]
  48. pub betas: Vec<String>,
  49. #[serde(default, skip_serializing_if = "Map::is_empty")]
  50. pub extra_body: Map<String, Value>,
  51. }
  52. impl AnthropicRequestProfile {
  53. #[must_use]
  54. pub fn new(client_identity: ClientIdentity) -> Self {
  55. Self {
  56. anthropic_version: DEFAULT_ANTHROPIC_VERSION.to_string(),
  57. client_identity,
  58. betas: Vec::new(),
  59. extra_body: Map::new(),
  60. }
  61. }
  62. #[must_use]
  63. pub fn with_beta(mut self, beta: impl Into<String>) -> Self {
  64. let beta = beta.into();
  65. if !self.betas.contains(&beta) {
  66. self.betas.push(beta);
  67. }
  68. self
  69. }
  70. #[must_use]
  71. pub fn with_extra_body(mut self, key: impl Into<String>, value: Value) -> Self {
  72. self.extra_body.insert(key.into(), value);
  73. self
  74. }
  75. #[must_use]
  76. pub fn header_pairs(&self) -> Vec<(String, String)> {
  77. let mut headers = vec![
  78. (
  79. "anthropic-version".to_string(),
  80. self.anthropic_version.clone(),
  81. ),
  82. ("user-agent".to_string(), self.client_identity.user_agent()),
  83. ];
  84. if !self.betas.is_empty() {
  85. headers.push(("anthropic-beta".to_string(), self.betas.join(",")));
  86. }
  87. headers
  88. }
  89. pub fn render_json_body<T: Serialize>(&self, request: &T) -> Result<Value, serde_json::Error> {
  90. let mut body = serde_json::to_value(request)?;
  91. let object = body.as_object_mut().ok_or_else(|| {
  92. serde_json::Error::io(std::io::Error::new(
  93. std::io::ErrorKind::InvalidData,
  94. "request body must serialize to a JSON object",
  95. ))
  96. })?;
  97. for (key, value) in &self.extra_body {
  98. object.insert(key.clone(), value.clone());
  99. }
  100. Ok(body)
  101. }
  102. }
  103. impl Default for AnthropicRequestProfile {
  104. fn default() -> Self {
  105. Self::new(ClientIdentity::default())
  106. }
  107. }
  108. #[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
  109. pub struct AnalyticsEvent {
  110. pub namespace: String,
  111. pub action: String,
  112. #[serde(default, skip_serializing_if = "Map::is_empty")]
  113. pub properties: Map<String, Value>,
  114. }
  115. impl AnalyticsEvent {
  116. #[must_use]
  117. pub fn new(namespace: impl Into<String>, action: impl Into<String>) -> Self {
  118. Self {
  119. namespace: namespace.into(),
  120. action: action.into(),
  121. properties: Map::new(),
  122. }
  123. }
  124. #[must_use]
  125. pub fn with_property(mut self, key: impl Into<String>, value: Value) -> Self {
  126. self.properties.insert(key.into(), value);
  127. self
  128. }
  129. }
  130. #[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
  131. pub struct SessionTraceRecord {
  132. pub session_id: String,
  133. pub sequence: u64,
  134. pub name: String,
  135. pub timestamp_ms: u64,
  136. #[serde(default, skip_serializing_if = "Map::is_empty")]
  137. pub attributes: Map<String, Value>,
  138. }
  139. #[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
  140. #[serde(tag = "type", rename_all = "snake_case")]
  141. pub enum TelemetryEvent {
  142. HttpRequestStarted {
  143. session_id: String,
  144. attempt: u32,
  145. method: String,
  146. path: String,
  147. #[serde(default, skip_serializing_if = "Map::is_empty")]
  148. attributes: Map<String, Value>,
  149. },
  150. HttpRequestSucceeded {
  151. session_id: String,
  152. attempt: u32,
  153. method: String,
  154. path: String,
  155. status: u16,
  156. #[serde(default, skip_serializing_if = "Option::is_none")]
  157. request_id: Option<String>,
  158. #[serde(default, skip_serializing_if = "Map::is_empty")]
  159. attributes: Map<String, Value>,
  160. },
  161. HttpRequestFailed {
  162. session_id: String,
  163. attempt: u32,
  164. method: String,
  165. path: String,
  166. error: String,
  167. retryable: bool,
  168. #[serde(default, skip_serializing_if = "Map::is_empty")]
  169. attributes: Map<String, Value>,
  170. },
  171. Analytics(AnalyticsEvent),
  172. SessionTrace(SessionTraceRecord),
  173. }
  174. pub trait TelemetrySink: Send + Sync {
  175. fn record(&self, event: TelemetryEvent);
  176. }
  177. #[derive(Default)]
  178. pub struct MemoryTelemetrySink {
  179. events: Mutex<Vec<TelemetryEvent>>,
  180. }
  181. impl MemoryTelemetrySink {
  182. #[must_use]
  183. pub fn events(&self) -> Vec<TelemetryEvent> {
  184. self.events
  185. .lock()
  186. .unwrap_or_else(std::sync::PoisonError::into_inner)
  187. .clone()
  188. }
  189. }
  190. impl TelemetrySink for MemoryTelemetrySink {
  191. fn record(&self, event: TelemetryEvent) {
  192. self.events
  193. .lock()
  194. .unwrap_or_else(std::sync::PoisonError::into_inner)
  195. .push(event);
  196. }
  197. }
  198. pub struct JsonlTelemetrySink {
  199. path: PathBuf,
  200. file: Mutex<File>,
  201. }
  202. impl Debug for JsonlTelemetrySink {
  203. fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
  204. f.debug_struct("JsonlTelemetrySink")
  205. .field("path", &self.path)
  206. .finish_non_exhaustive()
  207. }
  208. }
  209. impl JsonlTelemetrySink {
  210. pub fn new(path: impl AsRef<Path>) -> Result<Self, std::io::Error> {
  211. let path = path.as_ref().to_path_buf();
  212. if let Some(parent) = path.parent() {
  213. std::fs::create_dir_all(parent)?;
  214. }
  215. let file = OpenOptions::new().create(true).append(true).open(&path)?;
  216. Ok(Self {
  217. path,
  218. file: Mutex::new(file),
  219. })
  220. }
  221. #[must_use]
  222. pub fn path(&self) -> &Path {
  223. &self.path
  224. }
  225. }
  226. impl TelemetrySink for JsonlTelemetrySink {
  227. fn record(&self, event: TelemetryEvent) {
  228. let Ok(line) = serde_json::to_string(&event) else {
  229. return;
  230. };
  231. let mut file = self
  232. .file
  233. .lock()
  234. .unwrap_or_else(std::sync::PoisonError::into_inner);
  235. let _ = writeln!(file, "{line}");
  236. let _ = file.flush();
  237. }
  238. }
  239. #[derive(Clone)]
  240. pub struct SessionTracer {
  241. session_id: String,
  242. sequence: Arc<AtomicU64>,
  243. sink: Arc<dyn TelemetrySink>,
  244. }
  245. impl Debug for SessionTracer {
  246. fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
  247. f.debug_struct("SessionTracer")
  248. .field("session_id", &self.session_id)
  249. .finish_non_exhaustive()
  250. }
  251. }
  252. impl SessionTracer {
  253. #[must_use]
  254. pub fn new(session_id: impl Into<String>, sink: Arc<dyn TelemetrySink>) -> Self {
  255. Self {
  256. session_id: session_id.into(),
  257. sequence: Arc::new(AtomicU64::new(0)),
  258. sink,
  259. }
  260. }
  261. #[must_use]
  262. pub fn session_id(&self) -> &str {
  263. &self.session_id
  264. }
  265. pub fn record(&self, name: impl Into<String>, attributes: Map<String, Value>) {
  266. let record = SessionTraceRecord {
  267. session_id: self.session_id.clone(),
  268. sequence: self.sequence.fetch_add(1, Ordering::Relaxed),
  269. name: name.into(),
  270. timestamp_ms: current_timestamp_ms(),
  271. attributes,
  272. };
  273. self.sink.record(TelemetryEvent::SessionTrace(record));
  274. }
  275. pub fn record_http_request_started(
  276. &self,
  277. attempt: u32,
  278. method: impl Into<String>,
  279. path: impl Into<String>,
  280. attributes: Map<String, Value>,
  281. ) {
  282. let method = method.into();
  283. let path = path.into();
  284. self.sink.record(TelemetryEvent::HttpRequestStarted {
  285. session_id: self.session_id.clone(),
  286. attempt,
  287. method: method.clone(),
  288. path: path.clone(),
  289. attributes: attributes.clone(),
  290. });
  291. self.record(
  292. "http_request_started",
  293. merge_trace_fields(method, path, attempt, attributes),
  294. );
  295. }
  296. pub fn record_http_request_succeeded(
  297. &self,
  298. attempt: u32,
  299. method: impl Into<String>,
  300. path: impl Into<String>,
  301. status: u16,
  302. request_id: Option<String>,
  303. attributes: Map<String, Value>,
  304. ) {
  305. let method = method.into();
  306. let path = path.into();
  307. self.sink.record(TelemetryEvent::HttpRequestSucceeded {
  308. session_id: self.session_id.clone(),
  309. attempt,
  310. method: method.clone(),
  311. path: path.clone(),
  312. status,
  313. request_id: request_id.clone(),
  314. attributes: attributes.clone(),
  315. });
  316. let mut trace_attributes = merge_trace_fields(method, path, attempt, attributes);
  317. trace_attributes.insert("status".to_string(), Value::from(status));
  318. if let Some(request_id) = request_id {
  319. trace_attributes.insert("request_id".to_string(), Value::String(request_id));
  320. }
  321. self.record("http_request_succeeded", trace_attributes);
  322. }
  323. pub fn record_http_request_failed(
  324. &self,
  325. attempt: u32,
  326. method: impl Into<String>,
  327. path: impl Into<String>,
  328. error: impl Into<String>,
  329. retryable: bool,
  330. attributes: Map<String, Value>,
  331. ) {
  332. let method = method.into();
  333. let path = path.into();
  334. let error = error.into();
  335. self.sink.record(TelemetryEvent::HttpRequestFailed {
  336. session_id: self.session_id.clone(),
  337. attempt,
  338. method: method.clone(),
  339. path: path.clone(),
  340. error: error.clone(),
  341. retryable,
  342. attributes: attributes.clone(),
  343. });
  344. let mut trace_attributes = merge_trace_fields(method, path, attempt, attributes);
  345. trace_attributes.insert("error".to_string(), Value::String(error));
  346. trace_attributes.insert("retryable".to_string(), Value::Bool(retryable));
  347. self.record("http_request_failed", trace_attributes);
  348. }
  349. pub fn record_analytics(&self, event: AnalyticsEvent) {
  350. let mut attributes = event.properties.clone();
  351. attributes.insert(
  352. "namespace".to_string(),
  353. Value::String(event.namespace.clone()),
  354. );
  355. attributes.insert("action".to_string(), Value::String(event.action.clone()));
  356. self.sink.record(TelemetryEvent::Analytics(event));
  357. self.record("analytics", attributes);
  358. }
  359. }
  360. fn merge_trace_fields(
  361. method: String,
  362. path: String,
  363. attempt: u32,
  364. mut attributes: Map<String, Value>,
  365. ) -> Map<String, Value> {
  366. attributes.insert("method".to_string(), Value::String(method));
  367. attributes.insert("path".to_string(), Value::String(path));
  368. attributes.insert("attempt".to_string(), Value::from(attempt));
  369. attributes
  370. }
  371. fn current_timestamp_ms() -> u64 {
  372. SystemTime::now()
  373. .duration_since(UNIX_EPOCH)
  374. .unwrap_or_default()
  375. .as_millis()
  376. .try_into()
  377. .unwrap_or(u64::MAX)
  378. }
  379. #[cfg(test)]
  380. mod tests {
  381. use super::*;
  382. #[test]
  383. fn request_profile_emits_headers_and_merges_body() {
  384. let profile = AnthropicRequestProfile::new(
  385. ClientIdentity::new("clawd-code", "1.2.3").with_runtime("rust-cli"),
  386. )
  387. .with_beta("tools-2026-04-01")
  388. .with_extra_body("metadata", serde_json::json!({"source": "test"}));
  389. assert_eq!(
  390. profile.header_pairs(),
  391. vec![
  392. (
  393. "anthropic-version".to_string(),
  394. DEFAULT_ANTHROPIC_VERSION.to_string()
  395. ),
  396. (
  397. "user-agent".to_string(),
  398. "clawd-code/1.2.3 (rust-cli)".to_string()
  399. ),
  400. ("anthropic-beta".to_string(), "tools-2026-04-01".to_string(),),
  401. ]
  402. );
  403. let body = profile
  404. .render_json_body(&serde_json::json!({"model": "claude-sonnet"}))
  405. .expect("body should serialize");
  406. assert_eq!(
  407. body["metadata"]["source"],
  408. Value::String("test".to_string())
  409. );
  410. }
  411. #[test]
  412. fn session_tracer_records_structured_events_and_trace_sequence() {
  413. let sink = Arc::new(MemoryTelemetrySink::default());
  414. let tracer = SessionTracer::new("session-123", sink.clone());
  415. tracer.record_http_request_started(1, "POST", "/v1/messages", Map::new());
  416. tracer.record_analytics(
  417. AnalyticsEvent::new("cli", "prompt_sent")
  418. .with_property("model", Value::String("claude-opus".to_string())),
  419. );
  420. let events = sink.events();
  421. assert!(matches!(
  422. &events[0],
  423. TelemetryEvent::HttpRequestStarted {
  424. session_id,
  425. attempt: 1,
  426. method,
  427. path,
  428. ..
  429. } if session_id == "session-123" && method == "POST" && path == "/v1/messages"
  430. ));
  431. assert!(matches!(
  432. &events[1],
  433. TelemetryEvent::SessionTrace(SessionTraceRecord { sequence: 0, name, .. })
  434. if name == "http_request_started"
  435. ));
  436. assert!(matches!(&events[2], TelemetryEvent::Analytics(_)));
  437. assert!(matches!(
  438. &events[3],
  439. TelemetryEvent::SessionTrace(SessionTraceRecord { sequence: 1, name, .. })
  440. if name == "analytics"
  441. ));
  442. }
  443. #[test]
  444. fn jsonl_sink_persists_events() {
  445. let path = std::env::temp_dir().join(format!(
  446. "telemetry-jsonl-{}.log",
  447. current_timestamp_ms()
  448. ));
  449. let sink = JsonlTelemetrySink::new(&path).expect("sink should create file");
  450. sink.record(TelemetryEvent::Analytics(
  451. AnalyticsEvent::new("cli", "turn_completed")
  452. .with_property("ok", Value::Bool(true)),
  453. ));
  454. let contents = std::fs::read_to_string(&path).expect("telemetry log should be readable");
  455. assert!(contents.contains("\"type\":\"analytics\""));
  456. assert!(contents.contains("\"action\":\"turn_completed\""));
  457. let _ = std::fs::remove_file(path);
  458. }
  459. }