| 1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586878889909192939495969798991001011021031041051061071081091101111121131141151161171181191201211221231241251261271281291301311321331341351361371381391401411421431441451461471481491501511521531541551561571581591601611621631641651661671681691701711721731741751761771781791801811821831841851861871881891901911921931941951961971981992002012022032042052062072082092102112122132142152162172182192202212222232242252262272282292302312322332342352362372382392402412422432442452462472482492502512522532542552562572582592602612622632642652662672682692702712722732742752762772782792802812822832842852862872882892902912922932942952962972982993003013023033043053063073083093103113123133143153163173183193203213223233243253263273283293303313323333343353363373383393403413423433443453463473483493503513523533543553563573583593603613623633643653663673683693703713723733743753763773783793803813823833843853863873883893903913923933943953963973983994004014024034044054064074084094104114124134144154164174184194204214224234244254264274284294304314324334344354364374384394404414424434444454464474484494504514524534544554564574584594604614624634644654664674684694704714724734744754764774784794804814824834844854864874884894904914924934944954964974984995005015025035045055065075085095105115125135145155165175185195205215225235245255265275285295305315325335345355365375385395405415425435445455465475485495505515525535545555565575585595605615625635645655665675685695705715725735745755765775785795805815825835845855865875885895905915925935945955965975985996006016026036046056066076086096106116126136146156166176186196206216226236246256266276286296306316326336346356366376386396406416426436446456466476486496506516526536546556566576586596606616626636646656666676686696706716726736746756766776786796806816826836846856866876886896906916926936946956966976986997007017027037047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123 |
- use std::collections::HashMap;
- use std::io;
- use std::sync::Arc;
- use std::time::{SystemTime, UNIX_EPOCH};
- use api::{InputContentBlock, MessageRequest, MessageResponse, OutputContentBlock, Usage};
- use serde_json::{json, Value};
- use tokio::io::{AsyncReadExt, AsyncWriteExt};
- use tokio::net::TcpListener;
- use tokio::sync::{oneshot, Mutex};
- use tokio::task::JoinHandle;
- pub const SCENARIO_PREFIX: &str = "PARITY_SCENARIO:";
- pub const DEFAULT_MODEL: &str = "claude-sonnet-4-6";
- #[derive(Debug, Clone, PartialEq, Eq)]
- pub struct CapturedRequest {
- pub method: String,
- pub path: String,
- pub headers: HashMap<String, String>,
- pub scenario: String,
- pub stream: bool,
- pub raw_body: String,
- }
- pub struct MockAnthropicService {
- base_url: String,
- requests: Arc<Mutex<Vec<CapturedRequest>>>,
- shutdown: Option<oneshot::Sender<()>>,
- join_handle: JoinHandle<()>,
- }
- impl MockAnthropicService {
- pub async fn spawn() -> io::Result<Self> {
- Self::spawn_on("127.0.0.1:0").await
- }
- pub async fn spawn_on(bind_addr: &str) -> io::Result<Self> {
- let listener = TcpListener::bind(bind_addr).await?;
- let address = listener.local_addr()?;
- let requests = Arc::new(Mutex::new(Vec::new()));
- let (shutdown_tx, mut shutdown_rx) = oneshot::channel();
- let request_state = Arc::clone(&requests);
- let join_handle = tokio::spawn(async move {
- loop {
- tokio::select! {
- _ = &mut shutdown_rx => break,
- accepted = listener.accept() => {
- let Ok((socket, _)) = accepted else {
- break;
- };
- let request_state = Arc::clone(&request_state);
- tokio::spawn(async move {
- let _ = handle_connection(socket, request_state).await;
- });
- }
- }
- }
- });
- Ok(Self {
- base_url: format!("http://{address}"),
- requests,
- shutdown: Some(shutdown_tx),
- join_handle,
- })
- }
- #[must_use]
- pub fn base_url(&self) -> String {
- self.base_url.clone()
- }
- pub async fn captured_requests(&self) -> Vec<CapturedRequest> {
- self.requests.lock().await.clone()
- }
- }
- impl Drop for MockAnthropicService {
- fn drop(&mut self) {
- if let Some(shutdown) = self.shutdown.take() {
- let _ = shutdown.send(());
- }
- self.join_handle.abort();
- }
- }
- #[derive(Debug, Clone, Copy, PartialEq, Eq)]
- enum Scenario {
- StreamingText,
- ReadFileRoundtrip,
- GrepChunkAssembly,
- WriteFileAllowed,
- WriteFileDenied,
- MultiToolTurnRoundtrip,
- BashStdoutRoundtrip,
- BashPermissionPromptApproved,
- BashPermissionPromptDenied,
- PluginToolRoundtrip,
- AutoCompactTriggered,
- TokenCostReporting,
- }
- impl Scenario {
- fn parse(value: &str) -> Option<Self> {
- match value.trim() {
- "streaming_text" => Some(Self::StreamingText),
- "read_file_roundtrip" => Some(Self::ReadFileRoundtrip),
- "grep_chunk_assembly" => Some(Self::GrepChunkAssembly),
- "write_file_allowed" => Some(Self::WriteFileAllowed),
- "write_file_denied" => Some(Self::WriteFileDenied),
- "multi_tool_turn_roundtrip" => Some(Self::MultiToolTurnRoundtrip),
- "bash_stdout_roundtrip" => Some(Self::BashStdoutRoundtrip),
- "bash_permission_prompt_approved" => Some(Self::BashPermissionPromptApproved),
- "bash_permission_prompt_denied" => Some(Self::BashPermissionPromptDenied),
- "plugin_tool_roundtrip" => Some(Self::PluginToolRoundtrip),
- "auto_compact_triggered" => Some(Self::AutoCompactTriggered),
- "token_cost_reporting" => Some(Self::TokenCostReporting),
- _ => None,
- }
- }
- fn name(self) -> &'static str {
- match self {
- Self::StreamingText => "streaming_text",
- Self::ReadFileRoundtrip => "read_file_roundtrip",
- Self::GrepChunkAssembly => "grep_chunk_assembly",
- Self::WriteFileAllowed => "write_file_allowed",
- Self::WriteFileDenied => "write_file_denied",
- Self::MultiToolTurnRoundtrip => "multi_tool_turn_roundtrip",
- Self::BashStdoutRoundtrip => "bash_stdout_roundtrip",
- Self::BashPermissionPromptApproved => "bash_permission_prompt_approved",
- Self::BashPermissionPromptDenied => "bash_permission_prompt_denied",
- Self::PluginToolRoundtrip => "plugin_tool_roundtrip",
- Self::AutoCompactTriggered => "auto_compact_triggered",
- Self::TokenCostReporting => "token_cost_reporting",
- }
- }
- }
- async fn handle_connection(
- mut socket: tokio::net::TcpStream,
- requests: Arc<Mutex<Vec<CapturedRequest>>>,
- ) -> io::Result<()> {
- let (method, path, headers, raw_body) = read_http_request(&mut socket).await?;
- let request: MessageRequest = serde_json::from_str(&raw_body)
- .map_err(|error| io::Error::new(io::ErrorKind::InvalidData, error.to_string()))?;
- let scenario = detect_scenario(&request)
- .ok_or_else(|| io::Error::new(io::ErrorKind::InvalidInput, "missing parity scenario"))?;
- requests.lock().await.push(CapturedRequest {
- method,
- path,
- headers,
- scenario: scenario.name().to_string(),
- stream: request.stream,
- raw_body,
- });
- let response = build_http_response(&request, scenario);
- socket.write_all(response.as_bytes()).await?;
- Ok(())
- }
- async fn read_http_request(
- socket: &mut tokio::net::TcpStream,
- ) -> io::Result<(String, String, HashMap<String, String>, String)> {
- let mut buffer = Vec::new();
- let mut header_end = None;
- loop {
- let mut chunk = [0_u8; 1024];
- let read = socket.read(&mut chunk).await?;
- if read == 0 {
- break;
- }
- buffer.extend_from_slice(&chunk[..read]);
- if let Some(position) = find_header_end(&buffer) {
- header_end = Some(position);
- break;
- }
- }
- let header_end = header_end
- .ok_or_else(|| io::Error::new(io::ErrorKind::UnexpectedEof, "missing http headers"))?;
- let (header_bytes, remaining) = buffer.split_at(header_end);
- let header_text = String::from_utf8(header_bytes.to_vec())
- .map_err(|error| io::Error::new(io::ErrorKind::InvalidData, error.to_string()))?;
- let mut lines = header_text.split("\r\n");
- let request_line = lines
- .next()
- .ok_or_else(|| io::Error::new(io::ErrorKind::InvalidData, "missing request line"))?;
- let mut request_parts = request_line.split_whitespace();
- let method = request_parts
- .next()
- .ok_or_else(|| io::Error::new(io::ErrorKind::InvalidData, "missing method"))?
- .to_string();
- let path = request_parts
- .next()
- .ok_or_else(|| io::Error::new(io::ErrorKind::InvalidData, "missing path"))?
- .to_string();
- let mut headers = HashMap::new();
- let mut content_length = 0_usize;
- for line in lines {
- if line.is_empty() {
- continue;
- }
- let (name, value) = line.split_once(':').ok_or_else(|| {
- io::Error::new(io::ErrorKind::InvalidData, "malformed http header line")
- })?;
- let value = value.trim().to_string();
- if name.eq_ignore_ascii_case("content-length") {
- content_length = value.parse().map_err(|error| {
- io::Error::new(
- io::ErrorKind::InvalidData,
- format!("invalid content-length: {error}"),
- )
- })?;
- }
- headers.insert(name.to_ascii_lowercase(), value);
- }
- let mut body = remaining[4..].to_vec();
- while body.len() < content_length {
- let mut chunk = vec![0_u8; content_length - body.len()];
- let read = socket.read(&mut chunk).await?;
- if read == 0 {
- break;
- }
- body.extend_from_slice(&chunk[..read]);
- }
- let body = String::from_utf8(body)
- .map_err(|error| io::Error::new(io::ErrorKind::InvalidData, error.to_string()))?;
- Ok((method, path, headers, body))
- }
- fn find_header_end(bytes: &[u8]) -> Option<usize> {
- bytes.windows(4).position(|window| window == b"\r\n\r\n")
- }
- fn detect_scenario(request: &MessageRequest) -> Option<Scenario> {
- request.messages.iter().rev().find_map(|message| {
- message.content.iter().rev().find_map(|block| match block {
- InputContentBlock::Text { text } => text
- .split_whitespace()
- .find_map(|token| token.strip_prefix(SCENARIO_PREFIX))
- .and_then(Scenario::parse),
- _ => None,
- })
- })
- }
- fn latest_tool_result(request: &MessageRequest) -> Option<(String, bool)> {
- request.messages.iter().rev().find_map(|message| {
- message.content.iter().rev().find_map(|block| match block {
- InputContentBlock::ToolResult {
- content, is_error, ..
- } => Some((flatten_tool_result_content(content), *is_error)),
- _ => None,
- })
- })
- }
- fn tool_results_by_name(request: &MessageRequest) -> HashMap<String, (String, bool)> {
- let mut tool_names_by_id = HashMap::new();
- for message in &request.messages {
- for block in &message.content {
- if let InputContentBlock::ToolUse { id, name, .. } = block {
- tool_names_by_id.insert(id.clone(), name.clone());
- }
- }
- }
- let mut results = HashMap::new();
- for message in request.messages.iter().rev() {
- for block in message.content.iter().rev() {
- if let InputContentBlock::ToolResult {
- tool_use_id,
- content,
- is_error,
- } = block
- {
- let tool_name = tool_names_by_id
- .get(tool_use_id)
- .cloned()
- .unwrap_or_else(|| tool_use_id.clone());
- results
- .entry(tool_name)
- .or_insert_with(|| (flatten_tool_result_content(content), *is_error));
- }
- }
- }
- results
- }
- fn flatten_tool_result_content(content: &[api::ToolResultContentBlock]) -> String {
- content
- .iter()
- .map(|block| match block {
- api::ToolResultContentBlock::Text { text } => text.clone(),
- api::ToolResultContentBlock::Json { value } => value.to_string(),
- })
- .collect::<Vec<_>>()
- .join("\n")
- }
- #[allow(clippy::too_many_lines)]
- fn build_http_response(request: &MessageRequest, scenario: Scenario) -> String {
- let response = if request.stream {
- let body = build_stream_body(request, scenario);
- return http_response(
- "200 OK",
- "text/event-stream",
- &body,
- &[("x-request-id", request_id_for(scenario))],
- );
- } else {
- build_message_response(request, scenario)
- };
- http_response(
- "200 OK",
- "application/json",
- &serde_json::to_string(&response).expect("message response should serialize"),
- &[("request-id", request_id_for(scenario))],
- )
- }
- #[allow(clippy::too_many_lines)]
- fn build_stream_body(request: &MessageRequest, scenario: Scenario) -> String {
- match scenario {
- Scenario::StreamingText => streaming_text_sse(),
- Scenario::ReadFileRoundtrip => match latest_tool_result(request) {
- Some((tool_output, _)) => final_text_sse(&format!(
- "read_file roundtrip complete: {}",
- extract_read_content(&tool_output)
- )),
- None => tool_use_sse(
- "toolu_read_fixture",
- "read_file",
- &[r#"{"path":"fixture.txt"}"#],
- ),
- },
- Scenario::GrepChunkAssembly => match latest_tool_result(request) {
- Some((tool_output, _)) => final_text_sse(&format!(
- "grep_search matched {} occurrences",
- extract_num_matches(&tool_output)
- )),
- None => tool_use_sse(
- "toolu_grep_fixture",
- "grep_search",
- &[
- "{\"pattern\":\"par",
- "ity\",\"path\":\"fixture.txt\"",
- ",\"output_mode\":\"count\"}",
- ],
- ),
- },
- Scenario::WriteFileAllowed => match latest_tool_result(request) {
- Some((tool_output, _)) => final_text_sse(&format!(
- "write_file succeeded: {}",
- extract_file_path(&tool_output)
- )),
- None => tool_use_sse(
- "toolu_write_allowed",
- "write_file",
- &[r#"{"path":"generated/output.txt","content":"created by mock service\n"}"#],
- ),
- },
- Scenario::WriteFileDenied => match latest_tool_result(request) {
- Some((tool_output, _)) => {
- final_text_sse(&format!("write_file denied as expected: {tool_output}"))
- }
- None => tool_use_sse(
- "toolu_write_denied",
- "write_file",
- &[r#"{"path":"generated/denied.txt","content":"should not exist\n"}"#],
- ),
- },
- Scenario::MultiToolTurnRoundtrip => {
- let tool_results = tool_results_by_name(request);
- match (
- tool_results.get("read_file"),
- tool_results.get("grep_search"),
- ) {
- (Some((read_output, _)), Some((grep_output, _))) => final_text_sse(&format!(
- "multi-tool roundtrip complete: {} / {} occurrences",
- extract_read_content(read_output),
- extract_num_matches(grep_output)
- )),
- _ => tool_uses_sse(&[
- ToolUseSse {
- tool_id: "toolu_multi_read",
- tool_name: "read_file",
- partial_json_chunks: &[r#"{"path":"fixture.txt"}"#],
- },
- ToolUseSse {
- tool_id: "toolu_multi_grep",
- tool_name: "grep_search",
- partial_json_chunks: &[
- "{\"pattern\":\"par",
- "ity\",\"path\":\"fixture.txt\"",
- ",\"output_mode\":\"count\"}",
- ],
- },
- ]),
- }
- }
- Scenario::BashStdoutRoundtrip => match latest_tool_result(request) {
- Some((tool_output, _)) => final_text_sse(&format!(
- "bash completed: {}",
- extract_bash_stdout(&tool_output)
- )),
- None => tool_use_sse(
- "toolu_bash_stdout",
- "bash",
- &[r#"{"command":"printf 'alpha from bash'","timeout":1000}"#],
- ),
- },
- Scenario::BashPermissionPromptApproved => match latest_tool_result(request) {
- Some((tool_output, is_error)) => {
- if is_error {
- final_text_sse(&format!("bash approval unexpectedly failed: {tool_output}"))
- } else {
- final_text_sse(&format!(
- "bash approved and executed: {}",
- extract_bash_stdout(&tool_output)
- ))
- }
- }
- None => tool_use_sse(
- "toolu_bash_prompt_allow",
- "bash",
- &[r#"{"command":"printf 'approved via prompt'","timeout":1000}"#],
- ),
- },
- Scenario::BashPermissionPromptDenied => match latest_tool_result(request) {
- Some((tool_output, _)) => {
- final_text_sse(&format!("bash denied as expected: {tool_output}"))
- }
- None => tool_use_sse(
- "toolu_bash_prompt_deny",
- "bash",
- &[r#"{"command":"printf 'should not run'","timeout":1000}"#],
- ),
- },
- Scenario::PluginToolRoundtrip => match latest_tool_result(request) {
- Some((tool_output, _)) => final_text_sse(&format!(
- "plugin tool completed: {}",
- extract_plugin_message(&tool_output)
- )),
- None => tool_use_sse(
- "toolu_plugin_echo",
- "plugin_echo",
- &[r#"{"message":"hello from plugin parity"}"#],
- ),
- },
- Scenario::AutoCompactTriggered => {
- final_text_sse_with_usage("auto compact parity complete.", 50_000, 200)
- }
- Scenario::TokenCostReporting => {
- final_text_sse_with_usage("token cost reporting parity complete.", 1_000, 500)
- }
- }
- }
- #[allow(clippy::too_many_lines)]
- fn build_message_response(request: &MessageRequest, scenario: Scenario) -> MessageResponse {
- match scenario {
- Scenario::StreamingText => text_message_response(
- "msg_streaming_text",
- "Mock streaming says hello from the parity harness.",
- ),
- Scenario::ReadFileRoundtrip => match latest_tool_result(request) {
- Some((tool_output, _)) => text_message_response(
- "msg_read_file_final",
- &format!(
- "read_file roundtrip complete: {}",
- extract_read_content(&tool_output)
- ),
- ),
- None => tool_message_response(
- "msg_read_file_tool",
- "toolu_read_fixture",
- "read_file",
- json!({"path": "fixture.txt"}),
- ),
- },
- Scenario::GrepChunkAssembly => match latest_tool_result(request) {
- Some((tool_output, _)) => text_message_response(
- "msg_grep_final",
- &format!(
- "grep_search matched {} occurrences",
- extract_num_matches(&tool_output)
- ),
- ),
- None => tool_message_response(
- "msg_grep_tool",
- "toolu_grep_fixture",
- "grep_search",
- json!({"pattern": "parity", "path": "fixture.txt", "output_mode": "count"}),
- ),
- },
- Scenario::WriteFileAllowed => match latest_tool_result(request) {
- Some((tool_output, _)) => text_message_response(
- "msg_write_allowed_final",
- &format!("write_file succeeded: {}", extract_file_path(&tool_output)),
- ),
- None => tool_message_response(
- "msg_write_allowed_tool",
- "toolu_write_allowed",
- "write_file",
- json!({"path": "generated/output.txt", "content": "created by mock service\n"}),
- ),
- },
- Scenario::WriteFileDenied => match latest_tool_result(request) {
- Some((tool_output, _)) => text_message_response(
- "msg_write_denied_final",
- &format!("write_file denied as expected: {tool_output}"),
- ),
- None => tool_message_response(
- "msg_write_denied_tool",
- "toolu_write_denied",
- "write_file",
- json!({"path": "generated/denied.txt", "content": "should not exist\n"}),
- ),
- },
- Scenario::MultiToolTurnRoundtrip => {
- let tool_results = tool_results_by_name(request);
- match (
- tool_results.get("read_file"),
- tool_results.get("grep_search"),
- ) {
- (Some((read_output, _)), Some((grep_output, _))) => text_message_response(
- "msg_multi_tool_final",
- &format!(
- "multi-tool roundtrip complete: {} / {} occurrences",
- extract_read_content(read_output),
- extract_num_matches(grep_output)
- ),
- ),
- _ => tool_message_response_many(
- "msg_multi_tool_start",
- &[
- ToolUseMessage {
- tool_id: "toolu_multi_read",
- tool_name: "read_file",
- input: json!({"path": "fixture.txt"}),
- },
- ToolUseMessage {
- tool_id: "toolu_multi_grep",
- tool_name: "grep_search",
- input: json!({"pattern": "parity", "path": "fixture.txt", "output_mode": "count"}),
- },
- ],
- ),
- }
- }
- Scenario::BashStdoutRoundtrip => match latest_tool_result(request) {
- Some((tool_output, _)) => text_message_response(
- "msg_bash_stdout_final",
- &format!("bash completed: {}", extract_bash_stdout(&tool_output)),
- ),
- None => tool_message_response(
- "msg_bash_stdout_tool",
- "toolu_bash_stdout",
- "bash",
- json!({"command": "printf 'alpha from bash'", "timeout": 1000}),
- ),
- },
- Scenario::BashPermissionPromptApproved => match latest_tool_result(request) {
- Some((tool_output, is_error)) => {
- if is_error {
- text_message_response(
- "msg_bash_prompt_allow_error",
- &format!("bash approval unexpectedly failed: {tool_output}"),
- )
- } else {
- text_message_response(
- "msg_bash_prompt_allow_final",
- &format!(
- "bash approved and executed: {}",
- extract_bash_stdout(&tool_output)
- ),
- )
- }
- }
- None => tool_message_response(
- "msg_bash_prompt_allow_tool",
- "toolu_bash_prompt_allow",
- "bash",
- json!({"command": "printf 'approved via prompt'", "timeout": 1000}),
- ),
- },
- Scenario::BashPermissionPromptDenied => match latest_tool_result(request) {
- Some((tool_output, _)) => text_message_response(
- "msg_bash_prompt_deny_final",
- &format!("bash denied as expected: {tool_output}"),
- ),
- None => tool_message_response(
- "msg_bash_prompt_deny_tool",
- "toolu_bash_prompt_deny",
- "bash",
- json!({"command": "printf 'should not run'", "timeout": 1000}),
- ),
- },
- Scenario::PluginToolRoundtrip => match latest_tool_result(request) {
- Some((tool_output, _)) => text_message_response(
- "msg_plugin_tool_final",
- &format!(
- "plugin tool completed: {}",
- extract_plugin_message(&tool_output)
- ),
- ),
- None => tool_message_response(
- "msg_plugin_tool_start",
- "toolu_plugin_echo",
- "plugin_echo",
- json!({"message": "hello from plugin parity"}),
- ),
- },
- Scenario::AutoCompactTriggered => text_message_response_with_usage(
- "msg_auto_compact_triggered",
- "auto compact parity complete.",
- 50_000,
- 200,
- ),
- Scenario::TokenCostReporting => text_message_response_with_usage(
- "msg_token_cost_reporting",
- "token cost reporting parity complete.",
- 1_000,
- 500,
- ),
- }
- }
- fn request_id_for(scenario: Scenario) -> &'static str {
- match scenario {
- Scenario::StreamingText => "req_streaming_text",
- Scenario::ReadFileRoundtrip => "req_read_file_roundtrip",
- Scenario::GrepChunkAssembly => "req_grep_chunk_assembly",
- Scenario::WriteFileAllowed => "req_write_file_allowed",
- Scenario::WriteFileDenied => "req_write_file_denied",
- Scenario::MultiToolTurnRoundtrip => "req_multi_tool_turn_roundtrip",
- Scenario::BashStdoutRoundtrip => "req_bash_stdout_roundtrip",
- Scenario::BashPermissionPromptApproved => "req_bash_permission_prompt_approved",
- Scenario::BashPermissionPromptDenied => "req_bash_permission_prompt_denied",
- Scenario::PluginToolRoundtrip => "req_plugin_tool_roundtrip",
- Scenario::AutoCompactTriggered => "req_auto_compact_triggered",
- Scenario::TokenCostReporting => "req_token_cost_reporting",
- }
- }
- fn http_response(status: &str, content_type: &str, body: &str, headers: &[(&str, &str)]) -> String {
- let mut extra_headers = String::new();
- for (name, value) in headers {
- use std::fmt::Write as _;
- write!(&mut extra_headers, "{name}: {value}\r\n").expect("header write should succeed");
- }
- format!(
- "HTTP/1.1 {status}\r\ncontent-type: {content_type}\r\n{extra_headers}content-length: {}\r\nconnection: close\r\n\r\n{body}",
- body.len()
- )
- }
- fn text_message_response(id: &str, text: &str) -> MessageResponse {
- MessageResponse {
- id: id.to_string(),
- kind: "message".to_string(),
- role: "assistant".to_string(),
- content: vec![OutputContentBlock::Text {
- text: text.to_string(),
- }],
- model: DEFAULT_MODEL.to_string(),
- stop_reason: Some("end_turn".to_string()),
- stop_sequence: None,
- usage: Usage {
- input_tokens: 10,
- cache_creation_input_tokens: 0,
- cache_read_input_tokens: 0,
- output_tokens: 6,
- },
- request_id: None,
- }
- }
- fn text_message_response_with_usage(
- id: &str,
- text: &str,
- input_tokens: u32,
- output_tokens: u32,
- ) -> MessageResponse {
- MessageResponse {
- id: id.to_string(),
- kind: "message".to_string(),
- role: "assistant".to_string(),
- content: vec![OutputContentBlock::Text {
- text: text.to_string(),
- }],
- model: DEFAULT_MODEL.to_string(),
- stop_reason: Some("end_turn".to_string()),
- stop_sequence: None,
- usage: Usage {
- input_tokens,
- cache_creation_input_tokens: 0,
- cache_read_input_tokens: 0,
- output_tokens,
- },
- request_id: None,
- }
- }
- fn tool_message_response(
- id: &str,
- tool_id: &str,
- tool_name: &str,
- input: Value,
- ) -> MessageResponse {
- tool_message_response_many(
- id,
- &[ToolUseMessage {
- tool_id,
- tool_name,
- input,
- }],
- )
- }
- struct ToolUseMessage<'a> {
- tool_id: &'a str,
- tool_name: &'a str,
- input: Value,
- }
- fn tool_message_response_many(id: &str, tool_uses: &[ToolUseMessage<'_>]) -> MessageResponse {
- MessageResponse {
- id: id.to_string(),
- kind: "message".to_string(),
- role: "assistant".to_string(),
- content: tool_uses
- .iter()
- .map(|tool_use| OutputContentBlock::ToolUse {
- id: tool_use.tool_id.to_string(),
- name: tool_use.tool_name.to_string(),
- input: tool_use.input.clone(),
- })
- .collect(),
- model: DEFAULT_MODEL.to_string(),
- stop_reason: Some("tool_use".to_string()),
- stop_sequence: None,
- usage: Usage {
- input_tokens: 10,
- cache_creation_input_tokens: 0,
- cache_read_input_tokens: 0,
- output_tokens: 3,
- },
- request_id: None,
- }
- }
- fn streaming_text_sse() -> String {
- let mut body = String::new();
- append_sse(
- &mut body,
- "message_start",
- json!({
- "type": "message_start",
- "message": {
- "id": "msg_streaming_text",
- "type": "message",
- "role": "assistant",
- "content": [],
- "model": DEFAULT_MODEL,
- "stop_reason": null,
- "stop_sequence": null,
- "usage": usage_json(11, 0)
- }
- }),
- );
- append_sse(
- &mut body,
- "content_block_start",
- json!({
- "type": "content_block_start",
- "index": 0,
- "content_block": {"type": "text", "text": ""}
- }),
- );
- append_sse(
- &mut body,
- "content_block_delta",
- json!({
- "type": "content_block_delta",
- "index": 0,
- "delta": {"type": "text_delta", "text": "Mock streaming "}
- }),
- );
- append_sse(
- &mut body,
- "content_block_delta",
- json!({
- "type": "content_block_delta",
- "index": 0,
- "delta": {"type": "text_delta", "text": "says hello from the parity harness."}
- }),
- );
- append_sse(
- &mut body,
- "content_block_stop",
- json!({
- "type": "content_block_stop",
- "index": 0
- }),
- );
- append_sse(
- &mut body,
- "message_delta",
- json!({
- "type": "message_delta",
- "delta": {"stop_reason": "end_turn", "stop_sequence": null},
- "usage": usage_json(11, 8)
- }),
- );
- append_sse(&mut body, "message_stop", json!({"type": "message_stop"}));
- body
- }
- fn tool_use_sse(tool_id: &str, tool_name: &str, partial_json_chunks: &[&str]) -> String {
- tool_uses_sse(&[ToolUseSse {
- tool_id,
- tool_name,
- partial_json_chunks,
- }])
- }
- struct ToolUseSse<'a> {
- tool_id: &'a str,
- tool_name: &'a str,
- partial_json_chunks: &'a [&'a str],
- }
- fn tool_uses_sse(tool_uses: &[ToolUseSse<'_>]) -> String {
- let mut body = String::new();
- let message_id = tool_uses.first().map_or_else(
- || "msg_tool_use".to_string(),
- |tool_use| format!("msg_{}", tool_use.tool_id),
- );
- append_sse(
- &mut body,
- "message_start",
- json!({
- "type": "message_start",
- "message": {
- "id": message_id,
- "type": "message",
- "role": "assistant",
- "content": [],
- "model": DEFAULT_MODEL,
- "stop_reason": null,
- "stop_sequence": null,
- "usage": usage_json(12, 0)
- }
- }),
- );
- for (index, tool_use) in tool_uses.iter().enumerate() {
- append_sse(
- &mut body,
- "content_block_start",
- json!({
- "type": "content_block_start",
- "index": index,
- "content_block": {
- "type": "tool_use",
- "id": tool_use.tool_id,
- "name": tool_use.tool_name,
- "input": {}
- }
- }),
- );
- for chunk in tool_use.partial_json_chunks {
- append_sse(
- &mut body,
- "content_block_delta",
- json!({
- "type": "content_block_delta",
- "index": index,
- "delta": {"type": "input_json_delta", "partial_json": chunk}
- }),
- );
- }
- append_sse(
- &mut body,
- "content_block_stop",
- json!({
- "type": "content_block_stop",
- "index": index
- }),
- );
- }
- append_sse(
- &mut body,
- "message_delta",
- json!({
- "type": "message_delta",
- "delta": {"stop_reason": "tool_use", "stop_sequence": null},
- "usage": usage_json(12, 4)
- }),
- );
- append_sse(&mut body, "message_stop", json!({"type": "message_stop"}));
- body
- }
- fn final_text_sse(text: &str) -> String {
- let mut body = String::new();
- append_sse(
- &mut body,
- "message_start",
- json!({
- "type": "message_start",
- "message": {
- "id": unique_message_id(),
- "type": "message",
- "role": "assistant",
- "content": [],
- "model": DEFAULT_MODEL,
- "stop_reason": null,
- "stop_sequence": null,
- "usage": usage_json(14, 0)
- }
- }),
- );
- append_sse(
- &mut body,
- "content_block_start",
- json!({
- "type": "content_block_start",
- "index": 0,
- "content_block": {"type": "text", "text": ""}
- }),
- );
- append_sse(
- &mut body,
- "content_block_delta",
- json!({
- "type": "content_block_delta",
- "index": 0,
- "delta": {"type": "text_delta", "text": text}
- }),
- );
- append_sse(
- &mut body,
- "content_block_stop",
- json!({
- "type": "content_block_stop",
- "index": 0
- }),
- );
- append_sse(
- &mut body,
- "message_delta",
- json!({
- "type": "message_delta",
- "delta": {"stop_reason": "end_turn", "stop_sequence": null},
- "usage": usage_json(14, 7)
- }),
- );
- append_sse(&mut body, "message_stop", json!({"type": "message_stop"}));
- body
- }
- fn final_text_sse_with_usage(text: &str, input_tokens: u32, output_tokens: u32) -> String {
- let mut body = String::new();
- append_sse(
- &mut body,
- "message_start",
- json!({
- "type": "message_start",
- "message": {
- "id": unique_message_id(),
- "type": "message",
- "role": "assistant",
- "content": [],
- "model": DEFAULT_MODEL,
- "stop_reason": null,
- "stop_sequence": null,
- "usage": {
- "input_tokens": input_tokens,
- "cache_creation_input_tokens": 0,
- "cache_read_input_tokens": 0,
- "output_tokens": 0
- }
- }
- }),
- );
- append_sse(
- &mut body,
- "content_block_start",
- json!({
- "type": "content_block_start",
- "index": 0,
- "content_block": {"type": "text", "text": ""}
- }),
- );
- append_sse(
- &mut body,
- "content_block_delta",
- json!({
- "type": "content_block_delta",
- "index": 0,
- "delta": {"type": "text_delta", "text": text}
- }),
- );
- append_sse(
- &mut body,
- "content_block_stop",
- json!({
- "type": "content_block_stop",
- "index": 0
- }),
- );
- append_sse(
- &mut body,
- "message_delta",
- json!({
- "type": "message_delta",
- "delta": {"stop_reason": "end_turn", "stop_sequence": null},
- "usage": {
- "input_tokens": input_tokens,
- "cache_creation_input_tokens": 0,
- "cache_read_input_tokens": 0,
- "output_tokens": output_tokens
- }
- }),
- );
- append_sse(&mut body, "message_stop", json!({"type": "message_stop"}));
- body
- }
- #[allow(clippy::needless_pass_by_value)]
- fn append_sse(buffer: &mut String, event: &str, payload: Value) {
- use std::fmt::Write as _;
- writeln!(buffer, "event: {event}").expect("event write should succeed");
- writeln!(buffer, "data: {payload}").expect("payload write should succeed");
- buffer.push('\n');
- }
- fn usage_json(input_tokens: u32, output_tokens: u32) -> Value {
- json!({
- "input_tokens": input_tokens,
- "cache_creation_input_tokens": 0,
- "cache_read_input_tokens": 0,
- "output_tokens": output_tokens
- })
- }
- fn unique_message_id() -> String {
- let nanos = SystemTime::now()
- .duration_since(UNIX_EPOCH)
- .expect("clock should be after epoch")
- .as_nanos();
- format!("msg_{nanos}")
- }
- fn extract_read_content(tool_output: &str) -> String {
- serde_json::from_str::<Value>(tool_output)
- .ok()
- .and_then(|value| {
- value
- .get("file")
- .and_then(|file| file.get("content"))
- .and_then(Value::as_str)
- .map(ToOwned::to_owned)
- })
- .unwrap_or_else(|| tool_output.trim().to_string())
- }
- #[allow(clippy::cast_possible_truncation)]
- fn extract_num_matches(tool_output: &str) -> usize {
- serde_json::from_str::<Value>(tool_output)
- .ok()
- .and_then(|value| value.get("numMatches").and_then(Value::as_u64))
- .unwrap_or(0) as usize
- }
- fn extract_file_path(tool_output: &str) -> String {
- serde_json::from_str::<Value>(tool_output)
- .ok()
- .and_then(|value| {
- value
- .get("filePath")
- .and_then(Value::as_str)
- .map(ToOwned::to_owned)
- })
- .unwrap_or_else(|| tool_output.trim().to_string())
- }
- fn extract_bash_stdout(tool_output: &str) -> String {
- serde_json::from_str::<Value>(tool_output)
- .ok()
- .and_then(|value| {
- value
- .get("stdout")
- .and_then(Value::as_str)
- .map(ToOwned::to_owned)
- })
- .unwrap_or_else(|| tool_output.trim().to_string())
- }
- fn extract_plugin_message(tool_output: &str) -> String {
- serde_json::from_str::<Value>(tool_output)
- .ok()
- .and_then(|value| {
- value
- .get("input")
- .and_then(|input| input.get("message"))
- .and_then(Value::as_str)
- .map(ToOwned::to_owned)
- })
- .unwrap_or_else(|| tool_output.trim().to_string())
- }
|