| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509 |
- use std::fmt::{Debug, Formatter};
- use std::fs::{File, OpenOptions};
- use std::io::Write;
- use std::path::{Path, PathBuf};
- use std::sync::atomic::{AtomicU64, Ordering};
- use std::sync::{Arc, Mutex};
- use std::time::{SystemTime, UNIX_EPOCH};
- use serde::{Deserialize, Serialize};
- use serde_json::{Map, Value};
- pub const DEFAULT_ANTHROPIC_VERSION: &str = "2023-06-01";
- pub const DEFAULT_APP_NAME: &str = "clawd-code";
- pub const DEFAULT_RUNTIME: &str = "rust";
- #[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
- pub struct ClientIdentity {
- pub app_name: String,
- pub app_version: String,
- pub runtime: String,
- }
- impl ClientIdentity {
- #[must_use]
- pub fn new(app_name: impl Into<String>, app_version: impl Into<String>) -> Self {
- Self {
- app_name: app_name.into(),
- app_version: app_version.into(),
- runtime: DEFAULT_RUNTIME.to_string(),
- }
- }
- #[must_use]
- pub fn with_runtime(mut self, runtime: impl Into<String>) -> Self {
- self.runtime = runtime.into();
- self
- }
- #[must_use]
- pub fn user_agent(&self) -> String {
- format!("{}/{} ({})", self.app_name, self.app_version, self.runtime)
- }
- }
- impl Default for ClientIdentity {
- fn default() -> Self {
- Self::new(DEFAULT_APP_NAME, env!("CARGO_PKG_VERSION"))
- }
- }
- #[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
- pub struct AnthropicRequestProfile {
- pub anthropic_version: String,
- pub client_identity: ClientIdentity,
- #[serde(default, skip_serializing_if = "Vec::is_empty")]
- pub betas: Vec<String>,
- #[serde(default, skip_serializing_if = "Map::is_empty")]
- pub extra_body: Map<String, Value>,
- }
- impl AnthropicRequestProfile {
- #[must_use]
- pub fn new(client_identity: ClientIdentity) -> Self {
- Self {
- anthropic_version: DEFAULT_ANTHROPIC_VERSION.to_string(),
- client_identity,
- betas: Vec::new(),
- extra_body: Map::new(),
- }
- }
- #[must_use]
- pub fn with_beta(mut self, beta: impl Into<String>) -> Self {
- let beta = beta.into();
- if !self.betas.contains(&beta) {
- self.betas.push(beta);
- }
- self
- }
- #[must_use]
- pub fn with_extra_body(mut self, key: impl Into<String>, value: Value) -> Self {
- self.extra_body.insert(key.into(), value);
- self
- }
- #[must_use]
- pub fn header_pairs(&self) -> Vec<(String, String)> {
- let mut headers = vec![
- (
- "anthropic-version".to_string(),
- self.anthropic_version.clone(),
- ),
- ("user-agent".to_string(), self.client_identity.user_agent()),
- ];
- if !self.betas.is_empty() {
- headers.push(("anthropic-beta".to_string(), self.betas.join(",")));
- }
- headers
- }
- pub fn render_json_body<T: Serialize>(&self, request: &T) -> Result<Value, serde_json::Error> {
- let mut body = serde_json::to_value(request)?;
- let object = body.as_object_mut().ok_or_else(|| {
- serde_json::Error::io(std::io::Error::new(
- std::io::ErrorKind::InvalidData,
- "request body must serialize to a JSON object",
- ))
- })?;
- for (key, value) in &self.extra_body {
- object.insert(key.clone(), value.clone());
- }
- Ok(body)
- }
- }
- impl Default for AnthropicRequestProfile {
- fn default() -> Self {
- Self::new(ClientIdentity::default())
- }
- }
- #[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
- pub struct AnalyticsEvent {
- pub namespace: String,
- pub action: String,
- #[serde(default, skip_serializing_if = "Map::is_empty")]
- pub properties: Map<String, Value>,
- }
- impl AnalyticsEvent {
- #[must_use]
- pub fn new(namespace: impl Into<String>, action: impl Into<String>) -> Self {
- Self {
- namespace: namespace.into(),
- action: action.into(),
- properties: Map::new(),
- }
- }
- #[must_use]
- pub fn with_property(mut self, key: impl Into<String>, value: Value) -> Self {
- self.properties.insert(key.into(), value);
- self
- }
- }
- #[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
- pub struct SessionTraceRecord {
- pub session_id: String,
- pub sequence: u64,
- pub name: String,
- pub timestamp_ms: u64,
- #[serde(default, skip_serializing_if = "Map::is_empty")]
- pub attributes: Map<String, Value>,
- }
- #[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
- #[serde(tag = "type", rename_all = "snake_case")]
- pub enum TelemetryEvent {
- HttpRequestStarted {
- session_id: String,
- attempt: u32,
- method: String,
- path: String,
- #[serde(default, skip_serializing_if = "Map::is_empty")]
- attributes: Map<String, Value>,
- },
- HttpRequestSucceeded {
- session_id: String,
- attempt: u32,
- method: String,
- path: String,
- status: u16,
- #[serde(default, skip_serializing_if = "Option::is_none")]
- request_id: Option<String>,
- #[serde(default, skip_serializing_if = "Map::is_empty")]
- attributes: Map<String, Value>,
- },
- HttpRequestFailed {
- session_id: String,
- attempt: u32,
- method: String,
- path: String,
- error: String,
- retryable: bool,
- #[serde(default, skip_serializing_if = "Map::is_empty")]
- attributes: Map<String, Value>,
- },
- Analytics(AnalyticsEvent),
- SessionTrace(SessionTraceRecord),
- }
- pub trait TelemetrySink: Send + Sync {
- fn record(&self, event: TelemetryEvent);
- }
- #[derive(Default)]
- pub struct MemoryTelemetrySink {
- events: Mutex<Vec<TelemetryEvent>>,
- }
- impl MemoryTelemetrySink {
- #[must_use]
- pub fn events(&self) -> Vec<TelemetryEvent> {
- self.events
- .lock()
- .unwrap_or_else(std::sync::PoisonError::into_inner)
- .clone()
- }
- }
- impl TelemetrySink for MemoryTelemetrySink {
- fn record(&self, event: TelemetryEvent) {
- self.events
- .lock()
- .unwrap_or_else(std::sync::PoisonError::into_inner)
- .push(event);
- }
- }
- pub struct JsonlTelemetrySink {
- path: PathBuf,
- file: Mutex<File>,
- }
- impl Debug for JsonlTelemetrySink {
- fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
- f.debug_struct("JsonlTelemetrySink")
- .field("path", &self.path)
- .finish_non_exhaustive()
- }
- }
- impl JsonlTelemetrySink {
- pub fn new(path: impl AsRef<Path>) -> Result<Self, std::io::Error> {
- let path = path.as_ref().to_path_buf();
- if let Some(parent) = path.parent() {
- std::fs::create_dir_all(parent)?;
- }
- let file = OpenOptions::new().create(true).append(true).open(&path)?;
- Ok(Self {
- path,
- file: Mutex::new(file),
- })
- }
- #[must_use]
- pub fn path(&self) -> &Path {
- &self.path
- }
- }
- impl TelemetrySink for JsonlTelemetrySink {
- fn record(&self, event: TelemetryEvent) {
- let Ok(line) = serde_json::to_string(&event) else {
- return;
- };
- let mut file = self
- .file
- .lock()
- .unwrap_or_else(std::sync::PoisonError::into_inner);
- let _ = writeln!(file, "{line}");
- let _ = file.flush();
- }
- }
- #[derive(Clone)]
- pub struct SessionTracer {
- session_id: String,
- sequence: Arc<AtomicU64>,
- sink: Arc<dyn TelemetrySink>,
- }
- impl Debug for SessionTracer {
- fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
- f.debug_struct("SessionTracer")
- .field("session_id", &self.session_id)
- .finish_non_exhaustive()
- }
- }
- impl SessionTracer {
- #[must_use]
- pub fn new(session_id: impl Into<String>, sink: Arc<dyn TelemetrySink>) -> Self {
- Self {
- session_id: session_id.into(),
- sequence: Arc::new(AtomicU64::new(0)),
- sink,
- }
- }
- #[must_use]
- pub fn session_id(&self) -> &str {
- &self.session_id
- }
- pub fn record(&self, name: impl Into<String>, attributes: Map<String, Value>) {
- let record = SessionTraceRecord {
- session_id: self.session_id.clone(),
- sequence: self.sequence.fetch_add(1, Ordering::Relaxed),
- name: name.into(),
- timestamp_ms: current_timestamp_ms(),
- attributes,
- };
- self.sink.record(TelemetryEvent::SessionTrace(record));
- }
- pub fn record_http_request_started(
- &self,
- attempt: u32,
- method: impl Into<String>,
- path: impl Into<String>,
- attributes: Map<String, Value>,
- ) {
- let method = method.into();
- let path = path.into();
- self.sink.record(TelemetryEvent::HttpRequestStarted {
- session_id: self.session_id.clone(),
- attempt,
- method: method.clone(),
- path: path.clone(),
- attributes: attributes.clone(),
- });
- self.record(
- "http_request_started",
- merge_trace_fields(method, path, attempt, attributes),
- );
- }
- pub fn record_http_request_succeeded(
- &self,
- attempt: u32,
- method: impl Into<String>,
- path: impl Into<String>,
- status: u16,
- request_id: Option<String>,
- attributes: Map<String, Value>,
- ) {
- let method = method.into();
- let path = path.into();
- self.sink.record(TelemetryEvent::HttpRequestSucceeded {
- session_id: self.session_id.clone(),
- attempt,
- method: method.clone(),
- path: path.clone(),
- status,
- request_id: request_id.clone(),
- attributes: attributes.clone(),
- });
- let mut trace_attributes = merge_trace_fields(method, path, attempt, attributes);
- trace_attributes.insert("status".to_string(), Value::from(status));
- if let Some(request_id) = request_id {
- trace_attributes.insert("request_id".to_string(), Value::String(request_id));
- }
- self.record("http_request_succeeded", trace_attributes);
- }
- pub fn record_http_request_failed(
- &self,
- attempt: u32,
- method: impl Into<String>,
- path: impl Into<String>,
- error: impl Into<String>,
- retryable: bool,
- attributes: Map<String, Value>,
- ) {
- let method = method.into();
- let path = path.into();
- let error = error.into();
- self.sink.record(TelemetryEvent::HttpRequestFailed {
- session_id: self.session_id.clone(),
- attempt,
- method: method.clone(),
- path: path.clone(),
- error: error.clone(),
- retryable,
- attributes: attributes.clone(),
- });
- let mut trace_attributes = merge_trace_fields(method, path, attempt, attributes);
- trace_attributes.insert("error".to_string(), Value::String(error));
- trace_attributes.insert("retryable".to_string(), Value::Bool(retryable));
- self.record("http_request_failed", trace_attributes);
- }
- pub fn record_analytics(&self, event: AnalyticsEvent) {
- let mut attributes = event.properties.clone();
- attributes.insert(
- "namespace".to_string(),
- Value::String(event.namespace.clone()),
- );
- attributes.insert("action".to_string(), Value::String(event.action.clone()));
- self.sink.record(TelemetryEvent::Analytics(event));
- self.record("analytics", attributes);
- }
- }
- fn merge_trace_fields(
- method: String,
- path: String,
- attempt: u32,
- mut attributes: Map<String, Value>,
- ) -> Map<String, Value> {
- attributes.insert("method".to_string(), Value::String(method));
- attributes.insert("path".to_string(), Value::String(path));
- attributes.insert("attempt".to_string(), Value::from(attempt));
- attributes
- }
- fn current_timestamp_ms() -> u64 {
- SystemTime::now()
- .duration_since(UNIX_EPOCH)
- .unwrap_or_default()
- .as_millis()
- .try_into()
- .unwrap_or(u64::MAX)
- }
- #[cfg(test)]
- mod tests {
- use super::*;
- #[test]
- fn request_profile_emits_headers_and_merges_body() {
- let profile = AnthropicRequestProfile::new(
- ClientIdentity::new("clawd-code", "1.2.3").with_runtime("rust-cli"),
- )
- .with_beta("tools-2026-04-01")
- .with_extra_body("metadata", serde_json::json!({"source": "test"}));
- assert_eq!(
- profile.header_pairs(),
- vec![
- (
- "anthropic-version".to_string(),
- DEFAULT_ANTHROPIC_VERSION.to_string()
- ),
- (
- "user-agent".to_string(),
- "clawd-code/1.2.3 (rust-cli)".to_string()
- ),
- ("anthropic-beta".to_string(), "tools-2026-04-01".to_string(),),
- ]
- );
- let body = profile
- .render_json_body(&serde_json::json!({"model": "claude-sonnet"}))
- .expect("body should serialize");
- assert_eq!(
- body["metadata"]["source"],
- Value::String("test".to_string())
- );
- }
- #[test]
- fn session_tracer_records_structured_events_and_trace_sequence() {
- let sink = Arc::new(MemoryTelemetrySink::default());
- let tracer = SessionTracer::new("session-123", sink.clone());
- tracer.record_http_request_started(1, "POST", "/v1/messages", Map::new());
- tracer.record_analytics(
- AnalyticsEvent::new("cli", "prompt_sent")
- .with_property("model", Value::String("claude-opus".to_string())),
- );
- let events = sink.events();
- assert!(matches!(
- &events[0],
- TelemetryEvent::HttpRequestStarted {
- session_id,
- attempt: 1,
- method,
- path,
- ..
- } if session_id == "session-123" && method == "POST" && path == "/v1/messages"
- ));
- assert!(matches!(
- &events[1],
- TelemetryEvent::SessionTrace(SessionTraceRecord { sequence: 0, name, .. })
- if name == "http_request_started"
- ));
- assert!(matches!(&events[2], TelemetryEvent::Analytics(_)));
- assert!(matches!(
- &events[3],
- TelemetryEvent::SessionTrace(SessionTraceRecord { sequence: 1, name, .. })
- if name == "analytics"
- ));
- }
- #[test]
- fn jsonl_sink_persists_events() {
- let path = std::env::temp_dir().join(format!(
- "telemetry-jsonl-{}.log",
- current_timestamp_ms()
- ));
- let sink = JsonlTelemetrySink::new(&path).expect("sink should create file");
- sink.record(TelemetryEvent::Analytics(
- AnalyticsEvent::new("cli", "turn_completed")
- .with_property("ok", Value::Bool(true)),
- ));
- let contents = std::fs::read_to_string(&path).expect("telemetry log should be readable");
- assert!(contents.contains("\"type\":\"analytics\""));
- assert!(contents.contains("\"action\":\"turn_completed\""));
- let _ = std::fs::remove_file(path);
- }
- }
|