| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506 |
- //! In-memory task registry for sub-agent task lifecycle management.
- use std::collections::HashMap;
- use std::sync::{Arc, Mutex};
- use std::time::{SystemTime, UNIX_EPOCH};
- use serde::{Deserialize, Serialize};
- use crate::{validate_packet, TaskPacket, TaskPacketValidationError};
- #[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
- #[serde(rename_all = "snake_case")]
- pub enum TaskStatus {
- Created,
- Running,
- Completed,
- Failed,
- Stopped,
- }
- impl std::fmt::Display for TaskStatus {
- fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
- match self {
- Self::Created => write!(f, "created"),
- Self::Running => write!(f, "running"),
- Self::Completed => write!(f, "completed"),
- Self::Failed => write!(f, "failed"),
- Self::Stopped => write!(f, "stopped"),
- }
- }
- }
- #[derive(Debug, Clone, Serialize, Deserialize)]
- pub struct Task {
- pub task_id: String,
- pub prompt: String,
- pub description: Option<String>,
- pub task_packet: Option<TaskPacket>,
- pub status: TaskStatus,
- pub created_at: u64,
- pub updated_at: u64,
- pub messages: Vec<TaskMessage>,
- pub output: String,
- pub team_id: Option<String>,
- }
- #[derive(Debug, Clone, Serialize, Deserialize)]
- pub struct TaskMessage {
- pub role: String,
- pub content: String,
- pub timestamp: u64,
- }
- #[derive(Debug, Clone, Default)]
- pub struct TaskRegistry {
- inner: Arc<Mutex<RegistryInner>>,
- }
- #[derive(Debug, Default)]
- struct RegistryInner {
- tasks: HashMap<String, Task>,
- counter: u64,
- }
- fn now_secs() -> u64 {
- SystemTime::now()
- .duration_since(UNIX_EPOCH)
- .unwrap_or_default()
- .as_secs()
- }
- impl TaskRegistry {
- #[must_use]
- pub fn new() -> Self {
- Self::default()
- }
- pub fn create(&self, prompt: &str, description: Option<&str>) -> Task {
- self.create_task(
- prompt.to_owned(),
- description.map(str::to_owned),
- None,
- )
- }
- pub fn create_from_packet(
- &self,
- packet: TaskPacket,
- ) -> Result<Task, TaskPacketValidationError> {
- let packet = validate_packet(packet)?.into_inner();
- Ok(self.create_task(
- packet.objective.clone(),
- Some(packet.scope.clone()),
- Some(packet),
- ))
- }
- fn create_task(
- &self,
- prompt: String,
- description: Option<String>,
- task_packet: Option<TaskPacket>,
- ) -> Task {
- let mut inner = self.inner.lock().expect("registry lock poisoned");
- inner.counter += 1;
- let ts = now_secs();
- let task_id = format!("task_{:08x}_{}", ts, inner.counter);
- let task = Task {
- task_id: task_id.clone(),
- prompt,
- description,
- task_packet,
- status: TaskStatus::Created,
- created_at: ts,
- updated_at: ts,
- messages: Vec::new(),
- output: String::new(),
- team_id: None,
- };
- inner.tasks.insert(task_id, task.clone());
- task
- }
- pub fn get(&self, task_id: &str) -> Option<Task> {
- let inner = self.inner.lock().expect("registry lock poisoned");
- inner.tasks.get(task_id).cloned()
- }
- pub fn list(&self, status_filter: Option<TaskStatus>) -> Vec<Task> {
- let inner = self.inner.lock().expect("registry lock poisoned");
- inner
- .tasks
- .values()
- .filter(|t| status_filter.map_or(true, |s| t.status == s))
- .cloned()
- .collect()
- }
- pub fn stop(&self, task_id: &str) -> Result<Task, String> {
- let mut inner = self.inner.lock().expect("registry lock poisoned");
- let task = inner
- .tasks
- .get_mut(task_id)
- .ok_or_else(|| format!("task not found: {task_id}"))?;
- match task.status {
- TaskStatus::Completed | TaskStatus::Failed | TaskStatus::Stopped => {
- return Err(format!(
- "task {task_id} is already in terminal state: {}",
- task.status
- ));
- }
- _ => {}
- }
- task.status = TaskStatus::Stopped;
- task.updated_at = now_secs();
- Ok(task.clone())
- }
- pub fn update(&self, task_id: &str, message: &str) -> Result<Task, String> {
- let mut inner = self.inner.lock().expect("registry lock poisoned");
- let task = inner
- .tasks
- .get_mut(task_id)
- .ok_or_else(|| format!("task not found: {task_id}"))?;
- task.messages.push(TaskMessage {
- role: String::from("user"),
- content: message.to_owned(),
- timestamp: now_secs(),
- });
- task.updated_at = now_secs();
- Ok(task.clone())
- }
- pub fn output(&self, task_id: &str) -> Result<String, String> {
- let inner = self.inner.lock().expect("registry lock poisoned");
- let task = inner
- .tasks
- .get(task_id)
- .ok_or_else(|| format!("task not found: {task_id}"))?;
- Ok(task.output.clone())
- }
- pub fn append_output(&self, task_id: &str, output: &str) -> Result<(), String> {
- let mut inner = self.inner.lock().expect("registry lock poisoned");
- let task = inner
- .tasks
- .get_mut(task_id)
- .ok_or_else(|| format!("task not found: {task_id}"))?;
- task.output.push_str(output);
- task.updated_at = now_secs();
- Ok(())
- }
- pub fn set_status(&self, task_id: &str, status: TaskStatus) -> Result<(), String> {
- let mut inner = self.inner.lock().expect("registry lock poisoned");
- let task = inner
- .tasks
- .get_mut(task_id)
- .ok_or_else(|| format!("task not found: {task_id}"))?;
- task.status = status;
- task.updated_at = now_secs();
- Ok(())
- }
- pub fn assign_team(&self, task_id: &str, team_id: &str) -> Result<(), String> {
- let mut inner = self.inner.lock().expect("registry lock poisoned");
- let task = inner
- .tasks
- .get_mut(task_id)
- .ok_or_else(|| format!("task not found: {task_id}"))?;
- task.team_id = Some(team_id.to_owned());
- task.updated_at = now_secs();
- Ok(())
- }
- pub fn remove(&self, task_id: &str) -> Option<Task> {
- let mut inner = self.inner.lock().expect("registry lock poisoned");
- inner.tasks.remove(task_id)
- }
- #[must_use]
- pub fn len(&self) -> usize {
- let inner = self.inner.lock().expect("registry lock poisoned");
- inner.tasks.len()
- }
- #[must_use]
- pub fn is_empty(&self) -> bool {
- self.len() == 0
- }
- }
- #[cfg(test)]
- mod tests {
- use super::*;
- #[test]
- fn creates_and_retrieves_tasks() {
- let registry = TaskRegistry::new();
- let task = registry.create("Do something", Some("A test task"));
- assert_eq!(task.status, TaskStatus::Created);
- assert_eq!(task.prompt, "Do something");
- assert_eq!(task.description.as_deref(), Some("A test task"));
- assert_eq!(task.task_packet, None);
- let fetched = registry.get(&task.task_id).expect("task should exist");
- assert_eq!(fetched.task_id, task.task_id);
- }
- #[test]
- fn creates_task_from_packet() {
- let registry = TaskRegistry::new();
- let packet = TaskPacket {
- objective: "Ship task packet support".to_string(),
- scope: "runtime/task system".to_string(),
- repo: "claw-code-parity".to_string(),
- branch_policy: "origin/main only".to_string(),
- acceptance_tests: vec!["cargo test --workspace".to_string()],
- commit_policy: "single commit".to_string(),
- reporting_contract: "print commit sha".to_string(),
- escalation_policy: "manual escalation".to_string(),
- };
- let task = registry
- .create_from_packet(packet.clone())
- .expect("packet-backed task should be created");
- assert_eq!(task.prompt, packet.objective);
- assert_eq!(task.description.as_deref(), Some("runtime/task system"));
- assert_eq!(task.task_packet, Some(packet.clone()));
- let fetched = registry.get(&task.task_id).expect("task should exist");
- assert_eq!(fetched.task_packet, Some(packet));
- }
- #[test]
- fn lists_tasks_with_optional_filter() {
- let registry = TaskRegistry::new();
- registry.create("Task A", None);
- let task_b = registry.create("Task B", None);
- registry
- .set_status(&task_b.task_id, TaskStatus::Running)
- .expect("set status should succeed");
- let all = registry.list(None);
- assert_eq!(all.len(), 2);
- let running = registry.list(Some(TaskStatus::Running));
- assert_eq!(running.len(), 1);
- assert_eq!(running[0].task_id, task_b.task_id);
- let created = registry.list(Some(TaskStatus::Created));
- assert_eq!(created.len(), 1);
- }
- #[test]
- fn stops_running_task() {
- let registry = TaskRegistry::new();
- let task = registry.create("Stoppable", None);
- registry
- .set_status(&task.task_id, TaskStatus::Running)
- .unwrap();
- let stopped = registry.stop(&task.task_id).expect("stop should succeed");
- assert_eq!(stopped.status, TaskStatus::Stopped);
- // Stopping again should fail
- let result = registry.stop(&task.task_id);
- assert!(result.is_err());
- }
- #[test]
- fn updates_task_with_messages() {
- let registry = TaskRegistry::new();
- let task = registry.create("Messageable", None);
- let updated = registry
- .update(&task.task_id, "Here's more context")
- .expect("update should succeed");
- assert_eq!(updated.messages.len(), 1);
- assert_eq!(updated.messages[0].content, "Here's more context");
- assert_eq!(updated.messages[0].role, "user");
- }
- #[test]
- fn appends_and_retrieves_output() {
- let registry = TaskRegistry::new();
- let task = registry.create("Output task", None);
- registry
- .append_output(&task.task_id, "line 1\n")
- .expect("append should succeed");
- registry
- .append_output(&task.task_id, "line 2\n")
- .expect("append should succeed");
- let output = registry.output(&task.task_id).expect("output should exist");
- assert_eq!(output, "line 1\nline 2\n");
- }
- #[test]
- fn assigns_team_and_removes_task() {
- let registry = TaskRegistry::new();
- let task = registry.create("Team task", None);
- registry
- .assign_team(&task.task_id, "team_abc")
- .expect("assign should succeed");
- let fetched = registry.get(&task.task_id).unwrap();
- assert_eq!(fetched.team_id.as_deref(), Some("team_abc"));
- let removed = registry.remove(&task.task_id);
- assert!(removed.is_some());
- assert!(registry.get(&task.task_id).is_none());
- assert!(registry.is_empty());
- }
- #[test]
- fn rejects_operations_on_missing_task() {
- let registry = TaskRegistry::new();
- assert!(registry.stop("nonexistent").is_err());
- assert!(registry.update("nonexistent", "msg").is_err());
- assert!(registry.output("nonexistent").is_err());
- assert!(registry.append_output("nonexistent", "data").is_err());
- assert!(registry
- .set_status("nonexistent", TaskStatus::Running)
- .is_err());
- }
- #[test]
- fn task_status_display_all_variants() {
- // given
- let cases = [
- (TaskStatus::Created, "created"),
- (TaskStatus::Running, "running"),
- (TaskStatus::Completed, "completed"),
- (TaskStatus::Failed, "failed"),
- (TaskStatus::Stopped, "stopped"),
- ];
- // when
- let rendered: Vec<_> = cases
- .into_iter()
- .map(|(status, expected)| (status.to_string(), expected))
- .collect();
- // then
- assert_eq!(
- rendered,
- vec![
- ("created".to_string(), "created"),
- ("running".to_string(), "running"),
- ("completed".to_string(), "completed"),
- ("failed".to_string(), "failed"),
- ("stopped".to_string(), "stopped"),
- ]
- );
- }
- #[test]
- fn stop_rejects_completed_task() {
- // given
- let registry = TaskRegistry::new();
- let task = registry.create("done", None);
- registry
- .set_status(&task.task_id, TaskStatus::Completed)
- .expect("set status should succeed");
- // when
- let result = registry.stop(&task.task_id);
- // then
- let error = result.expect_err("completed task should be rejected");
- assert!(error.contains("already in terminal state"));
- assert!(error.contains("completed"));
- }
- #[test]
- fn stop_rejects_failed_task() {
- // given
- let registry = TaskRegistry::new();
- let task = registry.create("failed", None);
- registry
- .set_status(&task.task_id, TaskStatus::Failed)
- .expect("set status should succeed");
- // when
- let result = registry.stop(&task.task_id);
- // then
- let error = result.expect_err("failed task should be rejected");
- assert!(error.contains("already in terminal state"));
- assert!(error.contains("failed"));
- }
- #[test]
- fn stop_succeeds_from_created_state() {
- // given
- let registry = TaskRegistry::new();
- let task = registry.create("created task", None);
- // when
- let stopped = registry.stop(&task.task_id).expect("stop should succeed");
- // then
- assert_eq!(stopped.status, TaskStatus::Stopped);
- assert!(stopped.updated_at >= task.updated_at);
- }
- #[test]
- fn new_registry_is_empty() {
- // given
- let registry = TaskRegistry::new();
- // when
- let all_tasks = registry.list(None);
- // then
- assert!(registry.is_empty());
- assert_eq!(registry.len(), 0);
- assert!(all_tasks.is_empty());
- }
- #[test]
- fn create_without_description() {
- // given
- let registry = TaskRegistry::new();
- // when
- let task = registry.create("Do the thing", None);
- // then
- assert!(task.task_id.starts_with("task_"));
- assert_eq!(task.description, None);
- assert_eq!(task.task_packet, None);
- assert!(task.messages.is_empty());
- assert!(task.output.is_empty());
- assert_eq!(task.team_id, None);
- }
- #[test]
- fn remove_nonexistent_returns_none() {
- // given
- let registry = TaskRegistry::new();
- // when
- let removed = registry.remove("missing");
- // then
- assert!(removed.is_none());
- }
- #[test]
- fn assign_team_rejects_missing_task() {
- // given
- let registry = TaskRegistry::new();
- // when
- let result = registry.assign_team("missing", "team_123");
- // then
- let error = result.expect_err("missing task should be rejected");
- assert_eq!(error, "task not found: missing");
- }
- }
|