Browse Source

wip: telemetry progress

Yeachan-Heo 2 tháng trước cách đây
mục cha
commit
e7e3ae2875

+ 1 - 0
rust/Cargo.lock

@@ -1097,6 +1097,7 @@ dependencies = [
  "serde",
  "serde_json",
  "sha2",
+ "telemetry",
  "tokio",
  "walkdir",
 ]

+ 53 - 6
rust/crates/runtime/src/conversation.rs

@@ -161,6 +161,7 @@ where
         self
     }
 
+    #[allow(clippy::too_many_lines)]
     pub fn run_turn(
         &mut self,
         user_input: impl Into<String>,
@@ -197,7 +198,13 @@ where
                     return Err(error);
                 }
             };
-            let (assistant_message, usage) = build_assistant_message(events)?;
+            let (assistant_message, usage) = match build_assistant_message(events) {
+                Ok(result) => result,
+                Err(error) => {
+                    self.record_turn_failed(iterations, &error);
+                    return Err(error);
+                }
+            };
             if let Some(usage) = usage {
                 self.usage_tracker.record(usage);
             }
@@ -211,7 +218,11 @@ where
                     _ => None,
                 })
                 .collect::<Vec<_>>();
-            self.record_assistant_iteration(iterations, &assistant_message, pending_tool_uses.len());
+            self.record_assistant_iteration(
+                iterations,
+                &assistant_message,
+                pending_tool_uses.len(),
+            );
 
             self.session.messages.push(assistant_message.clone());
             assistant_messages.push(assistant_message);
@@ -359,7 +370,10 @@ where
                 "iteration".to_string(),
                 Value::from(u64::try_from(iteration).unwrap_or(u64::MAX)),
             );
-            attributes.insert("tool_name".to_string(), Value::String(tool_name.to_string()));
+            attributes.insert(
+                "tool_name".to_string(),
+                Value::String(tool_name.to_string()),
+            );
             tracer.record("tool_execution_started", attributes);
         }
     }
@@ -396,9 +410,7 @@ where
             let mut attributes = Map::new();
             attributes.insert(
                 "assistant_message_count".to_string(),
-                Value::from(
-                    u64::try_from(summary.assistant_messages.len()).unwrap_or(u64::MAX),
-                ),
+                Value::from(u64::try_from(summary.assistant_messages.len()).unwrap_or(u64::MAX)),
             );
             attributes.insert(
                 "tool_result_count".to_string(),
@@ -554,6 +566,8 @@ mod tests {
     use crate::session::{ContentBlock, MessageRole, Session};
     use crate::usage::TokenUsage;
     use std::path::PathBuf;
+    use std::sync::Arc;
+    use telemetry::{MemoryTelemetrySink, SessionTracer, TelemetryEvent};
 
     struct ScriptedApiClient {
         call_count: usize,
@@ -666,6 +680,39 @@ mod tests {
         ));
     }
 
+    #[test]
+    fn records_runtime_session_trace_events() {
+        let sink = Arc::new(MemoryTelemetrySink::default());
+        let tracer = SessionTracer::new("session-runtime", sink.clone());
+        let mut runtime = ConversationRuntime::new(
+            Session::new(),
+            ScriptedApiClient { call_count: 0 },
+            StaticToolExecutor::new().register("add", |_input| Ok("4".to_string())),
+            PermissionPolicy::new(PermissionMode::WorkspaceWrite),
+            vec!["system".to_string()],
+        )
+        .with_session_tracer(tracer);
+
+        runtime
+            .run_turn("what is 2 + 2?", Some(&mut PromptAllowOnce))
+            .expect("conversation loop should succeed");
+
+        let events = sink.events();
+        let trace_names = events
+            .iter()
+            .filter_map(|event| match event {
+                TelemetryEvent::SessionTrace(trace) => Some(trace.name.as_str()),
+                _ => None,
+            })
+            .collect::<Vec<_>>();
+
+        assert!(trace_names.contains(&"turn_started"));
+        assert!(trace_names.contains(&"assistant_iteration_completed"));
+        assert!(trace_names.contains(&"tool_execution_started"));
+        assert!(trace_names.contains(&"tool_execution_finished"));
+        assert!(trace_names.contains(&"turn_completed"));
+    }
+
     #[test]
     fn records_denied_tool_results_when_prompt_rejects() {
         struct RejectPrompter;

+ 10 - 9
rust/crates/rusty-claude-cli/src/main.rs

@@ -1237,6 +1237,7 @@ impl LiveCli {
         let message_count = session.messages.len();
         self.runtime = build_runtime(
             session,
+            &self.session.id,
             model.clone(),
             self.system_prompt.clone(),
             true,
@@ -1342,7 +1343,7 @@ impl LiveCli {
         let message_count = session.messages.len();
         self.runtime = build_runtime(
             session,
-            &self.session.id,
+            &handle.id,
             self.model.clone(),
             self.system_prompt.clone(),
             true,
@@ -1922,6 +1923,7 @@ fn build_runtime_feature_config(
         .clone())
 }
 
+#[allow(clippy::too_many_arguments)]
 fn build_runtime(
     session: Session,
     session_id: &str,
@@ -1935,14 +1937,13 @@ fn build_runtime(
 {
     let session_tracer = build_session_tracer(session_id)?;
     let api_client = match session_tracer.clone() {
-        Some(session_tracer) => AnthropicRuntimeClient::new(
-            model,
-            enable_tools,
-            emit_output,
-            allowed_tools.clone(),
-        )?
-        .with_session_tracer(session_tracer),
-        None => AnthropicRuntimeClient::new(model, enable_tools, emit_output, allowed_tools.clone())?,
+        Some(session_tracer) => {
+            AnthropicRuntimeClient::new(model, enable_tools, emit_output, allowed_tools.clone())?
+                .with_session_tracer(session_tracer)
+        }
+        None => {
+            AnthropicRuntimeClient::new(model, enable_tools, emit_output, allowed_tools.clone())?
+        }
     };
     let runtime = ConversationRuntime::new_with_features(
         session,

+ 3 - 6
rust/crates/telemetry/src/lib.rs

@@ -489,15 +489,12 @@ mod tests {
 
     #[test]
     fn jsonl_sink_persists_events() {
-        let path = std::env::temp_dir().join(format!(
-            "telemetry-jsonl-{}.log",
-            current_timestamp_ms()
-        ));
+        let path =
+            std::env::temp_dir().join(format!("telemetry-jsonl-{}.log", current_timestamp_ms()));
         let sink = JsonlTelemetrySink::new(&path).expect("sink should create file");
 
         sink.record(TelemetryEvent::Analytics(
-            AnalyticsEvent::new("cli", "turn_completed")
-                .with_property("ok", Value::Bool(true)),
+            AnalyticsEvent::new("cli", "turn_completed").with_property("ok", Value::Bool(true)),
         ));
 
         let contents = std::fs::read_to_string(&path).expect("telemetry log should be readable");