Lock down CLI-to-mock behavioral parity for Anthropic flows

This adds a deterministic mock Anthropic-compatible /v1/messages service,
a clean-environment CLI harness, and repo docs so the first parity
milestone can be validated without live network dependencies.

Constraint: First milestone must prove Rust claw can connect from a clean environment and cover streaming, tool assembly, and permission/tool flow
Constraint: No new third-party dependencies; reuse the existing Rust workspace stack
Rejected: Record/replay live Anthropic traffic | nondeterministic and unsuitable for repeatable CI coverage
Confidence: high
Scope-risk: moderate
Reversibility: clean
Directive: Keep scenario markers and expected tool payload shapes synchronized between the mock service and the harness tests
Tested: cargo fmt --all
Tested: cargo clippy --workspace --all-targets -- -D warnings
Tested: cargo test --workspace
Tested: ./scripts/run_mock_parity_harness.sh
Not-tested: Live Anthropic responses beyond the five scripted harness scenarios
Yeachan-Heo 2 months ago
parent
commit
c2f1304a01
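
The Directive in the commit message asks that scenario markers stay synchronized between the mock service and the harness. As a rough illustration of what such a marker looks like in practice, the sketch below pulls a scenario name out of a prompt by scanning whitespace-separated tokens for the `PARITY_SCENARIO:` prefix, which is how `detect_scenario` in the mock crate appears to work. The helper name `scenario_in_prompt` and the `KNOWN_SCENARIOS` table are hypothetical and exist only for this example; the real crate works on `MessageRequest` content blocks and a private `Scenario` enum.

```rust
/// Prefix value copied from the mock crate's `SCENARIO_PREFIX` constant.
const SCENARIO_PREFIX: &str = "PARITY_SCENARIO:";

/// The five scenario names listed in this commit (hypothetical table for the sketch).
const KNOWN_SCENARIOS: [&str; 5] = [
    "streaming_text",
    "read_file_roundtrip",
    "grep_chunk_assembly",
    "write_file_allowed",
    "write_file_denied",
];

/// Hypothetical helper: find the first token carrying the marker prefix,
/// then accept it only if it names a known scenario.
fn scenario_in_prompt(prompt: &str) -> Option<&'static str> {
    let name = prompt
        .split_whitespace()
        .find_map(|token| token.strip_prefix(SCENARIO_PREFIX))?;
    KNOWN_SCENARIOS
        .iter()
        .copied()
        .find(|known| *known == name.trim())
}

fn main() {
    // The prompt text after the marker is arbitrary; only the marker token matters.
    let prompt = format!("{}write_file_denied please write the file", SCENARIO_PREFIX);
    println!("{:?}", scenario_in_prompt(&prompt));
}
```

Because both sides rely on the same string constant and the same set of names, a renamed scenario on one side would simply fail to match on the other, which is presumably why the Directive calls this out.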

+ 7 - 1
PARITY.md

@@ -2,6 +2,12 @@
 
 Last updated: 2026-04-03 (`03bd7f0`)
 
+## Mock parity harness — milestone 1
+
+- [x] Deterministic Anthropic-compatible mock service (`rust/crates/mock-anthropic-service`)
+- [x] Reproducible clean-environment CLI harness (`rust/crates/rusty-claude-cli/tests/mock_parity_harness.rs`)
+- [x] Scripted scenarios: `streaming_text`, `read_file_roundtrip`, `grep_chunk_assembly`, `write_file_allowed`, `write_file_denied`
+
 ## Tool Surface: 40/40 (spec parity)
 
 ### Real Implementations (behavioral parity — varying depth)
@@ -90,7 +96,7 @@ Last updated: 2026-04-03 (`03bd7f0`)
 - [ ] Output truncation (large stdout/file content)
 - [ ] Session compaction behavior matching
 - [ ] Token counting / cost tracking accuracy
-- [ ] Streaming response support
+- [x] Streaming response support validated by the mock parity harness
 
 ## Migration Readiness
 

+ 10 - 0
rust/Cargo.lock

@@ -719,6 +719,15 @@ dependencies = [
  "windows-sys 0.61.2",
 ]
 
+[[package]]
+name = "mock-anthropic-service"
+version = "0.1.0"
+dependencies = [
+ "api",
+ "serde_json",
+ "tokio",
+]
+
 [[package]]
 name = "nibble_vec"
 version = "0.1.0"
@@ -1194,6 +1203,7 @@ dependencies = [
  "commands",
  "compat-harness",
  "crossterm",
+ "mock-anthropic-service",
  "plugins",
  "pulldown-cmark",
  "runtime",

+ 35 - 0
rust/MOCK_PARITY_HARNESS.md

@@ -0,0 +1,35 @@
+# Mock LLM parity harness
+
+This milestone adds a deterministic Anthropic-compatible mock service plus a reproducible CLI harness for the Rust `claw` binary.
+
+## Artifacts
+
+- `crates/mock-anthropic-service/` — mock `/v1/messages` service
+- `crates/rusty-claude-cli/tests/mock_parity_harness.rs` — end-to-end clean-environment harness
+- `scripts/run_mock_parity_harness.sh` — convenience wrapper
+
+## Scenarios
+
+The harness runs these scripted scenarios against a fresh workspace and isolated environment variables:
+
+1. `streaming_text`
+2. `read_file_roundtrip`
+3. `grep_chunk_assembly`
+4. `write_file_allowed`
+5. `write_file_denied`
+
+## Run
+
+```bash
+cd rust/
+./scripts/run_mock_parity_harness.sh
+```
+
+## Manual mock server
+
+```bash
+cd rust/
+cargo run -p mock-anthropic-service -- --bind 127.0.0.1:0
+```
+
+The server prints `MOCK_ANTHROPIC_BASE_URL=...`; point `ANTHROPIC_BASE_URL` at that URL and use any non-empty `ANTHROPIC_API_KEY`.
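
Reviewer note on the manual-server instructions above: a minimal sketch, in Rust, of how a caller might consume the printed `MOCK_ANTHROPIC_BASE_URL=...` line and prepare the two environment variables the document names. The helpers `parse_base_url` and `mock_env`, the port `4010`, and the placeholder key value are assumptions made for illustration; nothing here is taken from the harness itself, and the command is built but never run.

```rust
use std::process::Command;

/// Hypothetical helper: extract the URL from the line the mock server prints.
fn parse_base_url(line: &str) -> Option<&str> {
    line.trim()
        .strip_prefix("MOCK_ANTHROPIC_BASE_URL=")
        .filter(|url| !url.is_empty())
}

/// Hypothetical helper: the two variables the document says the CLI needs.
/// Any non-empty key works against the mock, so the value is a placeholder.
fn mock_env(base_url: &str) -> [(&'static str, String); 2] {
    [
        ("ANTHROPIC_BASE_URL", base_url.to_string()),
        ("ANTHROPIC_API_KEY", "any-non-empty-value".to_string()),
    ]
}

fn main() {
    // Example line only; with `--bind 127.0.0.1:0` the real port is chosen by the OS.
    let line = "MOCK_ANTHROPIC_BASE_URL=http://127.0.0.1:4010";
    if let Some(url) = parse_base_url(line) {
        let mut command = Command::new("claw");
        command.envs(mock_env(url));
        // Printed for inspection; the command is not spawned in this sketch.
        println!("{command:?}");
    }
}
```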

+ 31 - 1
rust/README.md

@@ -35,6 +35,34 @@ Or authenticate via OAuth:
 claw login
 ```
 
+## Mock parity harness
+
+The workspace now includes a deterministic Anthropic-compatible mock service and a clean-environment CLI harness for end-to-end parity checks.
+
+```bash
+cd rust/
+
+# Run the scripted clean-environment harness
+./scripts/run_mock_parity_harness.sh
+
+# Or start the mock service manually for ad hoc CLI runs
+cargo run -p mock-anthropic-service -- --bind 127.0.0.1:0
+```
+
+Harness coverage:
+
+- `streaming_text`
+- `read_file_roundtrip`
+- `grep_chunk_assembly`
+- `write_file_allowed`
+- `write_file_denied`
+
+Primary artifacts:
+
+- `crates/mock-anthropic-service/` — reusable mock Anthropic-compatible service
+- `crates/rusty-claude-cli/tests/mock_parity_harness.rs` — clean-env CLI harness
+- `scripts/run_mock_parity_harness.sh` — reproducible wrapper
+
 ## Features
 
 | Feature | Status |
@@ -124,6 +152,7 @@ rust/
     ├── api/                # Anthropic API client + SSE streaming
     ├── commands/           # Shared slash-command registry
     ├── compat-harness/     # TS manifest extraction harness
+    ├── mock-anthropic-service/ # Deterministic local Anthropic-compatible mock
     ├── runtime/            # Session, config, permissions, MCP, prompts
     ├── rusty-claude-cli/   # Main CLI binary (`claw`)
     └── tools/              # Built-in tool implementations
@@ -134,6 +163,7 @@ rust/
 - **api** — HTTP client, SSE stream parser, request/response types, auth (API key + OAuth bearer)
 - **commands** — Slash command definitions and help text generation
 - **compat-harness** — Extracts tool/prompt manifests from upstream TS source
+- **mock-anthropic-service** — Deterministic `/v1/messages` mock for CLI parity tests and local harness runs
 - **runtime** — `ConversationRuntime` agentic loop, `ConfigLoader` hierarchy, `Session` persistence, permission policy, MCP client, system prompt assembly, usage tracking
 - **rusty-claude-cli** — REPL, one-shot prompt, streaming display, tool call rendering, CLI argument parsing
 - **tools** — Tool specs + execution: Bash, ReadFile, WriteFile, EditFile, GlobSearch, GrepSearch, WebSearch, WebFetch, Agent, TodoWrite, NotebookEdit, Skill, ToolSearch, REPL runtimes
@@ -141,7 +171,7 @@ rust/
 ## Stats
 
 - **~20K lines** of Rust
-- **6 crates** in workspace
+- **7 crates** in workspace
 - **Binary name:** `claw`
 - **Default model:** `claude-opus-4-6`
 - **Default permissions:** `danger-full-access`

+ 18 - 0
rust/crates/mock-anthropic-service/Cargo.toml

@@ -0,0 +1,18 @@
+[package]
+name = "mock-anthropic-service"
+version.workspace = true
+edition.workspace = true
+license.workspace = true
+publish.workspace = true
+
+[[bin]]
+name = "mock-anthropic-service"
+path = "src/main.rs"
+
+[dependencies]
+api = { path = "../api" }
+serde_json.workspace = true
+tokio = { version = "1", features = ["io-util", "macros", "net", "rt-multi-thread", "signal", "sync"] }
+
+[lints]
+workspace = true

+ 712 - 0
rust/crates/mock-anthropic-service/src/lib.rs

@@ -0,0 +1,712 @@
+use std::collections::HashMap;
+use std::io;
+use std::sync::Arc;
+use std::time::{SystemTime, UNIX_EPOCH};
+
+use api::{InputContentBlock, MessageRequest, MessageResponse, OutputContentBlock, Usage};
+use serde_json::{json, Value};
+use tokio::io::{AsyncReadExt, AsyncWriteExt};
+use tokio::net::TcpListener;
+use tokio::sync::{oneshot, Mutex};
+use tokio::task::JoinHandle;
+
+pub const SCENARIO_PREFIX: &str = "PARITY_SCENARIO:";
+pub const DEFAULT_MODEL: &str = "claude-sonnet-4-6";
+
+#[derive(Debug, Clone, PartialEq, Eq)]
+pub struct CapturedRequest {
+    pub method: String,
+    pub path: String,
+    pub headers: HashMap<String, String>,
+    pub scenario: String,
+    pub stream: bool,
+    pub raw_body: String,
+}
+
+pub struct MockAnthropicService {
+    base_url: String,
+    requests: Arc<Mutex<Vec<CapturedRequest>>>,
+    shutdown: Option<oneshot::Sender<()>>,
+    join_handle: JoinHandle<()>,
+}
+
+impl MockAnthropicService {
+    pub async fn spawn() -> io::Result<Self> {
+        Self::spawn_on("127.0.0.1:0").await
+    }
+
+    pub async fn spawn_on(bind_addr: &str) -> io::Result<Self> {
+        let listener = TcpListener::bind(bind_addr).await?;
+        let address = listener.local_addr()?;
+        let requests = Arc::new(Mutex::new(Vec::new()));
+        let (shutdown_tx, mut shutdown_rx) = oneshot::channel();
+        let request_state = Arc::clone(&requests);
+
+        let join_handle = tokio::spawn(async move {
+            loop {
+                tokio::select! {
+                    _ = &mut shutdown_rx => break,
+                    accepted = listener.accept() => {
+                        let Ok((socket, _)) = accepted else {
+                            break;
+                        };
+                        let request_state = Arc::clone(&request_state);
+                        tokio::spawn(async move {
+                            let _ = handle_connection(socket, request_state).await;
+                        });
+                    }
+                }
+            }
+        });
+
+        Ok(Self {
+            base_url: format!("http://{address}"),
+            requests,
+            shutdown: Some(shutdown_tx),
+            join_handle,
+        })
+    }
+
+    #[must_use]
+    pub fn base_url(&self) -> String {
+        self.base_url.clone()
+    }
+
+    pub async fn captured_requests(&self) -> Vec<CapturedRequest> {
+        self.requests.lock().await.clone()
+    }
+}
+
+impl Drop for MockAnthropicService {
+    fn drop(&mut self) {
+        if let Some(shutdown) = self.shutdown.take() {
+            let _ = shutdown.send(());
+        }
+        self.join_handle.abort();
+    }
+}
+
+#[derive(Debug, Clone, Copy, PartialEq, Eq)]
+enum Scenario {
+    StreamingText,
+    ReadFileRoundtrip,
+    GrepChunkAssembly,
+    WriteFileAllowed,
+    WriteFileDenied,
+}
+
+impl Scenario {
+    fn parse(value: &str) -> Option<Self> {
+        match value.trim() {
+            "streaming_text" => Some(Self::StreamingText),
+            "read_file_roundtrip" => Some(Self::ReadFileRoundtrip),
+            "grep_chunk_assembly" => Some(Self::GrepChunkAssembly),
+            "write_file_allowed" => Some(Self::WriteFileAllowed),
+            "write_file_denied" => Some(Self::WriteFileDenied),
+            _ => None,
+        }
+    }
+
+    fn name(self) -> &'static str {
+        match self {
+            Self::StreamingText => "streaming_text",
+            Self::ReadFileRoundtrip => "read_file_roundtrip",
+            Self::GrepChunkAssembly => "grep_chunk_assembly",
+            Self::WriteFileAllowed => "write_file_allowed",
+            Self::WriteFileDenied => "write_file_denied",
+        }
+    }
+}
+
+async fn handle_connection(
+    mut socket: tokio::net::TcpStream,
+    requests: Arc<Mutex<Vec<CapturedRequest>>>,
+) -> io::Result<()> {
+    let (method, path, headers, raw_body) = read_http_request(&mut socket).await?;
+    let request: MessageRequest = serde_json::from_str(&raw_body)
+        .map_err(|error| io::Error::new(io::ErrorKind::InvalidData, error.to_string()))?;
+    let scenario = detect_scenario(&request)
+        .ok_or_else(|| io::Error::new(io::ErrorKind::InvalidInput, "missing parity scenario"))?;
+
+    requests.lock().await.push(CapturedRequest {
+        method,
+        path,
+        headers,
+        scenario: scenario.name().to_string(),
+        stream: request.stream,
+        raw_body,
+    });
+
+    let response = build_http_response(&request, scenario);
+    socket.write_all(response.as_bytes()).await?;
+    Ok(())
+}
+
+async fn read_http_request(
+    socket: &mut tokio::net::TcpStream,
+) -> io::Result<(String, String, HashMap<String, String>, String)> {
+    let mut buffer = Vec::new();
+    let mut header_end = None;
+
+    loop {
+        let mut chunk = [0_u8; 1024];
+        let read = socket.read(&mut chunk).await?;
+        if read == 0 {
+            break;
+        }
+        buffer.extend_from_slice(&chunk[..read]);
+        if let Some(position) = find_header_end(&buffer) {
+            header_end = Some(position);
+            break;
+        }
+    }
+
+    let header_end = header_end
+        .ok_or_else(|| io::Error::new(io::ErrorKind::UnexpectedEof, "missing http headers"))?;
+    let (header_bytes, remaining) = buffer.split_at(header_end);
+    let header_text = String::from_utf8(header_bytes.to_vec())
+        .map_err(|error| io::Error::new(io::ErrorKind::InvalidData, error.to_string()))?;
+    let mut lines = header_text.split("\r\n");
+    let request_line = lines
+        .next()
+        .ok_or_else(|| io::Error::new(io::ErrorKind::InvalidData, "missing request line"))?;
+    let mut request_parts = request_line.split_whitespace();
+    let method = request_parts
+        .next()
+        .ok_or_else(|| io::Error::new(io::ErrorKind::InvalidData, "missing method"))?
+        .to_string();
+    let path = request_parts
+        .next()
+        .ok_or_else(|| io::Error::new(io::ErrorKind::InvalidData, "missing path"))?
+        .to_string();
+
+    let mut headers = HashMap::new();
+    let mut content_length = 0_usize;
+    for line in lines {
+        if line.is_empty() {
+            continue;
+        }
+        let (name, value) = line.split_once(':').ok_or_else(|| {
+            io::Error::new(io::ErrorKind::InvalidData, "malformed http header line")
+        })?;
+        let value = value.trim().to_string();
+        if name.eq_ignore_ascii_case("content-length") {
+            content_length = value.parse().map_err(|error| {
+                io::Error::new(
+                    io::ErrorKind::InvalidData,
+                    format!("invalid content-length: {error}"),
+                )
+            })?;
+        }
+        headers.insert(name.to_ascii_lowercase(), value);
+    }
+
+    let mut body = remaining[4..].to_vec();
+    while body.len() < content_length {
+        let mut chunk = vec![0_u8; content_length - body.len()];
+        let read = socket.read(&mut chunk).await?;
+        if read == 0 {
+            break;
+        }
+        body.extend_from_slice(&chunk[..read]);
+    }
+
+    let body = String::from_utf8(body)
+        .map_err(|error| io::Error::new(io::ErrorKind::InvalidData, error.to_string()))?;
+    Ok((method, path, headers, body))
+}
+
+fn find_header_end(bytes: &[u8]) -> Option<usize> {
+    bytes.windows(4).position(|window| window == b"\r\n\r\n")
+}
+
+fn detect_scenario(request: &MessageRequest) -> Option<Scenario> {
+    request.messages.iter().rev().find_map(|message| {
+        message.content.iter().rev().find_map(|block| match block {
+            InputContentBlock::Text { text } => text
+                .split_whitespace()
+                .find_map(|token| token.strip_prefix(SCENARIO_PREFIX))
+                .and_then(Scenario::parse),
+            _ => None,
+        })
+    })
+}
+
+fn latest_tool_result(request: &MessageRequest) -> Option<(String, bool)> {
+    request.messages.iter().rev().find_map(|message| {
+        message.content.iter().rev().find_map(|block| match block {
+            InputContentBlock::ToolResult {
+                content, is_error, ..
+            } => Some((flatten_tool_result_content(content), *is_error)),
+            _ => None,
+        })
+    })
+}
+
+fn flatten_tool_result_content(content: &[api::ToolResultContentBlock]) -> String {
+    content
+        .iter()
+        .map(|block| match block {
+            api::ToolResultContentBlock::Text { text } => text.clone(),
+            api::ToolResultContentBlock::Json { value } => value.to_string(),
+        })
+        .collect::<Vec<_>>()
+        .join("\n")
+}
+
+#[allow(clippy::too_many_lines)]
+fn build_http_response(request: &MessageRequest, scenario: Scenario) -> String {
+    let response = if request.stream {
+        let body = build_stream_body(request, scenario);
+        return http_response(
+            "200 OK",
+            "text/event-stream",
+            &body,
+            &[("x-request-id", request_id_for(scenario))],
+        );
+    } else {
+        build_message_response(request, scenario)
+    };
+
+    http_response(
+        "200 OK",
+        "application/json",
+        &serde_json::to_string(&response).expect("message response should serialize"),
+        &[("request-id", request_id_for(scenario))],
+    )
+}
+
+fn build_stream_body(request: &MessageRequest, scenario: Scenario) -> String {
+    match scenario {
+        Scenario::StreamingText => streaming_text_sse(),
+        Scenario::ReadFileRoundtrip => match latest_tool_result(request) {
+            Some((tool_output, _)) => final_text_sse(&format!(
+                "read_file roundtrip complete: {}",
+                extract_read_content(&tool_output)
+            )),
+            None => tool_use_sse(
+                "toolu_read_fixture",
+                "read_file",
+                &[r#"{"path":"fixture.txt"}"#],
+            ),
+        },
+        Scenario::GrepChunkAssembly => match latest_tool_result(request) {
+            Some((tool_output, _)) => final_text_sse(&format!(
+                "grep_search matched {} occurrences",
+                extract_num_matches(&tool_output)
+            )),
+            None => tool_use_sse(
+                "toolu_grep_fixture",
+                "grep_search",
+                &[
+                    "{\"pattern\":\"par",
+                    "ity\",\"path\":\"fixture.txt\"",
+                    ",\"output_mode\":\"count\"}",
+                ],
+            ),
+        },
+        Scenario::WriteFileAllowed => match latest_tool_result(request) {
+            Some((tool_output, _)) => final_text_sse(&format!(
+                "write_file succeeded: {}",
+                extract_file_path(&tool_output)
+            )),
+            None => tool_use_sse(
+                "toolu_write_allowed",
+                "write_file",
+                &[r#"{"path":"generated/output.txt","content":"created by mock service\n"}"#],
+            ),
+        },
+        Scenario::WriteFileDenied => match latest_tool_result(request) {
+            Some((tool_output, _)) => {
+                final_text_sse(&format!("write_file denied as expected: {tool_output}"))
+            }
+            None => tool_use_sse(
+                "toolu_write_denied",
+                "write_file",
+                &[r#"{"path":"generated/denied.txt","content":"should not exist\n"}"#],
+            ),
+        },
+    }
+}
+
+fn build_message_response(request: &MessageRequest, scenario: Scenario) -> MessageResponse {
+    match scenario {
+        Scenario::StreamingText => text_message_response(
+            "msg_streaming_text",
+            "Mock streaming says hello from the parity harness.",
+        ),
+        Scenario::ReadFileRoundtrip => match latest_tool_result(request) {
+            Some((tool_output, _)) => text_message_response(
+                "msg_read_file_final",
+                &format!(
+                    "read_file roundtrip complete: {}",
+                    extract_read_content(&tool_output)
+                ),
+            ),
+            None => tool_message_response(
+                "msg_read_file_tool",
+                "toolu_read_fixture",
+                "read_file",
+                json!({"path": "fixture.txt"}),
+            ),
+        },
+        Scenario::GrepChunkAssembly => match latest_tool_result(request) {
+            Some((tool_output, _)) => text_message_response(
+                "msg_grep_final",
+                &format!(
+                    "grep_search matched {} occurrences",
+                    extract_num_matches(&tool_output)
+                ),
+            ),
+            None => tool_message_response(
+                "msg_grep_tool",
+                "toolu_grep_fixture",
+                "grep_search",
+                json!({"pattern": "parity", "path": "fixture.txt", "output_mode": "count"}),
+            ),
+        },
+        Scenario::WriteFileAllowed => match latest_tool_result(request) {
+            Some((tool_output, _)) => text_message_response(
+                "msg_write_allowed_final",
+                &format!("write_file succeeded: {}", extract_file_path(&tool_output)),
+            ),
+            None => tool_message_response(
+                "msg_write_allowed_tool",
+                "toolu_write_allowed",
+                "write_file",
+                json!({"path": "generated/output.txt", "content": "created by mock service\n"}),
+            ),
+        },
+        Scenario::WriteFileDenied => match latest_tool_result(request) {
+            Some((tool_output, _)) => text_message_response(
+                "msg_write_denied_final",
+                &format!("write_file denied as expected: {tool_output}"),
+            ),
+            None => tool_message_response(
+                "msg_write_denied_tool",
+                "toolu_write_denied",
+                "write_file",
+                json!({"path": "generated/denied.txt", "content": "should not exist\n"}),
+            ),
+        },
+    }
+}
+
+fn request_id_for(scenario: Scenario) -> &'static str {
+    match scenario {
+        Scenario::StreamingText => "req_streaming_text",
+        Scenario::ReadFileRoundtrip => "req_read_file_roundtrip",
+        Scenario::GrepChunkAssembly => "req_grep_chunk_assembly",
+        Scenario::WriteFileAllowed => "req_write_file_allowed",
+        Scenario::WriteFileDenied => "req_write_file_denied",
+    }
+}
+
+fn http_response(status: &str, content_type: &str, body: &str, headers: &[(&str, &str)]) -> String {
+    let mut extra_headers = String::new();
+    for (name, value) in headers {
+        use std::fmt::Write as _;
+        write!(&mut extra_headers, "{name}: {value}\r\n").expect("header write should succeed");
+    }
+    format!(
+        "HTTP/1.1 {status}\r\ncontent-type: {content_type}\r\n{extra_headers}content-length: {}\r\nconnection: close\r\n\r\n{body}",
+        body.len()
+    )
+}
+
+fn text_message_response(id: &str, text: &str) -> MessageResponse {
+    MessageResponse {
+        id: id.to_string(),
+        kind: "message".to_string(),
+        role: "assistant".to_string(),
+        content: vec![OutputContentBlock::Text {
+            text: text.to_string(),
+        }],
+        model: DEFAULT_MODEL.to_string(),
+        stop_reason: Some("end_turn".to_string()),
+        stop_sequence: None,
+        usage: Usage {
+            input_tokens: 10,
+            cache_creation_input_tokens: 0,
+            cache_read_input_tokens: 0,
+            output_tokens: 6,
+        },
+        request_id: None,
+    }
+}
+
+fn tool_message_response(
+    id: &str,
+    tool_id: &str,
+    tool_name: &str,
+    input: Value,
+) -> MessageResponse {
+    MessageResponse {
+        id: id.to_string(),
+        kind: "message".to_string(),
+        role: "assistant".to_string(),
+        content: vec![OutputContentBlock::ToolUse {
+            id: tool_id.to_string(),
+            name: tool_name.to_string(),
+            input,
+        }],
+        model: DEFAULT_MODEL.to_string(),
+        stop_reason: Some("tool_use".to_string()),
+        stop_sequence: None,
+        usage: Usage {
+            input_tokens: 10,
+            cache_creation_input_tokens: 0,
+            cache_read_input_tokens: 0,
+            output_tokens: 3,
+        },
+        request_id: None,
+    }
+}
+
+fn streaming_text_sse() -> String {
+    let mut body = String::new();
+    append_sse(
+        &mut body,
+        "message_start",
+        json!({
+            "type": "message_start",
+            "message": {
+                "id": "msg_streaming_text",
+                "type": "message",
+                "role": "assistant",
+                "content": [],
+                "model": DEFAULT_MODEL,
+                "stop_reason": null,
+                "stop_sequence": null,
+                "usage": usage_json(11, 0)
+            }
+        }),
+    );
+    append_sse(
+        &mut body,
+        "content_block_start",
+        json!({
+            "type": "content_block_start",
+            "index": 0,
+            "content_block": {"type": "text", "text": ""}
+        }),
+    );
+    append_sse(
+        &mut body,
+        "content_block_delta",
+        json!({
+            "type": "content_block_delta",
+            "index": 0,
+            "delta": {"type": "text_delta", "text": "Mock streaming "}
+        }),
+    );
+    append_sse(
+        &mut body,
+        "content_block_delta",
+        json!({
+            "type": "content_block_delta",
+            "index": 0,
+            "delta": {"type": "text_delta", "text": "says hello from the parity harness."}
+        }),
+    );
+    append_sse(
+        &mut body,
+        "content_block_stop",
+        json!({
+            "type": "content_block_stop",
+            "index": 0
+        }),
+    );
+    append_sse(
+        &mut body,
+        "message_delta",
+        json!({
+            "type": "message_delta",
+            "delta": {"stop_reason": "end_turn", "stop_sequence": null},
+            "usage": usage_json(11, 8)
+        }),
+    );
+    append_sse(&mut body, "message_stop", json!({"type": "message_stop"}));
+    body
+}
+
+fn tool_use_sse(tool_id: &str, tool_name: &str, partial_json_chunks: &[&str]) -> String {
+    let mut body = String::new();
+    append_sse(
+        &mut body,
+        "message_start",
+        json!({
+            "type": "message_start",
+            "message": {
+                "id": format!("msg_{tool_id}"),
+                "type": "message",
+                "role": "assistant",
+                "content": [],
+                "model": DEFAULT_MODEL,
+                "stop_reason": null,
+                "stop_sequence": null,
+                "usage": usage_json(12, 0)
+            }
+        }),
+    );
+    append_sse(
+        &mut body,
+        "content_block_start",
+        json!({
+            "type": "content_block_start",
+            "index": 0,
+            "content_block": {
+                "type": "tool_use",
+                "id": tool_id,
+                "name": tool_name,
+                "input": {}
+            }
+        }),
+    );
+    for chunk in partial_json_chunks {
+        append_sse(
+            &mut body,
+            "content_block_delta",
+            json!({
+                "type": "content_block_delta",
+                "index": 0,
+                "delta": {"type": "input_json_delta", "partial_json": chunk}
+            }),
+        );
+    }
+    append_sse(
+        &mut body,
+        "content_block_stop",
+        json!({
+            "type": "content_block_stop",
+            "index": 0
+        }),
+    );
+    append_sse(
+        &mut body,
+        "message_delta",
+        json!({
+            "type": "message_delta",
+            "delta": {"stop_reason": "tool_use", "stop_sequence": null},
+            "usage": usage_json(12, 4)
+        }),
+    );
+    append_sse(&mut body, "message_stop", json!({"type": "message_stop"}));
+    body
+}
+
+fn final_text_sse(text: &str) -> String {
+    let mut body = String::new();
+    append_sse(
+        &mut body,
+        "message_start",
+        json!({
+            "type": "message_start",
+            "message": {
+                "id": unique_message_id(),
+                "type": "message",
+                "role": "assistant",
+                "content": [],
+                "model": DEFAULT_MODEL,
+                "stop_reason": null,
+                "stop_sequence": null,
+                "usage": usage_json(14, 0)
+            }
+        }),
+    );
+    append_sse(
+        &mut body,
+        "content_block_start",
+        json!({
+            "type": "content_block_start",
+            "index": 0,
+            "content_block": {"type": "text", "text": ""}
+        }),
+    );
+    append_sse(
+        &mut body,
+        "content_block_delta",
+        json!({
+            "type": "content_block_delta",
+            "index": 0,
+            "delta": {"type": "text_delta", "text": text}
+        }),
+    );
+    append_sse(
+        &mut body,
+        "content_block_stop",
+        json!({
+            "type": "content_block_stop",
+            "index": 0
+        }),
+    );
+    append_sse(
+        &mut body,
+        "message_delta",
+        json!({
+            "type": "message_delta",
+            "delta": {"stop_reason": "end_turn", "stop_sequence": null},
+            "usage": usage_json(14, 7)
+        }),
+    );
+    append_sse(&mut body, "message_stop", json!({"type": "message_stop"}));
+    body
+}
+
+#[allow(clippy::needless_pass_by_value)]
+fn append_sse(buffer: &mut String, event: &str, payload: Value) {
+    use std::fmt::Write as _;
+    writeln!(buffer, "event: {event}").expect("event write should succeed");
+    writeln!(buffer, "data: {payload}").expect("payload write should succeed");
+    buffer.push('\n');
+}
+
+fn usage_json(input_tokens: u32, output_tokens: u32) -> Value {
+    json!({
+        "input_tokens": input_tokens,
+        "cache_creation_input_tokens": 0,
+        "cache_read_input_tokens": 0,
+        "output_tokens": output_tokens
+    })
+}
+
+fn unique_message_id() -> String {
+    let nanos = SystemTime::now()
+        .duration_since(UNIX_EPOCH)
+        .expect("clock should be after epoch")
+        .as_nanos();
+    format!("msg_{nanos}")
+}
+
+fn extract_read_content(tool_output: &str) -> String {
+    serde_json::from_str::<Value>(tool_output)
+        .ok()
+        .and_then(|value| {
+            value
+                .get("file")
+                .and_then(|file| file.get("content"))
+                .and_then(Value::as_str)
+                .map(ToOwned::to_owned)
+        })
+        .unwrap_or_else(|| tool_output.trim().to_string())
+}
+
+#[allow(clippy::cast_possible_truncation)]
+fn extract_num_matches(tool_output: &str) -> usize {
+    serde_json::from_str::<Value>(tool_output)
+        .ok()
+        .and_then(|value| value.get("numMatches").and_then(Value::as_u64))
+        .unwrap_or(0) as usize
+}
+
+fn extract_file_path(tool_output: &str) -> String {
+    serde_json::from_str::<Value>(tool_output)
+        .ok()
+        .and_then(|value| {
+            value
+                .get("filePath")
+                .and_then(Value::as_str)
+                .map(ToOwned::to_owned)
+        })
+        .unwrap_or_else(|| tool_output.trim().to_string())
+}
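
Reviewer note on the streaming path in `lib.rs`: the `grep_chunk_assembly` scenario sends the tool input as three `input_json_delta` fragments, and each SSE frame is written as an `event:` line, a `data:` line, and a blank line. The std-only sketch below illustrates both ideas under simplifying assumptions: payloads are plain strings rather than `serde_json::Value`, and `frame_sse`, `parse_sse`, and `assemble_partial_json` are hypothetical names, not functions from the crate or from the CLI's real stream parser.

```rust
/// Frame one server-sent event in the same shape `append_sse` produces:
/// an `event:` line, a `data:` line, then a blank separator line.
fn frame_sse(event: &str, data: &str) -> String {
    format!("event: {event}\ndata: {data}\n\n")
}

/// Hypothetical client-side step: join `partial_json` fragments in arrival order.
fn assemble_partial_json(chunks: &[&str]) -> String {
    chunks.concat()
}

/// Split a stream body back into (event, data) pairs.
/// Frames missing either line are skipped.
fn parse_sse(body: &str) -> Vec<(String, String)> {
    body.split("\n\n")
        .filter(|frame| !frame.trim().is_empty())
        .filter_map(|frame| {
            let mut event = None;
            let mut data = None;
            for line in frame.lines() {
                if let Some(value) = line.strip_prefix("event: ") {
                    event = Some(value.to_string());
                } else if let Some(value) = line.strip_prefix("data: ") {
                    data = Some(value.to_string());
                }
            }
            Some((event?, data?))
        })
        .collect()
}

fn main() {
    // The same three fragments the mock emits for `grep_chunk_assembly`.
    let chunks = [
        "{\"pattern\":\"par",
        "ity\",\"path\":\"fixture.txt\"",
        ",\"output_mode\":\"count\"}",
    ];
    println!("{}", assemble_partial_json(&chunks));

    let body = frame_sse("message_stop", "{\"type\":\"message_stop\"}");
    println!("{:?}", parse_sse(&body));
}
```

Note that the first fragment splits the word `parity` mid-string, so a client that tried to parse each fragment as JSON on its own would fail; only the concatenation is valid, which seems to be the point of the scenario.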

+ 34 - 0
rust/crates/mock-anthropic-service/src/main.rs

@@ -0,0 +1,34 @@
+use std::env;
+
+use mock_anthropic_service::MockAnthropicService;
+
+#[tokio::main(flavor = "multi_thread")]
+async fn main() -> Result<(), Box<dyn std::error::Error>> {
+    let mut bind_addr = String::from("127.0.0.1:0");
+    let mut args = env::args().skip(1);
+    while let Some(arg) = args.next() {
+        match arg.as_str() {
+            "--bind" => {
+                bind_addr = args
+                    .next()
+                    .ok_or_else(|| "missing value for --bind".to_string())?;
+            }
+            flag if flag.starts_with("--bind=") => {
+                bind_addr = flag[7..].to_string();
+            }
+            "--help" | "-h" => {
+                println!("Usage: mock-anthropic-service [--bind HOST:PORT]");
+                return Ok(());
+            }
+            other => {
+                return Err(format!("unsupported argument: {other}").into());
+            }
+        }
+    }
+
+    let server = MockAnthropicService::spawn_on(&bind_addr).await?;
+    println!("MOCK_ANTHROPIC_BASE_URL={}", server.base_url());
+    tokio::signal::ctrl_c().await?;
+    drop(server);
+    Ok(())
+}

+ 5 - 0
rust/crates/rusty-claude-cli/Cargo.toml

@@ -25,3 +25,8 @@ tools = { path = "../tools" }
 
 [lints]
 workspace = true
+
+[dev-dependencies]
+mock-anthropic-service = { path = "../mock-anthropic-service" }
+serde_json.workspace = true
+tokio = { version = "1", features = ["rt-multi-thread"] }

+ 257 - 0
rust/crates/rusty-claude-cli/tests/mock_parity_harness.rs

@@ -0,0 +1,257 @@
+use std::fs;
+use std::path::{Path, PathBuf};
+use std::process::{Command, Output};
+use std::sync::atomic::{AtomicU64, Ordering};
+use std::time::{SystemTime, UNIX_EPOCH};
+
+use mock_anthropic_service::{MockAnthropicService, SCENARIO_PREFIX};
+use serde_json::Value;
+
+static TEMP_COUNTER: AtomicU64 = AtomicU64::new(0);
+
+#[test]
+fn clean_env_cli_reaches_mock_anthropic_service_across_scripted_parity_scenarios() {
+    let runtime = tokio::runtime::Runtime::new().expect("tokio runtime should build");
+    let server = runtime
+        .block_on(MockAnthropicService::spawn())
+        .expect("mock service should start");
+    let base_url = server.base_url();
+
+    let cases = [
+        ScenarioCase {
+            name: "streaming_text",
+            permission_mode: "read-only",
+            allowed_tools: None,
+            seed: seed_noop,
+            assert: assert_streaming_text,
+        },
+        ScenarioCase {
+            name: "read_file_roundtrip",
+            permission_mode: "read-only",
+            allowed_tools: Some("read_file"),
+            seed: seed_read_fixture,
+            assert: assert_read_file_roundtrip,
+        },
+        ScenarioCase {
+            name: "grep_chunk_assembly",
+            permission_mode: "read-only",
+            allowed_tools: Some("grep_search"),
+            seed: seed_grep_fixture,
+            assert: assert_grep_chunk_assembly,
+        },
+        ScenarioCase {
+            name: "write_file_allowed",
+            permission_mode: "workspace-write",
+            allowed_tools: Some("write_file"),
+            seed: seed_noop,
+            assert: assert_write_file_allowed,
+        },
+        ScenarioCase {
+            name: "write_file_denied",
+            permission_mode: "read-only",
+            allowed_tools: Some("write_file"),
+            seed: seed_noop,
+            assert: assert_write_file_denied,
+        },
+    ];
+
+    for case in cases {
+        let workspace = unique_temp_dir(case.name);
+        fs::create_dir_all(&workspace).expect("workspace should exist");
+        (case.seed)(&workspace);
+        let response = run_case(case, &workspace, &base_url);
+        (case.assert)(&workspace, &response);
+        fs::remove_dir_all(&workspace).expect("workspace cleanup should succeed");
+    }
+
+    let captured = runtime.block_on(server.captured_requests());
+    assert_eq!(
+        captured.len(),
+        9, // 1 request for streaming_text + 2 for each of the four tool scenarios
+        "five scenarios should produce nine requests"
+    );
+    assert!(captured
+        .iter()
+        .all(|request| request.path == "/v1/messages"));
+    assert!(captured.iter().all(|request| request.stream));
+
+    let scenarios = captured
+        .iter()
+        .map(|request| request.scenario.as_str())
+        .collect::<Vec<_>>();
+    assert_eq!(
+        scenarios,
+        vec![
+            "streaming_text",
+            "read_file_roundtrip",
+            "read_file_roundtrip",
+            "grep_chunk_assembly",
+            "grep_chunk_assembly",
+            "write_file_allowed",
+            "write_file_allowed",
+            "write_file_denied",
+            "write_file_denied",
+        ]
+    );
+}
+
+#[derive(Clone, Copy)]
+struct ScenarioCase {
+    name: &'static str,
+    permission_mode: &'static str,
+    allowed_tools: Option<&'static str>,
+    seed: fn(&Path),
+    assert: fn(&Path, &Value),
+}
+
+fn run_case(case: ScenarioCase, workspace: &Path, base_url: &str) -> Value {
+    let config_home = workspace.join("config-home");
+    let home = workspace.join("home");
+    fs::create_dir_all(config_home.join(".claw")).expect("config home should exist");
+    fs::create_dir_all(&home).expect("home should exist");
+
+    let mut command = Command::new(env!("CARGO_BIN_EXE_claw"));
+    command
+        .current_dir(workspace)
+        .env_clear() // start from an empty environment so host settings cannot leak in
+        .env("ANTHROPIC_API_KEY", "test-parity-key")
+        .env("ANTHROPIC_BASE_URL", base_url)
+        .env("CLAW_CONFIG_HOME", &config_home)
+        .env("HOME", &home)
+        .env("NO_COLOR", "1")
+        .args([
+            "--model",
+            "sonnet",
+            "--permission-mode",
+            case.permission_mode,
+            "--output-format=json",
+        ]);
+
+    if let Some(allowed_tools) = case.allowed_tools {
+        command.args(["--allowedTools", allowed_tools]);
+    }
+
+    let prompt = format!("{SCENARIO_PREFIX}{}", case.name);
+    let output = command.arg(prompt).output().expect("claw should launch");
+    assert_success(&output);
+    serde_json::from_slice(&output.stdout).expect("prompt output should be valid json")
+}
+
+fn seed_noop(_: &Path) {}
+
+fn seed_read_fixture(workspace: &Path) {
+    fs::write(workspace.join("fixture.txt"), "alpha parity line\n").expect("fixture should write");
+}
+
+fn seed_grep_fixture(workspace: &Path) {
+    fs::write(
+        workspace.join("fixture.txt"),
+        "alpha parity line\nbeta line\ngamma parity line\n",
+    )
+    .expect("grep fixture should write");
+}
+
+fn assert_streaming_text(_: &Path, response: &Value) {
+    assert_eq!(
+        response["message"],
+        Value::String("Mock streaming says hello from the parity harness.".to_string())
+    );
+    assert_eq!(response["iterations"], Value::from(1));
+    assert_eq!(response["tool_uses"], Value::Array(Vec::new()));
+    assert_eq!(response["tool_results"], Value::Array(Vec::new()));
+}
+
+fn assert_read_file_roundtrip(workspace: &Path, response: &Value) {
+    assert_eq!(response["iterations"], Value::from(2));
+    assert_eq!(
+        response["tool_uses"][0]["name"],
+        Value::String("read_file".to_string())
+    );
+    assert_eq!(
+        response["tool_uses"][0]["input"],
+        Value::String(r#"{"path":"fixture.txt"}"#.to_string())
+    );
+    assert!(response["message"]
+        .as_str()
+        .expect("message text")
+        .contains("alpha parity line"));
+    let output = response["tool_results"][0]["output"]
+        .as_str()
+        .expect("tool output");
+    assert!(output.contains(&workspace.join("fixture.txt").display().to_string()));
+    assert!(output.contains("alpha parity line"));
+}
+
+fn assert_grep_chunk_assembly(_: &Path, response: &Value) {
+    assert_eq!(response["iterations"], Value::from(2));
+    assert_eq!(
+        response["tool_uses"][0]["name"],
+        Value::String("grep_search".to_string())
+    );
+    assert_eq!(
+        response["tool_uses"][0]["input"],
+        Value::String(
+            r#"{"pattern":"parity","path":"fixture.txt","output_mode":"count"}"#.to_string()
+        )
+    );
+    assert!(response["message"]
+        .as_str()
+        .expect("message text")
+        .contains("2 occurrences"));
+    assert_eq!(response["tool_results"][0]["is_error"], Value::Bool(false));
+}
+
+fn assert_write_file_allowed(workspace: &Path, response: &Value) {
+    assert_eq!(response["iterations"], Value::from(2));
+    assert_eq!(
+        response["tool_uses"][0]["name"],
+        Value::String("write_file".to_string())
+    );
+    assert!(response["message"]
+        .as_str()
+        .expect("message text")
+        .contains("generated/output.txt"));
+    let generated = workspace.join("generated").join("output.txt");
+    let contents = fs::read_to_string(&generated).expect("generated file should exist");
+    assert_eq!(contents, "created by mock service\n");
+    assert_eq!(response["tool_results"][0]["is_error"], Value::Bool(false));
+}
+
+fn assert_write_file_denied(workspace: &Path, response: &Value) {
+    assert_eq!(response["iterations"], Value::from(2));
+    assert_eq!(
+        response["tool_uses"][0]["name"],
+        Value::String("write_file".to_string())
+    );
+    let tool_output = response["tool_results"][0]["output"]
+        .as_str()
+        .expect("tool output");
+    assert!(tool_output.contains("requires workspace-write permission"));
+    assert_eq!(response["tool_results"][0]["is_error"], Value::Bool(true));
+    assert!(response["message"]
+        .as_str()
+        .expect("message text")
+        .contains("denied as expected"));
+    assert!(!workspace.join("generated").join("denied.txt").exists());
+}
+
+fn assert_success(output: &Output) {
+    assert!(
+        output.status.success(),
+        "stdout:\n{}\n\nstderr:\n{}",
+        String::from_utf8_lossy(&output.stdout),
+        String::from_utf8_lossy(&output.stderr)
+    );
+}
+
+fn unique_temp_dir(label: &str) -> PathBuf {
+    let millis = SystemTime::now()
+        .duration_since(UNIX_EPOCH)
+        .expect("clock should be after epoch")
+        .as_millis();
+    let counter = TEMP_COUNTER.fetch_add(1, Ordering::Relaxed);
+    std::env::temp_dir().join(format!(
+        "claw-mock-parity-{label}-{}-{millis}-{counter}",
+        std::process::id()
+    ))
+}

+ 6 - 0
rust/scripts/run_mock_parity_harness.sh

@@ -0,0 +1,6 @@
+#!/usr/bin/env bash
+set -euo pipefail
+
+cd "$(dirname "$0")/.."
+
+cargo test -p rusty-claude-cli --test mock_parity_harness -- --nocapture