ソースを参照

Merge remote-tracking branch 'origin/rcc/telemetry' into integration/dori-cleanroom

YeonGyu-Kim 2 ヶ月 前
コミット
164bd518a1

+ 10 - 0
rust/Cargo.lock

@@ -25,6 +25,7 @@ dependencies = [
  "runtime",
  "serde",
  "serde_json",
+ "telemetry",
  "tokio",
 ]
 
@@ -1107,6 +1108,7 @@ dependencies = [
  "serde",
  "serde_json",
  "sha2",
+ "telemetry",
  "tokio",
  "walkdir",
 ]
@@ -1440,6 +1442,14 @@ dependencies = [
  "yaml-rust",
 ]
 
+[[package]]
+name = "telemetry"
+version = "0.1.0"
+dependencies = [
+ "serde",
+ "serde_json",
+]
+
 [[package]]
 name = "thiserror"
 version = "2.0.18"

+ 6 - 0
rust/crates/api/src/lib.rs

@@ -21,3 +21,9 @@ pub use types::{
     MessageResponse, MessageStartEvent, MessageStopEvent, OutputContentBlock, StreamEvent,
     ToolChoice, ToolDefinition, ToolResultContentBlock, Usage,
 };
+
+pub use telemetry::{
+    AnalyticsEvent, AnthropicRequestProfile, ClientIdentity, JsonlTelemetrySink,
+    MemoryTelemetrySink, SessionTraceRecord, SessionTracer, TelemetryEvent, TelemetrySink,
+    DEFAULT_ANTHROPIC_VERSION,
+};

+ 68 - 1
rust/crates/api/src/types.rs

@@ -1,3 +1,4 @@
+use runtime::{pricing_for_model, TokenUsage, UsageCostEstimate};
 use serde::{Deserialize, Serialize};
 use serde_json::Value;
 
@@ -159,7 +160,29 @@ pub struct Usage {
 impl Usage {
     #[must_use]
     pub const fn total_tokens(&self) -> u32 {
-        self.input_tokens + self.output_tokens
+        self.input_tokens
+            + self.output_tokens
+            + self.cache_creation_input_tokens
+            + self.cache_read_input_tokens
+    }
+
+    #[must_use]
+    pub const fn token_usage(&self) -> TokenUsage {
+        TokenUsage {
+            input_tokens: self.input_tokens,
+            output_tokens: self.output_tokens,
+            cache_creation_input_tokens: self.cache_creation_input_tokens,
+            cache_read_input_tokens: self.cache_read_input_tokens,
+        }
+    }
+
+    #[must_use]
+    pub fn estimated_cost_usd(&self, model: &str) -> UsageCostEstimate {
+        let usage = self.token_usage();
+        pricing_for_model(model).map_or_else(
+            || usage.estimate_cost_usd(),
+            |pricing| usage.estimate_cost_usd_with_pricing(pricing),
+        )
     }
 }
 
@@ -221,3 +244,47 @@ pub enum StreamEvent {
     ContentBlockStop(ContentBlockStopEvent),
     MessageStop(MessageStopEvent),
 }
+
+#[cfg(test)]
+mod tests {
+    use runtime::format_usd;
+
+    use super::{MessageResponse, Usage};
+
+    #[test]
+    fn usage_total_tokens_includes_cache_tokens() {
+        let usage = Usage {
+            input_tokens: 10,
+            cache_creation_input_tokens: 2,
+            cache_read_input_tokens: 3,
+            output_tokens: 4,
+        };
+
+        assert_eq!(usage.total_tokens(), 19);
+        assert_eq!(usage.token_usage().total_tokens(), 19);
+    }
+
+    #[test]
+    fn message_response_estimates_cost_from_model_usage() {
+        let response = MessageResponse {
+            id: "msg_cost".to_string(),
+            kind: "message".to_string(),
+            role: "assistant".to_string(),
+            content: Vec::new(),
+            model: "claude-sonnet-4-20250514".to_string(),
+            stop_reason: Some("end_turn".to_string()),
+            stop_sequence: None,
+            usage: Usage {
+                input_tokens: 1_000_000,
+                cache_creation_input_tokens: 100_000,
+                cache_read_input_tokens: 200_000,
+                output_tokens: 500_000,
+            },
+            request_id: None,
+        };
+
+        let cost = response.usage.estimated_cost_usd(&response.model);
+        assert_eq!(format_usd(cost.total_cost_usd()), "$54.6750");
+        assert_eq!(response.total_tokens(), 1_800_000);
+    }
+}

+ 122 - 0
rust/crates/api/tests/client_integration.rs

@@ -8,6 +8,7 @@ use api::{
     OutputContentBlock, ProviderClient, StreamEvent, ToolChoice, ToolDefinition,
 };
 use serde_json::json;
+use telemetry::{ClientIdentity, MemoryTelemetrySink, SessionTracer, TelemetryEvent};
 use tokio::io::{AsyncReadExt, AsyncWriteExt};
 use tokio::net::TcpListener;
 use tokio::sync::Mutex;
@@ -64,6 +65,18 @@ async fn send_message_posts_json_and_parses_response() {
         request.headers.get("authorization").map(String::as_str),
         Some("Bearer proxy-token")
     );
+    assert_eq!(
+        request.headers.get("anthropic-version").map(String::as_str),
+        Some("2023-06-01")
+    );
+    assert_eq!(
+        request.headers.get("user-agent").map(String::as_str),
+        Some("claude-code/0.1.0")
+    );
+    assert_eq!(
+        request.headers.get("anthropic-beta").map(String::as_str),
+        Some("claude-code-20250219,prompt-caching-scope-2026-01-05")
+    );
     let body: serde_json::Value =
         serde_json::from_str(&request.body).expect("request body should be json");
     assert_eq!(
@@ -73,6 +86,115 @@ async fn send_message_posts_json_and_parses_response() {
     assert!(body.get("stream").is_none());
     assert_eq!(body["tools"][0]["name"], json!("get_weather"));
     assert_eq!(body["tool_choice"]["type"], json!("auto"));
+    assert_eq!(
+        body["betas"],
+        json!(["claude-code-20250219", "prompt-caching-scope-2026-01-05"])
+    );
+}
+
+#[tokio::test]
+async fn send_message_applies_request_profile_and_records_telemetry() {
+    let state = Arc::new(Mutex::new(Vec::<CapturedRequest>::new()));
+    let server = spawn_server(
+        state.clone(),
+        vec![http_response_with_headers(
+            "200 OK",
+            "application/json",
+            concat!(
+                "{",
+                "\"id\":\"msg_profile\",",
+                "\"type\":\"message\",",
+                "\"role\":\"assistant\",",
+                "\"content\":[{\"type\":\"text\",\"text\":\"ok\"}],",
+                "\"model\":\"claude-3-7-sonnet-latest\",",
+                "\"stop_reason\":\"end_turn\",",
+                "\"stop_sequence\":null,",
+                "\"usage\":{\"input_tokens\":1,\"cache_creation_input_tokens\":2,\"cache_read_input_tokens\":3,\"output_tokens\":1}",
+                "}"
+            ),
+            &[("request-id", "req_profile_123")],
+        )],
+    )
+    .await;
+    let sink = Arc::new(MemoryTelemetrySink::default());
+
+    let client = AnthropicClient::new("test-key")
+        .with_base_url(server.base_url())
+        .with_client_identity(ClientIdentity::new("claude-code", "9.9.9").with_runtime("rust-cli"))
+        .with_beta("tools-2026-04-01")
+        .with_extra_body_param("metadata", json!({"source": "clawd-code"}))
+        .with_session_tracer(SessionTracer::new("session-telemetry", sink.clone()));
+
+    let response = client
+        .send_message(&sample_request(false))
+        .await
+        .expect("request should succeed");
+
+    assert_eq!(response.request_id.as_deref(), Some("req_profile_123"));
+
+    let captured = state.lock().await;
+    let request = captured.first().expect("server should capture request");
+    assert_eq!(
+        request.headers.get("anthropic-beta").map(String::as_str),
+        Some("claude-code-20250219,prompt-caching-scope-2026-01-05,tools-2026-04-01")
+    );
+    assert_eq!(
+        request.headers.get("user-agent").map(String::as_str),
+        Some("claude-code/9.9.9")
+    );
+    let body: serde_json::Value =
+        serde_json::from_str(&request.body).expect("request body should be json");
+    assert_eq!(body["metadata"]["source"], json!("clawd-code"));
+    assert_eq!(
+        body["betas"],
+        json!([
+            "claude-code-20250219",
+            "prompt-caching-scope-2026-01-05",
+            "tools-2026-04-01"
+        ])
+    );
+
+    let events = sink.events();
+    assert_eq!(events.len(), 6);
+    assert!(matches!(
+        &events[0],
+        TelemetryEvent::HttpRequestStarted {
+            session_id,
+            attempt: 1,
+            method,
+            path,
+            ..
+        } if session_id == "session-telemetry" && method == "POST" && path == "/v1/messages"
+    ));
+    assert!(matches!(
+        &events[1],
+        TelemetryEvent::SessionTrace(trace) if trace.name == "http_request_started"
+    ));
+    assert!(matches!(
+        &events[2],
+        TelemetryEvent::HttpRequestSucceeded {
+            request_id,
+            status: 200,
+            ..
+        } if request_id.as_deref() == Some("req_profile_123")
+    ));
+    assert!(matches!(
+        &events[3],
+        TelemetryEvent::SessionTrace(trace) if trace.name == "http_request_succeeded"
+    ));
+    assert!(matches!(
+        &events[4],
+        TelemetryEvent::Analytics(event)
+            if event.namespace == "api"
+                && event.action == "message_usage"
+                && event.properties.get("request_id") == Some(&json!("req_profile_123"))
+                && event.properties.get("total_tokens") == Some(&json!(7))
+                && event.properties.get("estimated_cost_usd") == Some(&json!("$0.0001"))
+    ));
+    assert!(matches!(
+        &events[5],
+        TelemetryEvent::SessionTrace(trace) if trace.name == "analytics"
+    ));
 }
 
 #[tokio::test]

+ 65 - 8
rust/crates/runtime/src/conversation.rs

@@ -1,6 +1,9 @@
 use std::collections::BTreeMap;
 use std::fmt::{Display, Formatter};
 
+use serde_json::{Map, Value};
+use telemetry::SessionTracer;
+
 use crate::compact::{
     compact_session, estimate_session_tokens, CompactionConfig, CompactionResult,
 };
@@ -132,7 +135,7 @@ where
             tool_executor,
             permission_policy,
             system_prompt,
-            RuntimeFeatureConfig::default(),
+            &RuntimeFeatureConfig::default(),
         )
     }
 
@@ -144,7 +147,7 @@ where
         tool_executor: T,
         permission_policy: PermissionPolicy,
         system_prompt: Vec<String>,
-        feature_config: RuntimeFeatureConfig,
+        feature_config: &RuntimeFeatureConfig,
     ) -> Self {
         let usage_tracker = UsageTracker::from_session(&session);
         Self {
@@ -266,6 +269,8 @@ where
         user_input: impl Into<String>,
         mut prompter: Option<&mut dyn PermissionPrompter>,
     ) -> Result<TurnSummary, RuntimeError> {
+        let user_input = user_input.into();
+        self.record_turn_started(&user_input);
         self.session
             .push_user_text(user_input.into())
             .map_err(|error| RuntimeError::new(error.to_string()))?;
@@ -277,17 +282,31 @@ where
         loop {
             iterations += 1;
             if iterations > self.max_iterations {
-                return Err(RuntimeError::new(
+                let error = RuntimeError::new(
                     "conversation loop exceeded the maximum number of iterations",
-                ));
+                );
+                self.record_turn_failed(iterations, &error);
+                return Err(error);
             }
 
             let request = ApiRequest {
                 system_prompt: self.system_prompt.clone(),
                 messages: self.session.messages.clone(),
             };
-            let events = self.api_client.stream(request)?;
-            let (assistant_message, usage) = build_assistant_message(events)?;
+            let events = match self.api_client.stream(request) {
+                Ok(events) => events,
+                Err(error) => {
+                    self.record_turn_failed(iterations, &error);
+                    return Err(error);
+                }
+            };
+            let (assistant_message, usage) = match build_assistant_message(events) {
+                Ok(result) => result,
+                Err(error) => {
+                    self.record_turn_failed(iterations, &error);
+                    return Err(error);
+                }
+            };
             if let Some(usage) = usage {
                 self.usage_tracker.record(usage);
             }
@@ -301,6 +320,11 @@ where
                     _ => None,
                 })
                 .collect::<Vec<_>>();
+            self.record_assistant_iteration(
+                iterations,
+                &assistant_message,
+                pending_tool_uses.len(),
+            );
 
             self.session
                 .push_message(assistant_message.clone())
@@ -720,6 +744,39 @@ mod tests {
         ));
     }
 
+    #[test]
+    fn records_runtime_session_trace_events() {
+        let sink = Arc::new(MemoryTelemetrySink::default());
+        let tracer = SessionTracer::new("session-runtime", sink.clone());
+        let mut runtime = ConversationRuntime::new(
+            Session::new(),
+            ScriptedApiClient { call_count: 0 },
+            StaticToolExecutor::new().register("add", |_input| Ok("4".to_string())),
+            PermissionPolicy::new(PermissionMode::WorkspaceWrite),
+            vec!["system".to_string()],
+        )
+        .with_session_tracer(tracer);
+
+        runtime
+            .run_turn("what is 2 + 2?", Some(&mut PromptAllowOnce))
+            .expect("conversation loop should succeed");
+
+        let events = sink.events();
+        let trace_names = events
+            .iter()
+            .filter_map(|event| match event {
+                TelemetryEvent::SessionTrace(trace) => Some(trace.name.as_str()),
+                _ => None,
+            })
+            .collect::<Vec<_>>();
+
+        assert!(trace_names.contains(&"turn_started"));
+        assert!(trace_names.contains(&"assistant_iteration_completed"));
+        assert!(trace_names.contains(&"tool_execution_started"));
+        assert!(trace_names.contains(&"tool_execution_finished"));
+        assert!(trace_names.contains(&"turn_completed"));
+    }
+
     #[test]
     fn records_denied_tool_results_when_prompt_rejects() {
         struct RejectPrompter;
@@ -808,7 +865,7 @@ mod tests {
             }),
             PermissionPolicy::new(PermissionMode::DangerFullAccess),
             vec!["system".to_string()],
-            RuntimeFeatureConfig::default().with_hooks(RuntimeHookConfig::new(
+            &RuntimeFeatureConfig::default().with_hooks(RuntimeHookConfig::new(
                 vec![shell_snippet("printf 'blocked by hook'; exit 2")],
                 Vec::new(),
                 Vec::new(),
@@ -875,7 +932,7 @@ mod tests {
             StaticToolExecutor::new().register("add", |_input| Ok("4".to_string())),
             PermissionPolicy::new(PermissionMode::DangerFullAccess),
             vec!["system".to_string()],
-            RuntimeFeatureConfig::default().with_hooks(RuntimeHookConfig::new(
+            &RuntimeFeatureConfig::default().with_hooks(RuntimeHookConfig::new(
                 vec![shell_snippet("printf 'pre hook ran'")],
                 vec![shell_snippet("printf 'post hook ran'")],
                 Vec::new(),

+ 9 - 0
rust/crates/runtime/src/hooks.rs

@@ -476,6 +476,15 @@ impl HookRunner {
     }
 }
 
+struct HookInvocation<'a> {
+    event: HookEvent,
+    tool_name: &'a str,
+    tool_input: &'a str,
+    tool_output: Option<&'a str>,
+    is_error: bool,
+    payload: &'a str,
+}
+
 enum HookCommandOutcome {
     Allow { parsed: ParsedHookOutput },
     Deny { parsed: ParsedHookOutput },

+ 14 - 2
rust/crates/runtime/src/mcp_stdio.rs

@@ -1144,8 +1144,20 @@ mod tests {
     }
 
     fn cleanup_script(script_path: &Path) {
-        fs::remove_file(script_path).expect("cleanup script");
-        fs::remove_dir_all(script_path.parent().expect("script parent")).expect("cleanup dir");
+        if let Err(error) = fs::remove_file(script_path) {
+            assert_eq!(
+                error.kind(),
+                std::io::ErrorKind::NotFound,
+                "cleanup script: {error}"
+            );
+        }
+        if let Err(error) = fs::remove_dir_all(script_path.parent().expect("script parent")) {
+            assert_eq!(
+                error.kind(),
+                std::io::ErrorKind::NotFound,
+                "cleanup dir: {error}"
+            );
+        }
     }
 
     fn manager_server_config(

+ 17 - 14
rust/crates/rusty-claude-cli/src/main.rs

@@ -17,8 +17,9 @@ use std::time::{Duration, Instant, UNIX_EPOCH};
 
 use api::{
     resolve_startup_auth_source, AnthropicClient, AuthSource, ContentBlockDelta, InputContentBlock,
-    InputMessage, MessageRequest, MessageResponse, OutputContentBlock,
-    StreamEvent as ApiStreamEvent, ToolChoice, ToolDefinition, ToolResultContentBlock,
+    InputMessage, JsonlTelemetrySink, MessageRequest, MessageResponse, OutputContentBlock,
+    SessionTracer, StreamEvent as ApiStreamEvent, ToolChoice, ToolDefinition,
+    ToolResultContentBlock,
 };
 
 use commands::{
@@ -51,6 +52,7 @@ fn max_tokens_for_model(model: &str) -> u32 {
 }
 const DEFAULT_DATE: &str = "2026-03-31";
 const DEFAULT_OAUTH_CALLBACK_PORT: u16 = 4545;
+const TELEMETRY_LOG_PATH_ENV: &str = "CLAW_TELEMETRY_LOG_PATH";
 const VERSION: &str = env!("CARGO_PKG_VERSION");
 const BUILD_TARGET: Option<&str> = option_env!("TARGET");
 const GIT_SHA: Option<&str> = option_env!("GIT_SHA");
@@ -1489,6 +1491,7 @@ impl LiveCli {
         let message_count = session.messages.len();
         self.runtime = build_runtime(
             session,
+            &self.session.id,
             model.clone(),
             self.system_prompt.clone(),
             true,
@@ -1533,6 +1536,7 @@ impl LiveCli {
         self.permission_mode = permission_mode_from_label(normalized);
         self.runtime = build_runtime(
             session,
+            &self.session.id,
             self.model.clone(),
             self.system_prompt.clone(),
             true,
@@ -1597,6 +1601,7 @@ impl LiveCli {
         let session_id = session.session_id.clone();
         self.runtime = build_runtime(
             session,
+            &handle.id,
             self.model.clone(),
             self.system_prompt.clone(),
             true,
@@ -1686,6 +1691,7 @@ impl LiveCli {
                 let session_id = session.session_id.clone();
                 self.runtime = build_runtime(
                     session,
+                    &handle.id,
                     self.model.clone(),
                     self.system_prompt.clone(),
                     true,
@@ -1785,6 +1791,7 @@ impl LiveCli {
         let skipped = removed == 0;
         self.runtime = build_runtime(
             result.compacted_session,
+            &self.session.id,
             self.model.clone(),
             self.system_prompt.clone(),
             true,
@@ -3127,6 +3134,7 @@ fn describe_tool_progress(name: &str, input: &str) -> String {
 #[allow(clippy::too_many_arguments)]
 fn build_runtime(
     session: Session,
+    session_id: &str,
     model: String,
     system_prompt: Vec<String>,
     enable_tools: bool,
@@ -3271,6 +3279,11 @@ impl AnthropicRuntimeClient {
             progress_reporter,
         })
     }
+
+    fn with_session_tracer(mut self, session_tracer: SessionTracer) -> Self {
+        self.client = self.client.with_session_tracer(session_tracer);
+        self
+    }
 }
 
 fn resolve_cli_auth_source() -> Result<AuthSource, Box<dyn std::error::Error>> {
@@ -3380,12 +3393,7 @@ impl ApiClient for AnthropicRuntimeClient {
                         }
                     }
                     ApiStreamEvent::MessageDelta(delta) => {
-                        events.push(AssistantEvent::Usage(TokenUsage {
-                            input_tokens: delta.usage.input_tokens,
-                            output_tokens: delta.usage.output_tokens,
-                            cache_creation_input_tokens: 0,
-                            cache_read_input_tokens: 0,
-                        }));
+                        events.push(AssistantEvent::Usage(delta.usage.token_usage()));
                     }
                     ApiStreamEvent::MessageStop(_) => {
                         saw_stop = true;
@@ -3977,12 +3985,7 @@ fn response_to_events(
         }
     }
 
-    events.push(AssistantEvent::Usage(TokenUsage {
-        input_tokens: response.usage.input_tokens,
-        output_tokens: response.usage.output_tokens,
-        cache_creation_input_tokens: response.usage.cache_creation_input_tokens,
-        cache_read_input_tokens: response.usage.cache_read_input_tokens,
-    }));
+    events.push(AssistantEvent::Usage(response.usage.token_usage()));
     events.push(AssistantEvent::MessageStop);
     Ok(events)
 }

+ 13 - 0
rust/crates/telemetry/Cargo.toml

@@ -0,0 +1,13 @@
+[package]
+name = "telemetry"
+version.workspace = true
+edition.workspace = true
+license.workspace = true
+publish.workspace = true
+
+[dependencies]
+serde = { version = "1", features = ["derive"] }
+serde_json = "1"
+
+[lints]
+workspace = true

+ 526 - 0
rust/crates/telemetry/src/lib.rs

@@ -0,0 +1,526 @@
+use std::fmt::{Debug, Formatter};
+use std::fs::{File, OpenOptions};
+use std::io::Write;
+use std::path::{Path, PathBuf};
+use std::sync::atomic::{AtomicU64, Ordering};
+use std::sync::{Arc, Mutex};
+use std::time::{SystemTime, UNIX_EPOCH};
+
+use serde::{Deserialize, Serialize};
+use serde_json::{Map, Value};
+
+pub const DEFAULT_ANTHROPIC_VERSION: &str = "2023-06-01";
+pub const DEFAULT_APP_NAME: &str = "claude-code";
+pub const DEFAULT_RUNTIME: &str = "rust";
+pub const DEFAULT_AGENTIC_BETA: &str = "claude-code-20250219";
+pub const DEFAULT_PROMPT_CACHING_SCOPE_BETA: &str = "prompt-caching-scope-2026-01-05";
+
+#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
+pub struct ClientIdentity {
+    pub app_name: String,
+    pub app_version: String,
+    pub runtime: String,
+}
+
+impl ClientIdentity {
+    #[must_use]
+    pub fn new(app_name: impl Into<String>, app_version: impl Into<String>) -> Self {
+        Self {
+            app_name: app_name.into(),
+            app_version: app_version.into(),
+            runtime: DEFAULT_RUNTIME.to_string(),
+        }
+    }
+
+    #[must_use]
+    pub fn with_runtime(mut self, runtime: impl Into<String>) -> Self {
+        self.runtime = runtime.into();
+        self
+    }
+
+    #[must_use]
+    pub fn user_agent(&self) -> String {
+        format!("{}/{}", self.app_name, self.app_version)
+    }
+}
+
+impl Default for ClientIdentity {
+    fn default() -> Self {
+        Self::new(DEFAULT_APP_NAME, env!("CARGO_PKG_VERSION"))
+    }
+}
+
+#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
+pub struct AnthropicRequestProfile {
+    pub anthropic_version: String,
+    pub client_identity: ClientIdentity,
+    #[serde(default, skip_serializing_if = "Vec::is_empty")]
+    pub betas: Vec<String>,
+    #[serde(default, skip_serializing_if = "Map::is_empty")]
+    pub extra_body: Map<String, Value>,
+}
+
+impl AnthropicRequestProfile {
+    #[must_use]
+    pub fn new(client_identity: ClientIdentity) -> Self {
+        Self {
+            anthropic_version: DEFAULT_ANTHROPIC_VERSION.to_string(),
+            client_identity,
+            betas: vec![
+                DEFAULT_AGENTIC_BETA.to_string(),
+                DEFAULT_PROMPT_CACHING_SCOPE_BETA.to_string(),
+            ],
+            extra_body: Map::new(),
+        }
+    }
+
+    #[must_use]
+    pub fn with_beta(mut self, beta: impl Into<String>) -> Self {
+        let beta = beta.into();
+        if !self.betas.contains(&beta) {
+            self.betas.push(beta);
+        }
+        self
+    }
+
+    #[must_use]
+    pub fn with_extra_body(mut self, key: impl Into<String>, value: Value) -> Self {
+        self.extra_body.insert(key.into(), value);
+        self
+    }
+
+    #[must_use]
+    pub fn header_pairs(&self) -> Vec<(String, String)> {
+        let mut headers = vec![
+            (
+                "anthropic-version".to_string(),
+                self.anthropic_version.clone(),
+            ),
+            ("user-agent".to_string(), self.client_identity.user_agent()),
+        ];
+        if !self.betas.is_empty() {
+            headers.push(("anthropic-beta".to_string(), self.betas.join(",")));
+        }
+        headers
+    }
+
+    pub fn render_json_body<T: Serialize>(&self, request: &T) -> Result<Value, serde_json::Error> {
+        let mut body = serde_json::to_value(request)?;
+        let object = body.as_object_mut().ok_or_else(|| {
+            serde_json::Error::io(std::io::Error::new(
+                std::io::ErrorKind::InvalidData,
+                "request body must serialize to a JSON object",
+            ))
+        })?;
+        for (key, value) in &self.extra_body {
+            object.insert(key.clone(), value.clone());
+        }
+        if !self.betas.is_empty() {
+            object.insert(
+                "betas".to_string(),
+                Value::Array(self.betas.iter().cloned().map(Value::String).collect()),
+            );
+        }
+        Ok(body)
+    }
+}
+
+impl Default for AnthropicRequestProfile {
+    fn default() -> Self {
+        Self::new(ClientIdentity::default())
+    }
+}
+
+#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
+pub struct AnalyticsEvent {
+    pub namespace: String,
+    pub action: String,
+    #[serde(default, skip_serializing_if = "Map::is_empty")]
+    pub properties: Map<String, Value>,
+}
+
+impl AnalyticsEvent {
+    #[must_use]
+    pub fn new(namespace: impl Into<String>, action: impl Into<String>) -> Self {
+        Self {
+            namespace: namespace.into(),
+            action: action.into(),
+            properties: Map::new(),
+        }
+    }
+
+    #[must_use]
+    pub fn with_property(mut self, key: impl Into<String>, value: Value) -> Self {
+        self.properties.insert(key.into(), value);
+        self
+    }
+}
+
+#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
+pub struct SessionTraceRecord {
+    pub session_id: String,
+    pub sequence: u64,
+    pub name: String,
+    pub timestamp_ms: u64,
+    #[serde(default, skip_serializing_if = "Map::is_empty")]
+    pub attributes: Map<String, Value>,
+}
+
+#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
+#[serde(tag = "type", rename_all = "snake_case")]
+pub enum TelemetryEvent {
+    HttpRequestStarted {
+        session_id: String,
+        attempt: u32,
+        method: String,
+        path: String,
+        #[serde(default, skip_serializing_if = "Map::is_empty")]
+        attributes: Map<String, Value>,
+    },
+    HttpRequestSucceeded {
+        session_id: String,
+        attempt: u32,
+        method: String,
+        path: String,
+        status: u16,
+        #[serde(default, skip_serializing_if = "Option::is_none")]
+        request_id: Option<String>,
+        #[serde(default, skip_serializing_if = "Map::is_empty")]
+        attributes: Map<String, Value>,
+    },
+    HttpRequestFailed {
+        session_id: String,
+        attempt: u32,
+        method: String,
+        path: String,
+        error: String,
+        retryable: bool,
+        #[serde(default, skip_serializing_if = "Map::is_empty")]
+        attributes: Map<String, Value>,
+    },
+    Analytics(AnalyticsEvent),
+    SessionTrace(SessionTraceRecord),
+}
+
+pub trait TelemetrySink: Send + Sync {
+    fn record(&self, event: TelemetryEvent);
+}
+
+#[derive(Default)]
+pub struct MemoryTelemetrySink {
+    events: Mutex<Vec<TelemetryEvent>>,
+}
+
+impl MemoryTelemetrySink {
+    #[must_use]
+    pub fn events(&self) -> Vec<TelemetryEvent> {
+        self.events
+            .lock()
+            .unwrap_or_else(std::sync::PoisonError::into_inner)
+            .clone()
+    }
+}
+
+impl TelemetrySink for MemoryTelemetrySink {
+    fn record(&self, event: TelemetryEvent) {
+        self.events
+            .lock()
+            .unwrap_or_else(std::sync::PoisonError::into_inner)
+            .push(event);
+    }
+}
+
+pub struct JsonlTelemetrySink {
+    path: PathBuf,
+    file: Mutex<File>,
+}
+
+impl Debug for JsonlTelemetrySink {
+    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
+        f.debug_struct("JsonlTelemetrySink")
+            .field("path", &self.path)
+            .finish_non_exhaustive()
+    }
+}
+
+impl JsonlTelemetrySink {
+    pub fn new(path: impl AsRef<Path>) -> Result<Self, std::io::Error> {
+        let path = path.as_ref().to_path_buf();
+        if let Some(parent) = path.parent() {
+            std::fs::create_dir_all(parent)?;
+        }
+        let file = OpenOptions::new().create(true).append(true).open(&path)?;
+        Ok(Self {
+            path,
+            file: Mutex::new(file),
+        })
+    }
+
+    #[must_use]
+    pub fn path(&self) -> &Path {
+        &self.path
+    }
+}
+
+impl TelemetrySink for JsonlTelemetrySink {
+    fn record(&self, event: TelemetryEvent) {
+        let Ok(line) = serde_json::to_string(&event) else {
+            return;
+        };
+        let mut file = self
+            .file
+            .lock()
+            .unwrap_or_else(std::sync::PoisonError::into_inner);
+        let _ = writeln!(file, "{line}");
+        let _ = file.flush();
+    }
+}
+
+#[derive(Clone)]
+pub struct SessionTracer {
+    session_id: String,
+    sequence: Arc<AtomicU64>,
+    sink: Arc<dyn TelemetrySink>,
+}
+
+impl Debug for SessionTracer {
+    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
+        f.debug_struct("SessionTracer")
+            .field("session_id", &self.session_id)
+            .finish_non_exhaustive()
+    }
+}
+
+impl SessionTracer {
+    #[must_use]
+    pub fn new(session_id: impl Into<String>, sink: Arc<dyn TelemetrySink>) -> Self {
+        Self {
+            session_id: session_id.into(),
+            sequence: Arc::new(AtomicU64::new(0)),
+            sink,
+        }
+    }
+
+    #[must_use]
+    pub fn session_id(&self) -> &str {
+        &self.session_id
+    }
+
+    pub fn record(&self, name: impl Into<String>, attributes: Map<String, Value>) {
+        let record = SessionTraceRecord {
+            session_id: self.session_id.clone(),
+            sequence: self.sequence.fetch_add(1, Ordering::Relaxed),
+            name: name.into(),
+            timestamp_ms: current_timestamp_ms(),
+            attributes,
+        };
+        self.sink.record(TelemetryEvent::SessionTrace(record));
+    }
+
+    pub fn record_http_request_started(
+        &self,
+        attempt: u32,
+        method: impl Into<String>,
+        path: impl Into<String>,
+        attributes: Map<String, Value>,
+    ) {
+        let method = method.into();
+        let path = path.into();
+        self.sink.record(TelemetryEvent::HttpRequestStarted {
+            session_id: self.session_id.clone(),
+            attempt,
+            method: method.clone(),
+            path: path.clone(),
+            attributes: attributes.clone(),
+        });
+        self.record(
+            "http_request_started",
+            merge_trace_fields(method, path, attempt, attributes),
+        );
+    }
+
+    pub fn record_http_request_succeeded(
+        &self,
+        attempt: u32,
+        method: impl Into<String>,
+        path: impl Into<String>,
+        status: u16,
+        request_id: Option<String>,
+        attributes: Map<String, Value>,
+    ) {
+        let method = method.into();
+        let path = path.into();
+        self.sink.record(TelemetryEvent::HttpRequestSucceeded {
+            session_id: self.session_id.clone(),
+            attempt,
+            method: method.clone(),
+            path: path.clone(),
+            status,
+            request_id: request_id.clone(),
+            attributes: attributes.clone(),
+        });
+        let mut trace_attributes = merge_trace_fields(method, path, attempt, attributes);
+        trace_attributes.insert("status".to_string(), Value::from(status));
+        if let Some(request_id) = request_id {
+            trace_attributes.insert("request_id".to_string(), Value::String(request_id));
+        }
+        self.record("http_request_succeeded", trace_attributes);
+    }
+
+    pub fn record_http_request_failed(
+        &self,
+        attempt: u32,
+        method: impl Into<String>,
+        path: impl Into<String>,
+        error: impl Into<String>,
+        retryable: bool,
+        attributes: Map<String, Value>,
+    ) {
+        let method = method.into();
+        let path = path.into();
+        let error = error.into();
+        self.sink.record(TelemetryEvent::HttpRequestFailed {
+            session_id: self.session_id.clone(),
+            attempt,
+            method: method.clone(),
+            path: path.clone(),
+            error: error.clone(),
+            retryable,
+            attributes: attributes.clone(),
+        });
+        let mut trace_attributes = merge_trace_fields(method, path, attempt, attributes);
+        trace_attributes.insert("error".to_string(), Value::String(error));
+        trace_attributes.insert("retryable".to_string(), Value::Bool(retryable));
+        self.record("http_request_failed", trace_attributes);
+    }
+
+    pub fn record_analytics(&self, event: AnalyticsEvent) {
+        let mut attributes = event.properties.clone();
+        attributes.insert(
+            "namespace".to_string(),
+            Value::String(event.namespace.clone()),
+        );
+        attributes.insert("action".to_string(), Value::String(event.action.clone()));
+        self.sink.record(TelemetryEvent::Analytics(event));
+        self.record("analytics", attributes);
+    }
+}
+
+fn merge_trace_fields(
+    method: String,
+    path: String,
+    attempt: u32,
+    mut attributes: Map<String, Value>,
+) -> Map<String, Value> {
+    attributes.insert("method".to_string(), Value::String(method));
+    attributes.insert("path".to_string(), Value::String(path));
+    attributes.insert("attempt".to_string(), Value::from(attempt));
+    attributes
+}
+
+fn current_timestamp_ms() -> u64 {
+    SystemTime::now()
+        .duration_since(UNIX_EPOCH)
+        .unwrap_or_default()
+        .as_millis()
+        .try_into()
+        .unwrap_or(u64::MAX)
+}
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+
+    #[test]
+    fn request_profile_emits_headers_and_merges_body() {
+        let profile = AnthropicRequestProfile::new(
+            ClientIdentity::new("claude-code", "1.2.3").with_runtime("rust-cli"),
+        )
+        .with_beta("tools-2026-04-01")
+        .with_extra_body("metadata", serde_json::json!({"source": "test"}));
+
+        assert_eq!(
+            profile.header_pairs(),
+            vec![
+                (
+                    "anthropic-version".to_string(),
+                    DEFAULT_ANTHROPIC_VERSION.to_string()
+                ),
+                ("user-agent".to_string(), "claude-code/1.2.3".to_string()),
+                (
+                    "anthropic-beta".to_string(),
+                    "claude-code-20250219,prompt-caching-scope-2026-01-05,tools-2026-04-01"
+                        .to_string(),
+                ),
+            ]
+        );
+
+        let body = profile
+            .render_json_body(&serde_json::json!({"model": "claude-sonnet"}))
+            .expect("body should serialize");
+        assert_eq!(
+            body["metadata"]["source"],
+            Value::String("test".to_string())
+        );
+        assert_eq!(
+            body["betas"],
+            serde_json::json!([
+                "claude-code-20250219",
+                "prompt-caching-scope-2026-01-05",
+                "tools-2026-04-01"
+            ])
+        );
+    }
+
+    #[test]
+    fn session_tracer_records_structured_events_and_trace_sequence() {
+        let sink = Arc::new(MemoryTelemetrySink::default());
+        let tracer = SessionTracer::new("session-123", sink.clone());
+
+        tracer.record_http_request_started(1, "POST", "/v1/messages", Map::new());
+        tracer.record_analytics(
+            AnalyticsEvent::new("cli", "prompt_sent")
+                .with_property("model", Value::String("claude-opus".to_string())),
+        );
+
+        let events = sink.events();
+        assert!(matches!(
+            &events[0],
+            TelemetryEvent::HttpRequestStarted {
+                session_id,
+                attempt: 1,
+                method,
+                path,
+                ..
+            } if session_id == "session-123" && method == "POST" && path == "/v1/messages"
+        ));
+        assert!(matches!(
+            &events[1],
+            TelemetryEvent::SessionTrace(SessionTraceRecord { sequence: 0, name, .. })
+            if name == "http_request_started"
+        ));
+        assert!(matches!(&events[2], TelemetryEvent::Analytics(_)));
+        assert!(matches!(
+            &events[3],
+            TelemetryEvent::SessionTrace(SessionTraceRecord { sequence: 1, name, .. })
+            if name == "analytics"
+        ));
+    }
+
+    #[test]
+    fn jsonl_sink_persists_events() {
+        let path =
+            std::env::temp_dir().join(format!("telemetry-jsonl-{}.log", current_timestamp_ms()));
+        let sink = JsonlTelemetrySink::new(&path).expect("sink should create file");
+
+        sink.record(TelemetryEvent::Analytics(
+            AnalyticsEvent::new("cli", "turn_completed").with_property("ok", Value::Bool(true)),
+        ));
+
+        let contents = std::fs::read_to_string(&path).expect("telemetry log should be readable");
+        assert!(contents.contains("\"type\":\"analytics\""));
+        assert!(contents.contains("\"action\":\"turn_completed\""));
+
+        let _ = std::fs::remove_file(path);
+    }
+}

+ 3 - 13
rust/crates/tools/src/lib.rs

@@ -14,7 +14,7 @@ use runtime::{
     edit_file, execute_bash, glob_search, grep_search, load_system_prompt, read_file, write_file,
     ApiClient, ApiRequest, AssistantEvent, BashCommandInput, ContentBlock, ConversationMessage,
     ConversationRuntime, GrepSearchInput, MessageRole, PermissionMode, PermissionPolicy,
-    RuntimeError, Session, TokenUsage, ToolError, ToolExecutor,
+    RuntimeError, Session, ToolError, ToolExecutor,
 };
 use serde::{Deserialize, Serialize};
 use serde_json::{json, Value};
@@ -1891,12 +1891,7 @@ impl ApiClient for ProviderRuntimeClient {
                         }
                     }
                     ApiStreamEvent::MessageDelta(delta) => {
-                        events.push(AssistantEvent::Usage(TokenUsage {
-                            input_tokens: delta.usage.input_tokens,
-                            output_tokens: delta.usage.output_tokens,
-                            cache_creation_input_tokens: 0,
-                            cache_read_input_tokens: 0,
-                        }));
+                        events.push(AssistantEvent::Usage(delta.usage.token_usage()));
                     }
                     ApiStreamEvent::MessageStop(_) => {
                         saw_stop = true;
@@ -2045,12 +2040,7 @@ fn response_to_events(response: MessageResponse) -> Vec<AssistantEvent> {
         }
     }
 
-    events.push(AssistantEvent::Usage(TokenUsage {
-        input_tokens: response.usage.input_tokens,
-        output_tokens: response.usage.output_tokens,
-        cache_creation_input_tokens: response.usage.cache_creation_input_tokens,
-        cache_read_input_tokens: response.usage.cache_read_input_tokens,
-    }));
+    events.push(AssistantEvent::Usage(response.usage.token_usage()));
     events.push(AssistantEvent::MessageStop);
     events
 }