worker_boot.rs 36 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
37047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069
//! In-memory worker-boot state machine and control registry.
//!
//! This provides a foundational control plane for reliable worker startup:
//! trust-gate detection, ready-for-prompt handshakes, and prompt-misdelivery
//! detection/recovery all live above raw terminal transport.
  6. use std::collections::HashMap;
  7. use std::path::{Path, PathBuf};
  8. use std::sync::{Arc, Mutex};
  9. use std::time::{SystemTime, UNIX_EPOCH};
  10. use serde::{Deserialize, Serialize};
  11. fn now_secs() -> u64 {
  12. SystemTime::now()
  13. .duration_since(UNIX_EPOCH)
  14. .unwrap_or_default()
  15. .as_secs()
  16. }
/// Lifecycle state of a worker as tracked by the registry.
///
/// Serialized with snake_case variant names.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum WorkerStatus {
    /// Initial state on creation; re-entered after a restart or once a trust
    /// prompt has been resolved.
    Spawning,
    /// A trust prompt was detected and has not been resolved yet.
    TrustRequired,
    /// The worker can accept a prompt (a ready cue was observed, or a replay
    /// was armed after a misdelivery).
    ReadyForPrompt,
    /// A prompt has been dispatched or a running cue was observed.
    Running,
    /// The worker was terminated or its session completed normally.
    Finished,
    /// Prompt delivery failed without recovery, or a provider failure was
    /// classified on completion.
    Failed,
}
  27. impl std::fmt::Display for WorkerStatus {
  28. fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
  29. match self {
  30. Self::Spawning => write!(f, "spawning"),
  31. Self::TrustRequired => write!(f, "trust_required"),
  32. Self::ReadyForPrompt => write!(f, "ready_for_prompt"),
  33. Self::Running => write!(f, "running"),
  34. Self::Finished => write!(f, "finished"),
  35. Self::Failed => write!(f, "failed"),
  36. }
  37. }
  38. }
/// Category of a recorded worker failure.
///
/// Serialized with snake_case variant names.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum WorkerFailureKind {
    /// Boot is blocked on an unresolved trust prompt.
    TrustGate,
    /// A dispatched prompt did not reach the intended target.
    PromptDelivery,
    /// Not produced by the code visible in this part of the file.
    /// NOTE(review): presumably protocol-level failures — confirm against callers.
    Protocol,
    /// The session completed in a way classified as a provider failure.
    Provider,
}
/// A failure recorded on a worker (stored in `Worker::last_error`).
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
pub struct WorkerFailure {
    /// Failure category.
    pub kind: WorkerFailureKind,
    /// Human-readable description of the failure.
    pub message: String,
    /// When the failure was recorded, in seconds since the Unix epoch.
    pub created_at: u64,
}
/// Kind of entry appended to a worker's event log.
///
/// Serialized with snake_case variant names.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum WorkerEventKind {
    /// The worker was created.
    Spawning,
    /// A trust prompt was detected on screen.
    TrustRequired,
    /// The trust prompt was resolved (automatically or manually).
    TrustResolved,
    /// The worker became ready to receive a prompt.
    ReadyForPrompt,
    /// A dispatched prompt was observed landing in the wrong place.
    PromptMisdelivery,
    /// The misdelivered prompt was armed for replay.
    PromptReplayArmed,
    /// A prompt was dispatched to the worker.
    Running,
    /// The worker was restarted.
    Restarted,
    /// The worker was terminated or its session completed.
    Finished,
    /// A provider failure was classified on completion.
    Failed,
}
/// How a trust prompt was resolved.
///
/// Serialized with snake_case variant names.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum WorkerTrustResolution {
    /// Resolved automatically because the worker's cwd matched a trusted root.
    AutoAllowlisted,
    /// Resolved through an explicit `resolve_trust` call.
    ManualApproval,
}
/// Where a misdelivered prompt was observed to land.
///
/// Serialized with snake_case variant names.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum WorkerPromptTarget {
    /// The prompt text is visible next to a shell error message.
    Shell,
    /// The prompt text is visible on a line whose observed cwd does not match
    /// the worker's expected cwd.
    WrongTarget,
    /// Handled when building messages, but not produced by the detection code
    /// visible in this part of the file.
    Unknown,
}
/// Structured data attached to a worker event.
///
/// Serialized as an internally tagged enum: the variant name (snake_case) is
/// stored under the `type` key.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
#[serde(tag = "type", rename_all = "snake_case")]
pub enum WorkerEventPayload {
    /// Details of a trust prompt detection or resolution.
    TrustPrompt {
        /// Working directory of the worker that hit the trust prompt.
        cwd: String,
        /// How the prompt was resolved; `None` while unresolved (omitted from
        /// serialized output in that case).
        #[serde(skip_serializing_if = "Option::is_none")]
        resolution: Option<WorkerTrustResolution>,
    },
    /// Details of a prompt misdelivery or the replay armed for it.
    PromptDelivery {
        /// Shortened preview of the prompt that was dispatched.
        prompt_preview: String,
        /// Where the prompt was observed to land.
        observed_target: WorkerPromptTarget,
        /// Working-directory label seen on screen, when one was detected
        /// (omitted from serialized output when `None`).
        #[serde(skip_serializing_if = "Option::is_none")]
        observed_cwd: Option<String>,
        /// `true` on the event that arms a replay, `false` on the misdelivery
        /// event itself.
        recovery_armed: bool,
    },
}
/// One entry in a worker's event log.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
pub struct WorkerEvent {
    /// 1-based position of the event in the worker's log.
    pub seq: u64,
    /// What happened.
    pub kind: WorkerEventKind,
    /// Status recorded with the event. This is supplied by the caller of
    /// `push_event` and may differ from the worker's status afterwards (a
    /// misdelivery event records `Failed` even when a replay is then armed).
    pub status: WorkerStatus,
    /// Optional human-readable detail.
    pub detail: Option<String>,
    /// Optional structured payload (omitted from serialized output when `None`).
    #[serde(skip_serializing_if = "Option::is_none")]
    pub payload: Option<WorkerEventPayload>,
    /// When the event was recorded, in seconds since the Unix epoch.
    pub timestamp: u64,
}
/// Snapshot of a worker's boot/control state together with its event log.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
pub struct Worker {
    /// Registry-assigned identifier (`worker_<hex timestamp>_<counter>`).
    pub worker_id: String,
    /// Working directory the worker was created for.
    pub cwd: String,
    /// Current lifecycle state.
    pub status: WorkerStatus,
    /// Whether `cwd` matched a trusted root at creation time, so trust prompts
    /// are resolved automatically.
    pub trust_auto_resolve: bool,
    /// Set once a trust prompt has been resolved; cleared again on restart.
    pub trust_gate_cleared: bool,
    /// Whether a detected prompt misdelivery arms a replay instead of failing
    /// the worker.
    pub auto_recover_prompt_misdelivery: bool,
    /// Number of prompt dispatches since creation or the last restart.
    pub prompt_delivery_attempts: u32,
    /// `true` between a prompt dispatch and the next observation that settles it.
    pub prompt_in_flight: bool,
    /// The most recently dispatched prompt, if any.
    pub last_prompt: Option<String>,
    /// Prompt armed for replay after a misdelivery; consumed by the next
    /// dispatch.
    pub replay_prompt: Option<String>,
    /// Most recent unresolved failure, if any.
    pub last_error: Option<WorkerFailure>,
    /// Creation time, in seconds since the Unix epoch.
    pub created_at: u64,
    /// Time of the most recent recorded event, in seconds since the Unix epoch.
    pub updated_at: u64,
    /// Event log in insertion order.
    pub events: Vec<WorkerEvent>,
}
/// Handle to the in-memory worker registry.
///
/// The state lives behind an `Arc<Mutex<..>>`, so cloning the handle yields
/// another handle to the same underlying registry.
#[derive(Debug, Clone, Default)]
pub struct WorkerRegistry {
    // Shared, lock-protected registry state.
    inner: Arc<Mutex<WorkerRegistryInner>>,
}
/// Lock-protected state behind a `WorkerRegistry` handle.
#[derive(Debug, Default)]
struct WorkerRegistryInner {
    // Workers keyed by their `worker_id`.
    workers: HashMap<String, Worker>,
    // Incremented on every `create`; used as the suffix of generated ids.
    counter: u64,
}
  132. impl WorkerRegistry {
  133. #[must_use]
  134. pub fn new() -> Self {
  135. Self::default()
  136. }
  137. #[must_use]
  138. pub fn create(
  139. &self,
  140. cwd: &str,
  141. trusted_roots: &[String],
  142. auto_recover_prompt_misdelivery: bool,
  143. ) -> Worker {
  144. let mut inner = self.inner.lock().expect("worker registry lock poisoned");
  145. inner.counter += 1;
  146. let ts = now_secs();
  147. let worker_id = format!("worker_{:08x}_{}", ts, inner.counter);
  148. let trust_auto_resolve = trusted_roots
  149. .iter()
  150. .any(|root| path_matches_allowlist(cwd, root));
  151. let mut worker = Worker {
  152. worker_id: worker_id.clone(),
  153. cwd: cwd.to_owned(),
  154. status: WorkerStatus::Spawning,
  155. trust_auto_resolve,
  156. trust_gate_cleared: false,
  157. auto_recover_prompt_misdelivery,
  158. prompt_delivery_attempts: 0,
  159. prompt_in_flight: false,
  160. last_prompt: None,
  161. replay_prompt: None,
  162. last_error: None,
  163. created_at: ts,
  164. updated_at: ts,
  165. events: Vec::new(),
  166. };
  167. push_event(
  168. &mut worker,
  169. WorkerEventKind::Spawning,
  170. WorkerStatus::Spawning,
  171. Some("worker created".to_string()),
  172. None,
  173. );
  174. inner.workers.insert(worker_id, worker.clone());
  175. worker
  176. }
  177. #[must_use]
  178. pub fn get(&self, worker_id: &str) -> Option<Worker> {
  179. let inner = self.inner.lock().expect("worker registry lock poisoned");
  180. inner.workers.get(worker_id).cloned()
  181. }
    /// Feeds a snapshot of the worker's terminal screen into the state machine
    /// and returns the updated worker.
    ///
    /// Checks run in this order, and the first two may return early:
    /// 1. trust-prompt detection (skipped once the trust gate is cleared),
    /// 2. prompt-misdelivery detection (only while a prompt is in flight),
    /// 3. running-cue detection,
    /// 4. ready-for-prompt detection.
    ///
    /// # Errors
    ///
    /// Returns `Err` when no worker with `worker_id` exists.
    ///
    /// # Panics
    ///
    /// Panics if the registry lock is poisoned.
    pub fn observe(&self, worker_id: &str, screen_text: &str) -> Result<Worker, String> {
        let mut inner = self.inner.lock().expect("worker registry lock poisoned");
        let worker = inner
            .workers
            .get_mut(worker_id)
            .ok_or_else(|| format!("worker not found: {worker_id}"))?;
        // Needle matching below is done against an ASCII-lowercased copy.
        let lowered = screen_text.to_ascii_lowercase();
        if !worker.trust_gate_cleared && detect_trust_prompt(&lowered) {
            worker.status = WorkerStatus::TrustRequired;
            worker.last_error = Some(WorkerFailure {
                kind: WorkerFailureKind::TrustGate,
                message: "worker boot blocked on trust prompt".to_string(),
                created_at: now_secs(),
            });
            push_event(
                worker,
                WorkerEventKind::TrustRequired,
                WorkerStatus::TrustRequired,
                Some("trust prompt detected".to_string()),
                Some(WorkerEventPayload::TrustPrompt {
                    cwd: worker.cwd.clone(),
                    resolution: None,
                }),
            );
            if worker.trust_auto_resolve {
                // Allowlisted cwd: clear the gate immediately and keep evaluating
                // the remaining checks against the same screen text.
                worker.trust_gate_cleared = true;
                worker.last_error = None;
                worker.status = WorkerStatus::Spawning;
                push_event(
                    worker,
                    WorkerEventKind::TrustResolved,
                    WorkerStatus::Spawning,
                    Some("allowlisted repo auto-resolved trust prompt".to_string()),
                    Some(WorkerEventPayload::TrustPrompt {
                        cwd: worker.cwd.clone(),
                        resolution: Some(WorkerTrustResolution::AutoAllowlisted),
                    }),
                );
            } else {
                // Not allowlisted: stay in TrustRequired until `resolve_trust`.
                return Ok(worker.clone());
            }
        }
        // Misdelivery detection only runs while a dispatched prompt is in flight.
        if let Some(observation) = prompt_misdelivery_is_relevant(worker)
            .then(|| {
                detect_prompt_misdelivery(
                    screen_text,
                    &lowered,
                    worker.last_prompt.as_deref(),
                    &worker.cwd,
                )
            })
            .flatten()
        {
            // Note: this local shadows the `prompt_preview` function for the rest
            // of the block.
            let prompt_preview = prompt_preview(worker.last_prompt.as_deref().unwrap_or_default());
            let message = match observation.target {
                WorkerPromptTarget::Shell => {
                    format!("worker prompt landed in shell instead of coding agent: {prompt_preview}")
                }
                WorkerPromptTarget::WrongTarget => format!(
                    "worker prompt landed in the wrong target instead of {}: {}",
                    worker.cwd, prompt_preview
                ),
                WorkerPromptTarget::Unknown => format!(
                    "worker prompt delivery failed before reaching coding agent: {prompt_preview}"
                ),
            };
            worker.last_error = Some(WorkerFailure {
                kind: WorkerFailureKind::PromptDelivery,
                message,
                created_at: now_secs(),
            });
            worker.prompt_in_flight = false;
            // The misdelivery event always records `Failed`, even when a replay is
            // armed right after; `worker.status` itself is set further below.
            push_event(
                worker,
                WorkerEventKind::PromptMisdelivery,
                WorkerStatus::Failed,
                Some(prompt_misdelivery_detail(&observation).to_string()),
                Some(WorkerEventPayload::PromptDelivery {
                    prompt_preview: prompt_preview.clone(),
                    observed_target: observation.target,
                    observed_cwd: observation.observed_cwd.clone(),
                    recovery_armed: false,
                }),
            );
            if worker.auto_recover_prompt_misdelivery {
                // Arm the same prompt for replay; `last_error` is kept so callers
                // can still see why the replay was needed.
                worker.replay_prompt = worker.last_prompt.clone();
                worker.status = WorkerStatus::ReadyForPrompt;
                push_event(
                    worker,
                    WorkerEventKind::PromptReplayArmed,
                    WorkerStatus::ReadyForPrompt,
                    Some("prompt replay armed after prompt misdelivery".to_string()),
                    Some(WorkerEventPayload::PromptDelivery {
                        prompt_preview,
                        observed_target: observation.target,
                        observed_cwd: observation.observed_cwd,
                        recovery_armed: true,
                    }),
                );
            } else {
                worker.status = WorkerStatus::Failed;
            }
            return Ok(worker.clone());
        }
        // A running cue confirms the in-flight prompt was picked up. No event is
        // recorded for this transition.
        if detect_running_cue(&lowered) && worker.prompt_in_flight {
            worker.prompt_in_flight = false;
            worker.status = WorkerStatus::Running;
            worker.last_error = None;
        }
        // Only record the ready transition when the status actually changes.
        if detect_ready_for_prompt(screen_text, &lowered) && worker.status != WorkerStatus::ReadyForPrompt {
            worker.status = WorkerStatus::ReadyForPrompt;
            worker.prompt_in_flight = false;
            // Reaching the ready state clears a trust-gate error only; other
            // failure kinds are left in place.
            if matches!(
                worker.last_error.as_ref().map(|failure| failure.kind),
                Some(WorkerFailureKind::TrustGate)
            ) {
                worker.last_error = None;
            }
            push_event(
                worker,
                WorkerEventKind::ReadyForPrompt,
                WorkerStatus::ReadyForPrompt,
                Some("worker is ready for prompt delivery".to_string()),
                None,
            );
        }
        Ok(worker.clone())
    }
  310. pub fn resolve_trust(&self, worker_id: &str) -> Result<Worker, String> {
  311. let mut inner = self.inner.lock().expect("worker registry lock poisoned");
  312. let worker = inner
  313. .workers
  314. .get_mut(worker_id)
  315. .ok_or_else(|| format!("worker not found: {worker_id}"))?;
  316. if worker.status != WorkerStatus::TrustRequired {
  317. return Err(format!(
  318. "worker {worker_id} is not waiting on trust; current status: {}",
  319. worker.status
  320. ));
  321. }
  322. worker.trust_gate_cleared = true;
  323. worker.last_error = None;
  324. worker.status = WorkerStatus::Spawning;
  325. push_event(
  326. worker,
  327. WorkerEventKind::TrustResolved,
  328. WorkerStatus::Spawning,
  329. Some("trust prompt resolved manually".to_string()),
  330. Some(WorkerEventPayload::TrustPrompt {
  331. cwd: worker.cwd.clone(),
  332. resolution: Some(WorkerTrustResolution::ManualApproval),
  333. }),
  334. );
  335. Ok(worker.clone())
  336. }
  337. pub fn send_prompt(&self, worker_id: &str, prompt: Option<&str>) -> Result<Worker, String> {
  338. let mut inner = self.inner.lock().expect("worker registry lock poisoned");
  339. let worker = inner
  340. .workers
  341. .get_mut(worker_id)
  342. .ok_or_else(|| format!("worker not found: {worker_id}"))?;
  343. if worker.status != WorkerStatus::ReadyForPrompt {
  344. return Err(format!(
  345. "worker {worker_id} is not ready for prompt delivery; current status: {}",
  346. worker.status
  347. ));
  348. }
  349. let next_prompt = prompt
  350. .map(str::trim)
  351. .filter(|value| !value.is_empty())
  352. .map(str::to_owned)
  353. .or_else(|| worker.replay_prompt.clone())
  354. .ok_or_else(|| format!("worker {worker_id} has no prompt to send or replay"))?;
  355. worker.prompt_delivery_attempts += 1;
  356. worker.prompt_in_flight = true;
  357. worker.last_prompt = Some(next_prompt.clone());
  358. worker.replay_prompt = None;
  359. worker.last_error = None;
  360. worker.status = WorkerStatus::Running;
  361. push_event(
  362. worker,
  363. WorkerEventKind::Running,
  364. WorkerStatus::Running,
  365. Some(format!(
  366. "prompt dispatched to worker: {}",
  367. prompt_preview(&next_prompt)
  368. )),
  369. None,
  370. );
  371. Ok(worker.clone())
  372. }
  373. pub fn await_ready(&self, worker_id: &str) -> Result<WorkerReadySnapshot, String> {
  374. let worker = self
  375. .get(worker_id)
  376. .ok_or_else(|| format!("worker not found: {worker_id}"))?;
  377. Ok(WorkerReadySnapshot {
  378. worker_id: worker.worker_id.clone(),
  379. status: worker.status,
  380. ready: worker.status == WorkerStatus::ReadyForPrompt,
  381. blocked: matches!(worker.status, WorkerStatus::TrustRequired | WorkerStatus::Failed),
  382. replay_prompt_ready: worker.replay_prompt.is_some(),
  383. last_error: worker.last_error.clone(),
  384. })
  385. }
  386. pub fn restart(&self, worker_id: &str) -> Result<Worker, String> {
  387. let mut inner = self.inner.lock().expect("worker registry lock poisoned");
  388. let worker = inner
  389. .workers
  390. .get_mut(worker_id)
  391. .ok_or_else(|| format!("worker not found: {worker_id}"))?;
  392. worker.status = WorkerStatus::Spawning;
  393. worker.trust_gate_cleared = false;
  394. worker.last_prompt = None;
  395. worker.replay_prompt = None;
  396. worker.last_error = None;
  397. worker.prompt_delivery_attempts = 0;
  398. worker.prompt_in_flight = false;
  399. push_event(
  400. worker,
  401. WorkerEventKind::Restarted,
  402. WorkerStatus::Spawning,
  403. Some("worker restarted".to_string()),
  404. None,
  405. );
  406. Ok(worker.clone())
  407. }
  408. pub fn terminate(&self, worker_id: &str) -> Result<Worker, String> {
  409. let mut inner = self.inner.lock().expect("worker registry lock poisoned");
  410. let worker = inner
  411. .workers
  412. .get_mut(worker_id)
  413. .ok_or_else(|| format!("worker not found: {worker_id}"))?;
  414. worker.status = WorkerStatus::Finished;
  415. worker.prompt_in_flight = false;
  416. push_event(
  417. worker,
  418. WorkerEventKind::Finished,
  419. WorkerStatus::Finished,
  420. Some("worker terminated by control plane".to_string()),
  421. None,
  422. );
  423. Ok(worker.clone())
  424. }
  425. /// Classify session completion and transition worker to appropriate terminal state.
  426. /// Detects degraded completions (finish="unknown" with zero tokens) as provider failures.
  427. pub fn observe_completion(
  428. &self,
  429. worker_id: &str,
  430. finish_reason: &str,
  431. tokens_output: u64,
  432. ) -> Result<Worker, String> {
  433. let mut inner = self.inner.lock().expect("worker registry lock poisoned");
  434. let worker = inner
  435. .workers
  436. .get_mut(worker_id)
  437. .ok_or_else(|| format!("worker not found: {worker_id}"))?;
  438. let is_provider_failure =
  439. (finish_reason == "unknown" && tokens_output == 0) || finish_reason == "error";
  440. if is_provider_failure {
  441. let message = if finish_reason == "unknown" && tokens_output == 0 {
  442. "session completed with finish='unknown' and zero output — provider degraded or context exhausted".to_string()
  443. } else {
  444. format!("session failed with finish='{finish_reason}' — provider error")
  445. };
  446. worker.last_error = Some(WorkerFailure {
  447. kind: WorkerFailureKind::Provider,
  448. message,
  449. created_at: now_secs(),
  450. });
  451. worker.status = WorkerStatus::Failed;
  452. worker.prompt_in_flight = false;
  453. push_event(
  454. worker,
  455. WorkerEventKind::Failed,
  456. WorkerStatus::Failed,
  457. Some("provider failure classified".to_string()),
  458. None,
  459. );
  460. } else {
  461. worker.status = WorkerStatus::Finished;
  462. worker.prompt_in_flight = false;
  463. worker.last_error = None;
  464. push_event(
  465. worker,
  466. WorkerEventKind::Finished,
  467. WorkerStatus::Finished,
  468. Some(format!(
  469. "session completed: finish='{finish_reason}', tokens={tokens_output}"
  470. )),
  471. None,
  472. );
  473. }
  474. Ok(worker.clone())
  475. }
  476. }
/// Readiness summary of a worker, as returned by `WorkerRegistry::await_ready`.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
pub struct WorkerReadySnapshot {
    /// Identifier of the worker this snapshot describes.
    pub worker_id: String,
    /// Status at the time the snapshot was taken.
    pub status: WorkerStatus,
    /// `true` when the status is `ReadyForPrompt`.
    pub ready: bool,
    /// `true` when the status is `TrustRequired` or `Failed`.
    pub blocked: bool,
    /// `true` when a replay prompt is armed on the worker.
    pub replay_prompt_ready: bool,
    /// The worker's last recorded failure, if any.
    pub last_error: Option<WorkerFailure>,
}
  486. fn prompt_misdelivery_is_relevant(worker: &Worker) -> bool {
  487. worker.prompt_in_flight && worker.last_prompt.is_some()
  488. }
/// Result of a positive prompt-misdelivery detection.
#[derive(Debug, Clone, PartialEq, Eq)]
struct PromptDeliveryObservation {
    // Where the prompt was observed to land.
    target: WorkerPromptTarget,
    // Working-directory label seen on screen; set for wrong-target detections.
    observed_cwd: Option<String>,
}
  494. fn push_event(
  495. worker: &mut Worker,
  496. kind: WorkerEventKind,
  497. status: WorkerStatus,
  498. detail: Option<String>,
  499. payload: Option<WorkerEventPayload>,
  500. ) {
  501. let timestamp = now_secs();
  502. let seq = worker.events.len() as u64 + 1;
  503. worker.updated_at = timestamp;
  504. worker.events.push(WorkerEvent {
  505. seq,
  506. kind,
  507. status,
  508. detail,
  509. payload,
  510. timestamp,
  511. });
  512. }
  513. fn path_matches_allowlist(cwd: &str, trusted_root: &str) -> bool {
  514. let cwd = normalize_path(cwd);
  515. let trusted_root = normalize_path(trusted_root);
  516. cwd == trusted_root || cwd.starts_with(&trusted_root)
  517. }
  518. fn normalize_path(path: &str) -> PathBuf {
  519. std::fs::canonicalize(path).unwrap_or_else(|_| Path::new(path).to_path_buf())
  520. }
  521. fn detect_trust_prompt(lowered: &str) -> bool {
  522. [
  523. "do you trust the files in this folder",
  524. "trust the files in this folder",
  525. "trust this folder",
  526. "allow and continue",
  527. "yes, proceed",
  528. ]
  529. .iter()
  530. .any(|needle| lowered.contains(needle))
  531. }
  532. fn detect_ready_for_prompt(screen_text: &str, lowered: &str) -> bool {
  533. if [
  534. "ready for input",
  535. "ready for your input",
  536. "ready for prompt",
  537. "send a message",
  538. ]
  539. .iter()
  540. .any(|needle| lowered.contains(needle))
  541. {
  542. return true;
  543. }
  544. let Some(last_non_empty) = screen_text
  545. .lines()
  546. .rev()
  547. .find(|line| !line.trim().is_empty())
  548. else {
  549. return false;
  550. };
  551. let trimmed = last_non_empty.trim();
  552. if is_shell_prompt(trimmed) {
  553. return false;
  554. }
  555. trimmed == ">"
  556. || trimmed == "›"
  557. || trimmed == "❯"
  558. || trimmed.starts_with("> ")
  559. || trimmed.starts_with("› ")
  560. || trimmed.starts_with("❯ ")
  561. || trimmed.contains("│ >")
  562. || trimmed.contains("│ ›")
  563. || trimmed.contains("│ ❯")
  564. }
  565. fn detect_running_cue(lowered: &str) -> bool {
  566. [
  567. "thinking",
  568. "working",
  569. "running tests",
  570. "inspecting",
  571. "analyzing",
  572. ]
  573. .iter()
  574. .any(|needle| lowered.contains(needle))
  575. }
  576. fn is_shell_prompt(trimmed: &str) -> bool {
  577. trimmed.ends_with('$')
  578. || trimmed.ends_with('%')
  579. || trimmed.ends_with('#')
  580. || trimmed.starts_with('$')
  581. || trimmed.starts_with('%')
  582. || trimmed.starts_with('#')
  583. }
  584. fn detect_prompt_misdelivery(
  585. screen_text: &str,
  586. lowered: &str,
  587. prompt: Option<&str>,
  588. expected_cwd: &str,
  589. ) -> Option<PromptDeliveryObservation> {
  590. let Some(prompt) = prompt else {
  591. return None;
  592. };
  593. let prompt_snippet = prompt
  594. .lines()
  595. .find(|line| !line.trim().is_empty())
  596. .map(|line| line.trim().to_ascii_lowercase())
  597. .unwrap_or_default();
  598. if prompt_snippet.is_empty() {
  599. return None;
  600. }
  601. let prompt_visible = lowered.contains(&prompt_snippet);
  602. if let Some(observed_cwd) = detect_observed_shell_cwd(screen_text) {
  603. if prompt_visible && !cwd_matches_observed_target(expected_cwd, &observed_cwd) {
  604. return Some(PromptDeliveryObservation {
  605. target: WorkerPromptTarget::WrongTarget,
  606. observed_cwd: Some(observed_cwd),
  607. });
  608. }
  609. }
  610. let shell_error = [
  611. "command not found",
  612. "syntax error near unexpected token",
  613. "parse error near",
  614. "no such file or directory",
  615. "unknown command",
  616. ]
  617. .iter()
  618. .any(|needle| lowered.contains(needle));
  619. (shell_error && prompt_visible).then_some(PromptDeliveryObservation {
  620. target: WorkerPromptTarget::Shell,
  621. observed_cwd: None,
  622. })
  623. }
  624. fn prompt_preview(prompt: &str) -> String {
  625. let trimmed = prompt.trim();
  626. if trimmed.chars().count() <= 48 {
  627. return trimmed.to_string();
  628. }
  629. let preview = trimmed.chars().take(48).collect::<String>();
  630. format!("{}…", preview.trim_end())
  631. }
  632. fn prompt_misdelivery_detail(observation: &PromptDeliveryObservation) -> &'static str {
  633. match observation.target {
  634. WorkerPromptTarget::Shell => "shell misdelivery detected",
  635. WorkerPromptTarget::WrongTarget => "prompt landed in wrong target",
  636. WorkerPromptTarget::Unknown => "prompt delivery failure detected",
  637. }
  638. }
  639. fn detect_observed_shell_cwd(screen_text: &str) -> Option<String> {
  640. screen_text.lines().find_map(|line| {
  641. let tokens = line.split_whitespace().collect::<Vec<_>>();
  642. tokens
  643. .iter()
  644. .position(|token| is_shell_prompt_token(token))
  645. .and_then(|index| index.checked_sub(1).map(|cwd_index| tokens[cwd_index]))
  646. .filter(|candidate| looks_like_cwd_label(candidate))
  647. .map(ToOwned::to_owned)
  648. })
  649. }
  650. fn is_shell_prompt_token(token: &&str) -> bool {
  651. matches!(*token, "$" | "%" | "#" | ">" | "›" | "❯")
  652. }
  653. fn looks_like_cwd_label(candidate: &str) -> bool {
  654. candidate.starts_with('/')
  655. || candidate.starts_with('~')
  656. || candidate.starts_with('.')
  657. || candidate.contains('/')
  658. }
  659. fn cwd_matches_observed_target(expected_cwd: &str, observed_cwd: &str) -> bool {
  660. let expected = normalize_path(expected_cwd);
  661. let expected_base = expected
  662. .file_name()
  663. .map(|segment| segment.to_string_lossy().into_owned())
  664. .unwrap_or_else(|| expected.to_string_lossy().into_owned());
  665. let observed_base = Path::new(observed_cwd)
  666. .file_name()
  667. .map(|segment| segment.to_string_lossy().into_owned())
  668. .unwrap_or_else(|| observed_cwd.trim_matches(':').to_string());
  669. expected.to_string_lossy().ends_with(observed_cwd)
  670. || observed_cwd.ends_with(expected.to_string_lossy().as_ref())
  671. || expected_base == observed_base
  672. }
  673. #[cfg(test)]
  674. mod tests {
  675. use super::*;
  676. #[test]
  677. fn allowlisted_trust_prompt_auto_resolves_then_reaches_ready_state() {
  678. let registry = WorkerRegistry::new();
  679. let worker = registry.create(
  680. "/tmp/worktrees/repo-a",
  681. &["/tmp/worktrees".to_string()],
  682. true,
  683. );
  684. let after_trust = registry
  685. .observe(
  686. &worker.worker_id,
  687. "Do you trust the files in this folder?\n1. Yes, proceed\n2. No",
  688. )
  689. .expect("trust observe should succeed");
  690. assert_eq!(after_trust.status, WorkerStatus::Spawning);
  691. assert!(after_trust.trust_gate_cleared);
  692. let trust_required = after_trust
  693. .events
  694. .iter()
  695. .find(|event| event.kind == WorkerEventKind::TrustRequired)
  696. .expect("trust required event should exist");
  697. assert_eq!(
  698. trust_required.payload,
  699. Some(WorkerEventPayload::TrustPrompt {
  700. cwd: "/tmp/worktrees/repo-a".to_string(),
  701. resolution: None,
  702. })
  703. );
  704. let trust_resolved = after_trust
  705. .events
  706. .iter()
  707. .find(|event| event.kind == WorkerEventKind::TrustResolved)
  708. .expect("trust resolved event should exist");
  709. assert_eq!(
  710. trust_resolved.payload,
  711. Some(WorkerEventPayload::TrustPrompt {
  712. cwd: "/tmp/worktrees/repo-a".to_string(),
  713. resolution: Some(WorkerTrustResolution::AutoAllowlisted),
  714. })
  715. );
  716. let ready = registry
  717. .observe(&worker.worker_id, "Ready for your input\n>")
  718. .expect("ready observe should succeed");
  719. assert_eq!(ready.status, WorkerStatus::ReadyForPrompt);
  720. assert!(ready.last_error.is_none());
  721. }
  722. #[test]
  723. fn trust_prompt_blocks_non_allowlisted_worker_until_resolved() {
  724. let registry = WorkerRegistry::new();
  725. let worker = registry.create("/tmp/repo-b", &[], true);
  726. let blocked = registry
  727. .observe(
  728. &worker.worker_id,
  729. "Do you trust the files in this folder?\n1. Yes, proceed\n2. No",
  730. )
  731. .expect("trust observe should succeed");
  732. assert_eq!(blocked.status, WorkerStatus::TrustRequired);
  733. assert_eq!(
  734. blocked.last_error.expect("trust error should exist").kind,
  735. WorkerFailureKind::TrustGate
  736. );
  737. let send_before_resolve = registry.send_prompt(&worker.worker_id, Some("ship it"));
  738. assert!(send_before_resolve
  739. .expect_err("prompt delivery should be gated")
  740. .contains("not ready for prompt delivery"));
  741. let resolved = registry
  742. .resolve_trust(&worker.worker_id)
  743. .expect("manual trust resolution should succeed");
  744. assert_eq!(resolved.status, WorkerStatus::Spawning);
  745. assert!(resolved.trust_gate_cleared);
  746. let trust_resolved = resolved
  747. .events
  748. .iter()
  749. .find(|event| event.kind == WorkerEventKind::TrustResolved)
  750. .expect("manual trust resolve event should exist");
  751. assert_eq!(
  752. trust_resolved.payload,
  753. Some(WorkerEventPayload::TrustPrompt {
  754. cwd: "/tmp/repo-b".to_string(),
  755. resolution: Some(WorkerTrustResolution::ManualApproval),
  756. })
  757. );
  758. }
  759. #[test]
  760. fn ready_detection_ignores_plain_shell_prompts() {
  761. assert!(!detect_ready_for_prompt("bellman@host %", "bellman@host %"));
  762. assert!(!detect_ready_for_prompt("/tmp/repo $", "/tmp/repo $"));
  763. assert!(detect_ready_for_prompt("│ >", "│ >"));
  764. }
  765. #[test]
  766. fn prompt_misdelivery_is_detected_and_replay_can_be_rearmed() {
  767. let registry = WorkerRegistry::new();
  768. let worker = registry.create("/tmp/repo-c", &[], true);
  769. registry
  770. .observe(&worker.worker_id, "Ready for input\n>")
  771. .expect("ready observe should succeed");
  772. let running = registry
  773. .send_prompt(&worker.worker_id, Some("Implement worker handshake"))
  774. .expect("prompt send should succeed");
  775. assert_eq!(running.status, WorkerStatus::Running);
  776. assert_eq!(running.prompt_delivery_attempts, 1);
  777. assert!(running.prompt_in_flight);
  778. let recovered = registry
  779. .observe(
  780. &worker.worker_id,
  781. "% Implement worker handshake\nzsh: command not found: Implement",
  782. )
  783. .expect("misdelivery observe should succeed");
  784. assert_eq!(recovered.status, WorkerStatus::ReadyForPrompt);
  785. assert_eq!(
  786. recovered
  787. .last_error
  788. .expect("misdelivery error should exist")
  789. .kind,
  790. WorkerFailureKind::PromptDelivery
  791. );
  792. assert_eq!(
  793. recovered.replay_prompt.as_deref(),
  794. Some("Implement worker handshake")
  795. );
  796. let misdelivery = recovered
  797. .events
  798. .iter()
  799. .find(|event| event.kind == WorkerEventKind::PromptMisdelivery)
  800. .expect("misdelivery event should exist");
  801. assert_eq!(misdelivery.status, WorkerStatus::Failed);
  802. assert_eq!(
  803. misdelivery.payload,
  804. Some(WorkerEventPayload::PromptDelivery {
  805. prompt_preview: "Implement worker handshake".to_string(),
  806. observed_target: WorkerPromptTarget::Shell,
  807. observed_cwd: None,
  808. recovery_armed: false,
  809. })
  810. );
  811. let replay = recovered
  812. .events
  813. .iter()
  814. .find(|event| event.kind == WorkerEventKind::PromptReplayArmed)
  815. .expect("replay event should exist");
  816. assert_eq!(replay.status, WorkerStatus::ReadyForPrompt);
  817. assert_eq!(
  818. replay.payload,
  819. Some(WorkerEventPayload::PromptDelivery {
  820. prompt_preview: "Implement worker handshake".to_string(),
  821. observed_target: WorkerPromptTarget::Shell,
  822. observed_cwd: None,
  823. recovery_armed: true,
  824. })
  825. );
  826. let replayed = registry
  827. .send_prompt(&worker.worker_id, None)
  828. .expect("replay send should succeed");
  829. assert_eq!(replayed.status, WorkerStatus::Running);
  830. assert!(replayed.replay_prompt.is_none());
  831. assert_eq!(replayed.prompt_delivery_attempts, 2);
  832. }
  /// A prompt that shows up at a shell prompt rooted in a different directory than the
  /// one the worker was created for must be reported as a `WrongTarget` misdelivery,
  /// leaving the worker `ReadyForPrompt` with the original prompt retained for replay.
  ///
  /// NOTE(review): the test name mentions replaying, but only the retained replay prompt
  /// is asserted here; the replay send itself is not exercised by this test.
  #[test]
  fn prompt_delivery_detects_wrong_target_and_replays_to_expected_worker() {
      let prompt = "Run the worker bootstrap tests";
      let registry = WorkerRegistry::new();
      let worker = registry.create("/tmp/repo-target-a", &[], true);
      let id = &worker.worker_id;

      registry
          .observe(id, "Ready for input\n>")
          .expect("ready observe should succeed");
      registry
          .send_prompt(id, Some(prompt))
          .expect("prompt send should succeed");

      // The observed screen shows the prompt text sitting at a zsh prompt in
      // `/tmp/repo-target-b`, not the `/tmp/repo-target-a` directory used at creation.
      let screen =
          "/tmp/repo-target-b % Run the worker bootstrap tests\nzsh: command not found: Run";
      let recovered = registry
          .observe(id, screen)
          .expect("wrong target should be detected");

      assert_eq!(recovered.status, WorkerStatus::ReadyForPrompt);
      assert_eq!(recovered.replay_prompt.as_deref(), Some(prompt));

      let error = recovered
          .last_error
          .expect("wrong target error should exist");
      assert!(error.message.contains("wrong target"));

      // The misdelivery event itself is recorded with recovery not yet armed.
      let expected_payload = WorkerEventPayload::PromptDelivery {
          prompt_preview: prompt.to_string(),
          observed_target: WorkerPromptTarget::WrongTarget,
          observed_cwd: Some("/tmp/repo-target-b".to_string()),
          recovery_armed: false,
      };
      let misdelivery = recovered
          .events
          .iter()
          .find(|candidate| candidate.kind == WorkerEventKind::PromptMisdelivery)
          .expect("wrong-target event should exist");
      assert_eq!(misdelivery.payload, Some(expected_payload));
  }
  /// Walks a worker through three stages and checks what `await_ready` reports at each:
  /// neither ready nor blocked before anything is observed, blocked once the trust
  /// question has been observed, and ready with no error after trust is resolved and
  /// the ready prompt has been observed.
  #[test]
  fn await_ready_surfaces_blocked_or_ready_worker_state() {
      let registry = WorkerRegistry::new();
      let worker = registry.create("/tmp/repo-d", &[], false);
      let id = &worker.worker_id;
      // Every stage polls the same way, so share the call and its expectation message.
      let poll = || registry.await_ready(id).expect("await should succeed");

      // Stage 1: nothing has been observed yet.
      let initial = poll();
      assert!(!initial.ready);
      assert!(!initial.blocked);

      // Stage 2: the trust question is on screen.
      registry
          .observe(
              id,
              "Do you trust the files in this folder?\n1. Yes, proceed\n2. No",
          )
          .expect("trust observe should succeed");
      let blocked = poll();
      assert!(!blocked.ready);
      assert!(blocked.blocked);

      // Stage 3: trust is resolved manually and the ready prompt is then observed.
      registry
          .resolve_trust(id)
          .expect("manual trust resolution should succeed");
      registry
          .observe(id, "Ready for your input\n>")
          .expect("ready observe should succeed");
      let ready = poll();
      assert!(ready.ready);
      assert!(!ready.blocked);
      assert!(ready.last_error.is_none());
  }
  /// After a prompt has been sent, `restart` must return the worker to `Spawning` with
  /// its prompt bookkeeping cleared, and `terminate` must mark it `Finished` and record
  /// a `Finished` event.
  #[test]
  fn restart_and_terminate_reset_or_finish_worker() {
      let registry = WorkerRegistry::new();
      let worker = registry.create("/tmp/repo-e", &[], true);
      let id = &worker.worker_id;

      // Bring the worker to the point where a prompt has been sent.
      registry
          .observe(id, "Ready for input\n>")
          .expect("ready observe should succeed");
      registry
          .send_prompt(id, Some("Run tests"))
          .expect("prompt send should succeed");

      let restarted = registry.restart(id).expect("restart should succeed");
      assert_eq!(restarted.status, WorkerStatus::Spawning);
      assert_eq!(restarted.prompt_delivery_attempts, 0);
      assert!(restarted.last_prompt.is_none());
      assert!(!restarted.prompt_in_flight);

      let finished = registry.terminate(id).expect("terminate should succeed");
      assert_eq!(finished.status, WorkerStatus::Finished);
      let recorded_finish = finished
          .events
          .iter()
          .any(|entry| entry.kind == WorkerEventKind::Finished);
      assert!(recorded_finish);
  }
  /// A completion reported as `"unknown"` with `0` tokens must fail the worker with a
  /// `Provider` error whose message contains "provider degraded", and must record a
  /// `Failed` event.
  #[test]
  fn observe_completion_classifies_provider_failure_on_unknown_finish_zero_tokens() {
      let registry = WorkerRegistry::new();
      let worker = registry.create("/tmp/repo-f", &[], true);
      let id = &worker.worker_id;

      // Bring the worker to the point where a prompt has been sent.
      registry
          .observe(id, "Ready for input\n>")
          .expect("ready observe should succeed");
      registry
          .send_prompt(id, Some("Run tests"))
          .expect("prompt send should succeed");

      let failed = registry
          .observe_completion(id, "unknown", 0)
          .expect("completion observe should succeed");
      assert_eq!(failed.status, WorkerStatus::Failed);

      let error = failed.last_error.expect("provider error should exist");
      assert_eq!(error.kind, WorkerFailureKind::Provider);
      assert!(error.message.contains("provider degraded"));

      let recorded_failure = failed
          .events
          .iter()
          .any(|entry| entry.kind == WorkerEventKind::Failed);
      assert!(recorded_failure);
  }
  /// A completion reported as `"stop"` with `150` tokens must finish the worker with no
  /// error and record a `Finished` event.
  #[test]
  fn observe_completion_accepts_normal_finish_with_tokens() {
      let registry = WorkerRegistry::new();
      let worker = registry.create("/tmp/repo-g", &[], true);
      let id = &worker.worker_id;

      // Bring the worker to the point where a prompt has been sent.
      registry
          .observe(id, "Ready for input\n>")
          .expect("ready observe should succeed");
      registry
          .send_prompt(id, Some("Run tests"))
          .expect("prompt send should succeed");

      let finished = registry
          .observe_completion(id, "stop", 150)
          .expect("completion observe should succeed");
      assert_eq!(finished.status, WorkerStatus::Finished);
      assert!(finished.last_error.is_none());

      let recorded_finish = finished
          .events
          .iter()
          .any(|entry| entry.kind == WorkerEventKind::Finished);
      assert!(recorded_finish);
  }
  975. }