//! session.rs — conversation session model with JSON / JSONL persistence,
//! append-only message logging, and size-based rotation of the session file.
  1. use std::collections::BTreeMap;
  2. use std::fmt::{Display, Formatter};
  3. use std::fs::{self, OpenOptions};
  4. use std::io::Write;
  5. use std::path::{Path, PathBuf};
  6. use std::sync::atomic::{AtomicU64, Ordering};
  7. use std::time::{SystemTime, UNIX_EPOCH};
  8. use crate::json::{JsonError, JsonValue};
  9. use crate::usage::TokenUsage;
  10. const SESSION_VERSION: u32 = 1;
  11. const ROTATE_AFTER_BYTES: u64 = 256 * 1024;
  12. const MAX_ROTATED_FILES: usize = 3;
  13. static SESSION_ID_COUNTER: AtomicU64 = AtomicU64::new(0);
  14. #[derive(Debug, Clone, Copy, PartialEq, Eq)]
  15. pub enum MessageRole {
  16. System,
  17. User,
  18. Assistant,
  19. Tool,
  20. }
  21. #[derive(Debug, Clone, PartialEq, Eq)]
  22. pub enum ContentBlock {
  23. Text {
  24. text: String,
  25. },
  26. ToolUse {
  27. id: String,
  28. name: String,
  29. input: String,
  30. },
  31. ToolResult {
  32. tool_use_id: String,
  33. tool_name: String,
  34. output: String,
  35. is_error: bool,
  36. },
  37. }
  38. #[derive(Debug, Clone, PartialEq, Eq)]
  39. pub struct ConversationMessage {
  40. pub role: MessageRole,
  41. pub blocks: Vec<ContentBlock>,
  42. pub usage: Option<TokenUsage>,
  43. }
  44. #[derive(Debug, Clone, PartialEq, Eq)]
  45. pub struct SessionCompaction {
  46. pub count: u32,
  47. pub removed_message_count: usize,
  48. pub summary: String,
  49. }
  50. #[derive(Debug, Clone, PartialEq, Eq)]
  51. pub struct SessionFork {
  52. pub parent_session_id: String,
  53. pub branch_name: Option<String>,
  54. }
  55. #[derive(Debug, Clone, PartialEq, Eq)]
  56. struct SessionPersistence {
  57. path: PathBuf,
  58. }
  59. #[derive(Debug, Clone)]
  60. pub struct Session {
  61. pub version: u32,
  62. pub session_id: String,
  63. pub created_at_ms: u64,
  64. pub updated_at_ms: u64,
  65. pub messages: Vec<ConversationMessage>,
  66. pub compaction: Option<SessionCompaction>,
  67. pub fork: Option<SessionFork>,
  68. persistence: Option<SessionPersistence>,
  69. }
  70. impl PartialEq for Session {
  71. fn eq(&self, other: &Self) -> bool {
  72. self.version == other.version
  73. && self.session_id == other.session_id
  74. && self.created_at_ms == other.created_at_ms
  75. && self.updated_at_ms == other.updated_at_ms
  76. && self.messages == other.messages
  77. && self.compaction == other.compaction
  78. && self.fork == other.fork
  79. }
  80. }
  81. impl Eq for Session {}
  82. #[derive(Debug)]
  83. pub enum SessionError {
  84. Io(std::io::Error),
  85. Json(JsonError),
  86. Format(String),
  87. }
  88. impl Display for SessionError {
  89. fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
  90. match self {
  91. Self::Io(error) => write!(f, "{error}"),
  92. Self::Json(error) => write!(f, "{error}"),
  93. Self::Format(error) => write!(f, "{error}"),
  94. }
  95. }
  96. }
  97. impl std::error::Error for SessionError {}
  98. impl From<std::io::Error> for SessionError {
  99. fn from(value: std::io::Error) -> Self {
  100. Self::Io(value)
  101. }
  102. }
  103. impl From<JsonError> for SessionError {
  104. fn from(value: JsonError) -> Self {
  105. Self::Json(value)
  106. }
  107. }
  108. impl Session {
  109. #[must_use]
  110. pub fn new() -> Self {
  111. let now = current_time_millis();
  112. Self {
  113. version: SESSION_VERSION,
  114. session_id: generate_session_id(),
  115. created_at_ms: now,
  116. updated_at_ms: now,
  117. messages: Vec::new(),
  118. compaction: None,
  119. fork: None,
  120. persistence: None,
  121. }
  122. }
  123. #[must_use]
  124. pub fn with_persistence_path(mut self, path: impl Into<PathBuf>) -> Self {
  125. self.persistence = Some(SessionPersistence { path: path.into() });
  126. self
  127. }
  128. #[must_use]
  129. pub fn persistence_path(&self) -> Option<&Path> {
  130. self.persistence.as_ref().map(|value| value.path.as_path())
  131. }
  132. pub fn save_to_path(&self, path: impl AsRef<Path>) -> Result<(), SessionError> {
  133. let path = path.as_ref();
  134. rotate_session_file_if_needed(path)?;
  135. write_atomic(path, &self.render_jsonl_snapshot())?;
  136. cleanup_rotated_logs(path)?;
  137. Ok(())
  138. }
  139. pub fn load_from_path(path: impl AsRef<Path>) -> Result<Self, SessionError> {
  140. let path = path.as_ref();
  141. let contents = fs::read_to_string(path)?;
  142. let session = match JsonValue::parse(&contents) {
  143. Ok(value)
  144. if value
  145. .as_object()
  146. .is_some_and(|object| object.contains_key("messages")) =>
  147. {
  148. Self::from_json(&value)?
  149. }
  150. Err(_) => Self::from_jsonl(&contents)?,
  151. Ok(_) => Self::from_jsonl(&contents)?,
  152. };
  153. Ok(session.with_persistence_path(path.to_path_buf()))
  154. }
  155. pub fn push_message(&mut self, message: ConversationMessage) -> Result<(), SessionError> {
  156. self.touch();
  157. self.messages.push(message.clone());
  158. self.append_persisted_message(&message)
  159. }
  160. pub fn push_user_text(&mut self, text: impl Into<String>) -> Result<(), SessionError> {
  161. self.push_message(ConversationMessage::user_text(text))
  162. }
  163. pub fn record_compaction(&mut self, summary: impl Into<String>, removed_message_count: usize) {
  164. self.touch();
  165. let count = self.compaction.as_ref().map_or(1, |value| value.count + 1);
  166. self.compaction = Some(SessionCompaction {
  167. count,
  168. removed_message_count,
  169. summary: summary.into(),
  170. });
  171. }
  172. #[must_use]
  173. pub fn fork(&self, branch_name: Option<String>) -> Self {
  174. let now = current_time_millis();
  175. Self {
  176. version: self.version,
  177. session_id: generate_session_id(),
  178. created_at_ms: now,
  179. updated_at_ms: now,
  180. messages: self.messages.clone(),
  181. compaction: self.compaction.clone(),
  182. fork: Some(SessionFork {
  183. parent_session_id: self.session_id.clone(),
  184. branch_name: normalize_optional_string(branch_name),
  185. }),
  186. persistence: None,
  187. }
  188. }
  189. #[must_use]
  190. pub fn to_json(&self) -> JsonValue {
  191. let mut object = BTreeMap::new();
  192. object.insert(
  193. "version".to_string(),
  194. JsonValue::Number(i64::from(self.version)),
  195. );
  196. object.insert(
  197. "session_id".to_string(),
  198. JsonValue::String(self.session_id.clone()),
  199. );
  200. object.insert(
  201. "created_at_ms".to_string(),
  202. JsonValue::Number(i64_from_u64(self.created_at_ms, "created_at_ms")),
  203. );
  204. object.insert(
  205. "updated_at_ms".to_string(),
  206. JsonValue::Number(i64_from_u64(self.updated_at_ms, "updated_at_ms")),
  207. );
  208. object.insert(
  209. "messages".to_string(),
  210. JsonValue::Array(
  211. self.messages
  212. .iter()
  213. .map(ConversationMessage::to_json)
  214. .collect(),
  215. ),
  216. );
  217. if let Some(compaction) = &self.compaction {
  218. object.insert("compaction".to_string(), compaction.to_json());
  219. }
  220. if let Some(fork) = &self.fork {
  221. object.insert("fork".to_string(), fork.to_json());
  222. }
  223. JsonValue::Object(object)
  224. }
  225. pub fn from_json(value: &JsonValue) -> Result<Self, SessionError> {
  226. let object = value
  227. .as_object()
  228. .ok_or_else(|| SessionError::Format("session must be an object".to_string()))?;
  229. let version = object
  230. .get("version")
  231. .and_then(JsonValue::as_i64)
  232. .ok_or_else(|| SessionError::Format("missing version".to_string()))?;
  233. let version = u32::try_from(version)
  234. .map_err(|_| SessionError::Format("version out of range".to_string()))?;
  235. let messages = object
  236. .get("messages")
  237. .and_then(JsonValue::as_array)
  238. .ok_or_else(|| SessionError::Format("missing messages".to_string()))?
  239. .iter()
  240. .map(ConversationMessage::from_json)
  241. .collect::<Result<Vec<_>, _>>()?;
  242. let now = current_time_millis();
  243. let session_id = object
  244. .get("session_id")
  245. .and_then(JsonValue::as_str)
  246. .map(ToOwned::to_owned)
  247. .unwrap_or_else(generate_session_id);
  248. let created_at_ms = object
  249. .get("created_at_ms")
  250. .map(|value| required_u64_from_value(value, "created_at_ms"))
  251. .transpose()?
  252. .unwrap_or(now);
  253. let updated_at_ms = object
  254. .get("updated_at_ms")
  255. .map(|value| required_u64_from_value(value, "updated_at_ms"))
  256. .transpose()?
  257. .unwrap_or(created_at_ms);
  258. let compaction = object
  259. .get("compaction")
  260. .map(SessionCompaction::from_json)
  261. .transpose()?;
  262. let fork = object.get("fork").map(SessionFork::from_json).transpose()?;
  263. Ok(Self {
  264. version,
  265. session_id,
  266. created_at_ms,
  267. updated_at_ms,
  268. messages,
  269. compaction,
  270. fork,
  271. persistence: None,
  272. })
  273. }
  274. fn from_jsonl(contents: &str) -> Result<Self, SessionError> {
  275. let mut version = SESSION_VERSION;
  276. let mut session_id = None;
  277. let mut created_at_ms = None;
  278. let mut updated_at_ms = None;
  279. let mut messages = Vec::new();
  280. let mut compaction = None;
  281. let mut fork = None;
  282. for (line_number, raw_line) in contents.lines().enumerate() {
  283. let line = raw_line.trim();
  284. if line.is_empty() {
  285. continue;
  286. }
  287. let value = JsonValue::parse(line).map_err(|error| {
  288. SessionError::Format(format!(
  289. "invalid JSONL record at line {}: {}",
  290. line_number + 1,
  291. error
  292. ))
  293. })?;
  294. let object = value.as_object().ok_or_else(|| {
  295. SessionError::Format(format!(
  296. "JSONL record at line {} must be an object",
  297. line_number + 1
  298. ))
  299. })?;
  300. match object
  301. .get("type")
  302. .and_then(JsonValue::as_str)
  303. .ok_or_else(|| {
  304. SessionError::Format(format!(
  305. "JSONL record at line {} missing type",
  306. line_number + 1
  307. ))
  308. })? {
  309. "session_meta" => {
  310. version = required_u32(object, "version")?;
  311. session_id = Some(required_string(object, "session_id")?);
  312. created_at_ms = Some(required_u64(object, "created_at_ms")?);
  313. updated_at_ms = Some(required_u64(object, "updated_at_ms")?);
  314. fork = object.get("fork").map(SessionFork::from_json).transpose()?;
  315. }
  316. "message" => {
  317. let message_value = object.get("message").ok_or_else(|| {
  318. SessionError::Format(format!(
  319. "JSONL record at line {} missing message",
  320. line_number + 1
  321. ))
  322. })?;
  323. messages.push(ConversationMessage::from_json(message_value)?);
  324. }
  325. "compaction" => {
  326. compaction = Some(SessionCompaction::from_json(&JsonValue::Object(
  327. object.clone(),
  328. ))?);
  329. }
  330. other => {
  331. return Err(SessionError::Format(format!(
  332. "unsupported JSONL record type at line {}: {other}",
  333. line_number + 1
  334. )))
  335. }
  336. }
  337. }
  338. let now = current_time_millis();
  339. Ok(Self {
  340. version,
  341. session_id: session_id.unwrap_or_else(generate_session_id),
  342. created_at_ms: created_at_ms.unwrap_or(now),
  343. updated_at_ms: updated_at_ms.unwrap_or(created_at_ms.unwrap_or(now)),
  344. messages,
  345. compaction,
  346. fork,
  347. persistence: None,
  348. })
  349. }
  350. fn render_jsonl_snapshot(&self) -> String {
  351. let mut lines = vec![self.meta_record().render()];
  352. if let Some(compaction) = &self.compaction {
  353. lines.push(compaction.to_jsonl_record().render());
  354. }
  355. lines.extend(
  356. self.messages
  357. .iter()
  358. .map(|message| message_record(message).render()),
  359. );
  360. let mut rendered = lines.join("\n");
  361. rendered.push('\n');
  362. rendered
  363. }
  364. fn append_persisted_message(&self, message: &ConversationMessage) -> Result<(), SessionError> {
  365. let Some(path) = self.persistence_path() else {
  366. return Ok(());
  367. };
  368. let needs_bootstrap = !path.exists() || fs::metadata(path)?.len() == 0;
  369. if needs_bootstrap {
  370. self.save_to_path(path)?;
  371. return Ok(());
  372. }
  373. let mut file = OpenOptions::new().append(true).open(path)?;
  374. writeln!(file, "{}", message_record(message).render())?;
  375. Ok(())
  376. }
  377. fn meta_record(&self) -> JsonValue {
  378. let mut object = BTreeMap::new();
  379. object.insert(
  380. "type".to_string(),
  381. JsonValue::String("session_meta".to_string()),
  382. );
  383. object.insert(
  384. "version".to_string(),
  385. JsonValue::Number(i64::from(self.version)),
  386. );
  387. object.insert(
  388. "session_id".to_string(),
  389. JsonValue::String(self.session_id.clone()),
  390. );
  391. object.insert(
  392. "created_at_ms".to_string(),
  393. JsonValue::Number(i64_from_u64(self.created_at_ms, "created_at_ms")),
  394. );
  395. object.insert(
  396. "updated_at_ms".to_string(),
  397. JsonValue::Number(i64_from_u64(self.updated_at_ms, "updated_at_ms")),
  398. );
  399. if let Some(fork) = &self.fork {
  400. object.insert("fork".to_string(), fork.to_json());
  401. }
  402. JsonValue::Object(object)
  403. }
  404. fn touch(&mut self) {
  405. self.updated_at_ms = current_time_millis();
  406. }
  407. }
  408. impl Default for Session {
  409. fn default() -> Self {
  410. Self::new()
  411. }
  412. }
  413. impl ConversationMessage {
  414. #[must_use]
  415. pub fn user_text(text: impl Into<String>) -> Self {
  416. Self {
  417. role: MessageRole::User,
  418. blocks: vec![ContentBlock::Text { text: text.into() }],
  419. usage: None,
  420. }
  421. }
  422. #[must_use]
  423. pub fn assistant(blocks: Vec<ContentBlock>) -> Self {
  424. Self {
  425. role: MessageRole::Assistant,
  426. blocks,
  427. usage: None,
  428. }
  429. }
  430. #[must_use]
  431. pub fn assistant_with_usage(blocks: Vec<ContentBlock>, usage: Option<TokenUsage>) -> Self {
  432. Self {
  433. role: MessageRole::Assistant,
  434. blocks,
  435. usage,
  436. }
  437. }
  438. #[must_use]
  439. pub fn tool_result(
  440. tool_use_id: impl Into<String>,
  441. tool_name: impl Into<String>,
  442. output: impl Into<String>,
  443. is_error: bool,
  444. ) -> Self {
  445. Self {
  446. role: MessageRole::Tool,
  447. blocks: vec![ContentBlock::ToolResult {
  448. tool_use_id: tool_use_id.into(),
  449. tool_name: tool_name.into(),
  450. output: output.into(),
  451. is_error,
  452. }],
  453. usage: None,
  454. }
  455. }
  456. #[must_use]
  457. pub fn to_json(&self) -> JsonValue {
  458. let mut object = BTreeMap::new();
  459. object.insert(
  460. "role".to_string(),
  461. JsonValue::String(
  462. match self.role {
  463. MessageRole::System => "system",
  464. MessageRole::User => "user",
  465. MessageRole::Assistant => "assistant",
  466. MessageRole::Tool => "tool",
  467. }
  468. .to_string(),
  469. ),
  470. );
  471. object.insert(
  472. "blocks".to_string(),
  473. JsonValue::Array(self.blocks.iter().map(ContentBlock::to_json).collect()),
  474. );
  475. if let Some(usage) = self.usage {
  476. object.insert("usage".to_string(), usage_to_json(usage));
  477. }
  478. JsonValue::Object(object)
  479. }
  480. fn from_json(value: &JsonValue) -> Result<Self, SessionError> {
  481. let object = value
  482. .as_object()
  483. .ok_or_else(|| SessionError::Format("message must be an object".to_string()))?;
  484. let role = match object
  485. .get("role")
  486. .and_then(JsonValue::as_str)
  487. .ok_or_else(|| SessionError::Format("missing role".to_string()))?
  488. {
  489. "system" => MessageRole::System,
  490. "user" => MessageRole::User,
  491. "assistant" => MessageRole::Assistant,
  492. "tool" => MessageRole::Tool,
  493. other => {
  494. return Err(SessionError::Format(format!(
  495. "unsupported message role: {other}"
  496. )))
  497. }
  498. };
  499. let blocks = object
  500. .get("blocks")
  501. .and_then(JsonValue::as_array)
  502. .ok_or_else(|| SessionError::Format("missing blocks".to_string()))?
  503. .iter()
  504. .map(ContentBlock::from_json)
  505. .collect::<Result<Vec<_>, _>>()?;
  506. let usage = object.get("usage").map(usage_from_json).transpose()?;
  507. Ok(Self {
  508. role,
  509. blocks,
  510. usage,
  511. })
  512. }
  513. }
  514. impl ContentBlock {
  515. #[must_use]
  516. pub fn to_json(&self) -> JsonValue {
  517. let mut object = BTreeMap::new();
  518. match self {
  519. Self::Text { text } => {
  520. object.insert("type".to_string(), JsonValue::String("text".to_string()));
  521. object.insert("text".to_string(), JsonValue::String(text.clone()));
  522. }
  523. Self::ToolUse { id, name, input } => {
  524. object.insert(
  525. "type".to_string(),
  526. JsonValue::String("tool_use".to_string()),
  527. );
  528. object.insert("id".to_string(), JsonValue::String(id.clone()));
  529. object.insert("name".to_string(), JsonValue::String(name.clone()));
  530. object.insert("input".to_string(), JsonValue::String(input.clone()));
  531. }
  532. Self::ToolResult {
  533. tool_use_id,
  534. tool_name,
  535. output,
  536. is_error,
  537. } => {
  538. object.insert(
  539. "type".to_string(),
  540. JsonValue::String("tool_result".to_string()),
  541. );
  542. object.insert(
  543. "tool_use_id".to_string(),
  544. JsonValue::String(tool_use_id.clone()),
  545. );
  546. object.insert(
  547. "tool_name".to_string(),
  548. JsonValue::String(tool_name.clone()),
  549. );
  550. object.insert("output".to_string(), JsonValue::String(output.clone()));
  551. object.insert("is_error".to_string(), JsonValue::Bool(*is_error));
  552. }
  553. }
  554. JsonValue::Object(object)
  555. }
  556. fn from_json(value: &JsonValue) -> Result<Self, SessionError> {
  557. let object = value
  558. .as_object()
  559. .ok_or_else(|| SessionError::Format("block must be an object".to_string()))?;
  560. match object
  561. .get("type")
  562. .and_then(JsonValue::as_str)
  563. .ok_or_else(|| SessionError::Format("missing block type".to_string()))?
  564. {
  565. "text" => Ok(Self::Text {
  566. text: required_string(object, "text")?,
  567. }),
  568. "tool_use" => Ok(Self::ToolUse {
  569. id: required_string(object, "id")?,
  570. name: required_string(object, "name")?,
  571. input: required_string(object, "input")?,
  572. }),
  573. "tool_result" => Ok(Self::ToolResult {
  574. tool_use_id: required_string(object, "tool_use_id")?,
  575. tool_name: required_string(object, "tool_name")?,
  576. output: required_string(object, "output")?,
  577. is_error: object
  578. .get("is_error")
  579. .and_then(JsonValue::as_bool)
  580. .ok_or_else(|| SessionError::Format("missing is_error".to_string()))?,
  581. }),
  582. other => Err(SessionError::Format(format!(
  583. "unsupported block type: {other}"
  584. ))),
  585. }
  586. }
  587. }
  588. impl SessionCompaction {
  589. #[must_use]
  590. pub fn to_json(&self) -> JsonValue {
  591. let mut object = BTreeMap::new();
  592. object.insert(
  593. "count".to_string(),
  594. JsonValue::Number(i64::from(self.count)),
  595. );
  596. object.insert(
  597. "removed_message_count".to_string(),
  598. JsonValue::Number(i64_from_usize(
  599. self.removed_message_count,
  600. "removed_message_count",
  601. )),
  602. );
  603. object.insert(
  604. "summary".to_string(),
  605. JsonValue::String(self.summary.clone()),
  606. );
  607. JsonValue::Object(object)
  608. }
  609. #[must_use]
  610. pub fn to_jsonl_record(&self) -> JsonValue {
  611. let mut object = self
  612. .to_json()
  613. .as_object()
  614. .cloned()
  615. .expect("compaction should render to object");
  616. object.insert(
  617. "type".to_string(),
  618. JsonValue::String("compaction".to_string()),
  619. );
  620. JsonValue::Object(object)
  621. }
  622. fn from_json(value: &JsonValue) -> Result<Self, SessionError> {
  623. let object = value
  624. .as_object()
  625. .ok_or_else(|| SessionError::Format("compaction must be an object".to_string()))?;
  626. Ok(Self {
  627. count: required_u32(object, "count")?,
  628. removed_message_count: required_usize(object, "removed_message_count")?,
  629. summary: required_string(object, "summary")?,
  630. })
  631. }
  632. }
  633. impl SessionFork {
  634. #[must_use]
  635. pub fn to_json(&self) -> JsonValue {
  636. let mut object = BTreeMap::new();
  637. object.insert(
  638. "parent_session_id".to_string(),
  639. JsonValue::String(self.parent_session_id.clone()),
  640. );
  641. if let Some(branch_name) = &self.branch_name {
  642. object.insert(
  643. "branch_name".to_string(),
  644. JsonValue::String(branch_name.clone()),
  645. );
  646. }
  647. JsonValue::Object(object)
  648. }
  649. fn from_json(value: &JsonValue) -> Result<Self, SessionError> {
  650. let object = value
  651. .as_object()
  652. .ok_or_else(|| SessionError::Format("fork metadata must be an object".to_string()))?;
  653. Ok(Self {
  654. parent_session_id: required_string(object, "parent_session_id")?,
  655. branch_name: object
  656. .get("branch_name")
  657. .and_then(JsonValue::as_str)
  658. .map(ToOwned::to_owned),
  659. })
  660. }
  661. }
  662. fn message_record(message: &ConversationMessage) -> JsonValue {
  663. let mut object = BTreeMap::new();
  664. object.insert("type".to_string(), JsonValue::String("message".to_string()));
  665. object.insert("message".to_string(), message.to_json());
  666. JsonValue::Object(object)
  667. }
  668. fn usage_to_json(usage: TokenUsage) -> JsonValue {
  669. let mut object = BTreeMap::new();
  670. object.insert(
  671. "input_tokens".to_string(),
  672. JsonValue::Number(i64::from(usage.input_tokens)),
  673. );
  674. object.insert(
  675. "output_tokens".to_string(),
  676. JsonValue::Number(i64::from(usage.output_tokens)),
  677. );
  678. object.insert(
  679. "cache_creation_input_tokens".to_string(),
  680. JsonValue::Number(i64::from(usage.cache_creation_input_tokens)),
  681. );
  682. object.insert(
  683. "cache_read_input_tokens".to_string(),
  684. JsonValue::Number(i64::from(usage.cache_read_input_tokens)),
  685. );
  686. JsonValue::Object(object)
  687. }
  688. fn usage_from_json(value: &JsonValue) -> Result<TokenUsage, SessionError> {
  689. let object = value
  690. .as_object()
  691. .ok_or_else(|| SessionError::Format("usage must be an object".to_string()))?;
  692. Ok(TokenUsage {
  693. input_tokens: required_u32(object, "input_tokens")?,
  694. output_tokens: required_u32(object, "output_tokens")?,
  695. cache_creation_input_tokens: required_u32(object, "cache_creation_input_tokens")?,
  696. cache_read_input_tokens: required_u32(object, "cache_read_input_tokens")?,
  697. })
  698. }
  699. fn required_string(
  700. object: &BTreeMap<String, JsonValue>,
  701. key: &str,
  702. ) -> Result<String, SessionError> {
  703. object
  704. .get(key)
  705. .and_then(JsonValue::as_str)
  706. .map(ToOwned::to_owned)
  707. .ok_or_else(|| SessionError::Format(format!("missing {key}")))
  708. }
  709. fn required_u32(object: &BTreeMap<String, JsonValue>, key: &str) -> Result<u32, SessionError> {
  710. let value = object
  711. .get(key)
  712. .and_then(JsonValue::as_i64)
  713. .ok_or_else(|| SessionError::Format(format!("missing {key}")))?;
  714. u32::try_from(value).map_err(|_| SessionError::Format(format!("{key} out of range")))
  715. }
  716. fn required_u64(object: &BTreeMap<String, JsonValue>, key: &str) -> Result<u64, SessionError> {
  717. let value = object
  718. .get(key)
  719. .ok_or_else(|| SessionError::Format(format!("missing {key}")))?;
  720. required_u64_from_value(value, key)
  721. }
  722. fn required_u64_from_value(value: &JsonValue, key: &str) -> Result<u64, SessionError> {
  723. let value = value
  724. .as_i64()
  725. .ok_or_else(|| SessionError::Format(format!("missing {key}")))?;
  726. u64::try_from(value).map_err(|_| SessionError::Format(format!("{key} out of range")))
  727. }
  728. fn required_usize(object: &BTreeMap<String, JsonValue>, key: &str) -> Result<usize, SessionError> {
  729. let value = object
  730. .get(key)
  731. .and_then(JsonValue::as_i64)
  732. .ok_or_else(|| SessionError::Format(format!("missing {key}")))?;
  733. usize::try_from(value).map_err(|_| SessionError::Format(format!("{key} out of range")))
  734. }
  735. fn i64_from_u64(value: u64, key: &str) -> i64 {
  736. i64::try_from(value).unwrap_or_else(|_| panic!("{key} out of range for JSON number"))
  737. }
  738. fn i64_from_usize(value: usize, key: &str) -> i64 {
  739. i64::try_from(value).unwrap_or_else(|_| panic!("{key} out of range for JSON number"))
  740. }
  741. fn normalize_optional_string(value: Option<String>) -> Option<String> {
  742. value.and_then(|value| {
  743. let trimmed = value.trim();
  744. if trimmed.is_empty() {
  745. None
  746. } else {
  747. Some(trimmed.to_string())
  748. }
  749. })
  750. }
  751. fn current_time_millis() -> u64 {
  752. SystemTime::now()
  753. .duration_since(UNIX_EPOCH)
  754. .map(|duration| duration.as_millis() as u64)
  755. .unwrap_or_default()
  756. }
  757. fn generate_session_id() -> String {
  758. let millis = current_time_millis();
  759. let counter = SESSION_ID_COUNTER.fetch_add(1, Ordering::Relaxed);
  760. format!("session-{millis}-{counter}")
  761. }
  762. fn write_atomic(path: &Path, contents: &str) -> Result<(), SessionError> {
  763. if let Some(parent) = path.parent() {
  764. fs::create_dir_all(parent)?;
  765. }
  766. let temp_path = temporary_path_for(path);
  767. fs::write(&temp_path, contents)?;
  768. fs::rename(temp_path, path)?;
  769. Ok(())
  770. }
  771. fn temporary_path_for(path: &Path) -> PathBuf {
  772. let file_name = path
  773. .file_name()
  774. .and_then(|value| value.to_str())
  775. .unwrap_or("session");
  776. path.with_file_name(format!(
  777. "{file_name}.tmp-{}-{}",
  778. current_time_millis(),
  779. SESSION_ID_COUNTER.fetch_add(1, Ordering::Relaxed)
  780. ))
  781. }
  782. fn rotate_session_file_if_needed(path: &Path) -> Result<(), SessionError> {
  783. let Ok(metadata) = fs::metadata(path) else {
  784. return Ok(());
  785. };
  786. if metadata.len() < ROTATE_AFTER_BYTES {
  787. return Ok(());
  788. }
  789. let rotated_path = rotated_log_path(path);
  790. fs::rename(path, rotated_path)?;
  791. Ok(())
  792. }
  793. fn rotated_log_path(path: &Path) -> PathBuf {
  794. let stem = path
  795. .file_stem()
  796. .and_then(|value| value.to_str())
  797. .unwrap_or("session");
  798. path.with_file_name(format!("{stem}.rot-{}.jsonl", current_time_millis()))
  799. }
  800. fn cleanup_rotated_logs(path: &Path) -> Result<(), SessionError> {
  801. let Some(parent) = path.parent() else {
  802. return Ok(());
  803. };
  804. let stem = path
  805. .file_stem()
  806. .and_then(|value| value.to_str())
  807. .unwrap_or("session");
  808. let prefix = format!("{stem}.rot-");
  809. let mut rotated_paths = fs::read_dir(parent)?
  810. .filter_map(Result::ok)
  811. .map(|entry| entry.path())
  812. .filter(|entry_path| {
  813. entry_path
  814. .file_name()
  815. .and_then(|value| value.to_str())
  816. .is_some_and(|name| name.starts_with(&prefix) && name.ends_with(".jsonl"))
  817. })
  818. .collect::<Vec<_>>();
  819. rotated_paths.sort_by_key(|entry_path| {
  820. fs::metadata(entry_path)
  821. .and_then(|metadata| metadata.modified())
  822. .unwrap_or(UNIX_EPOCH)
  823. });
  824. let remove_count = rotated_paths.len().saturating_sub(MAX_ROTATED_FILES);
  825. for stale_path in rotated_paths.into_iter().take(remove_count) {
  826. fs::remove_file(stale_path)?;
  827. }
  828. Ok(())
  829. }
  830. #[cfg(test)]
  831. mod tests {
  832. use super::{
  833. cleanup_rotated_logs, rotate_session_file_if_needed, ContentBlock, ConversationMessage,
  834. MessageRole, Session, SessionFork,
  835. };
  836. use crate::json::JsonValue;
  837. use crate::usage::TokenUsage;
  838. use std::fs;
  839. use std::path::PathBuf;
  840. use std::time::{SystemTime, UNIX_EPOCH};
  /// Saves a session holding a user message, an assistant message (text plus tool use,
  /// with token usage attached) and a tool result, then checks that loading the file
  /// back yields an equal session.
  #[test]
  fn persists_and_restores_session_jsonl() {
      let mut original = Session::new();

      original
          .push_user_text("hello")
          .expect("user message should append");

      let assistant_blocks = vec![
          ContentBlock::Text {
              text: String::from("thinking"),
          },
          ContentBlock::ToolUse {
              id: String::from("tool-1"),
              name: String::from("bash"),
              input: String::from("echo hi"),
          },
      ];
      let assistant_usage = TokenUsage {
          input_tokens: 10,
          output_tokens: 4,
          cache_creation_input_tokens: 1,
          cache_read_input_tokens: 2,
      };
      let assistant_turn =
          ConversationMessage::assistant_with_usage(assistant_blocks, Some(assistant_usage));
      original
          .push_message(assistant_turn)
          .expect("assistant message should append");

      let tool_turn = ConversationMessage::tool_result("tool-1", "bash", "hi", false);
      original
          .push_message(tool_turn)
          .expect("tool result should append");

      let file = temp_session_path("jsonl");
      original.save_to_path(&file).expect("session should save");
      let reloaded = Session::load_from_path(&file).expect("session should load");
      // Remove the temp file before asserting so a failed assertion leaves nothing behind.
      fs::remove_file(&file).expect("temp file should be removable");

      assert_eq!(reloaded, original);
      assert_eq!(reloaded.messages[2].role, MessageRole::Tool);
      // 17 is the sum of the four token counts set above (10 + 4 + 1 + 2).
      let reloaded_usage = reloaded.messages[1].usage.expect("usage");
      assert_eq!(reloaded_usage.total_tokens(), 17);
      assert_eq!(reloaded.session_id, original.session_id);
  }
  /// Writes a single rendered `JsonValue::Object` (a `version` number plus a `messages`
  /// array with one user message) and checks that `Session::load_from_path` returns that
  /// message together with a non-empty session id.
  #[test]
  fn loads_legacy_session_json_object() {
      let file = temp_session_path("legacy");

      let version_entry = (String::from("version"), JsonValue::Number(1));
      let legacy_message = ConversationMessage::user_text("legacy").to_json();
      let messages_entry = (
          String::from("messages"),
          JsonValue::Array(vec![legacy_message]),
      );
      let document = JsonValue::Object([version_entry, messages_entry].into_iter().collect());
      fs::write(&file, document.render()).expect("legacy file should write");

      let reloaded = Session::load_from_path(&file).expect("legacy session should load");
      // Remove the temp file before asserting so a failed assertion leaves nothing behind.
      fs::remove_file(&file).expect("temp file should be removable");

      assert_eq!(reloaded.messages.len(), 1);
      let expected_message = ConversationMessage::user_text("legacy");
      assert_eq!(reloaded.messages[0], expected_message);
      assert!(!reloaded.session_id.is_empty());
  }
  /// Saves an empty session that has a persistence path, pushes two messages without
  /// saving again, and checks that loading the file still yields both messages.
  #[test]
  fn appends_messages_to_persisted_jsonl_session() {
      let file = temp_session_path("append");
      let mut live = Session::new().with_persistence_path(file.clone());
      live.save_to_path(&file)
          .expect("initial save should succeed");

      live.push_user_text("hi")
          .expect("user append should succeed");
      let reply = ConversationMessage::assistant(vec![ContentBlock::Text {
          text: String::from("hello"),
      }]);
      live.push_message(reply)
          .expect("assistant append should succeed");

      let replayed = Session::load_from_path(&file).expect("session should replay from jsonl");
      // Remove the temp file before asserting so a failed assertion leaves nothing behind.
      fs::remove_file(&file).expect("temp file should be removable");

      assert_eq!(replayed.messages.len(), 2);
      let expected_first = ConversationMessage::user_text("hi");
      assert_eq!(replayed.messages[0], expected_first);
  }
  /// Records one compaction on a session, saves and reloads it, and checks that the
  /// reloaded compaction metadata carries the count, removed-message count and summary.
  #[test]
  fn persists_compaction_metadata() {
      let file = temp_session_path("compaction");

      let mut original = Session::new();
      original
          .push_user_text("before")
          .expect("message should append");
      original.record_compaction("summarized earlier work", 4);
      original.save_to_path(&file).expect("session should save");

      let reloaded = Session::load_from_path(&file).expect("session should load");
      // Remove the temp file before asserting so a failed assertion leaves nothing behind.
      fs::remove_file(&file).expect("temp file should be removable");

      let metadata = reloaded.compaction.expect("compaction metadata");
      assert_eq!(metadata.count, 1);
      assert_eq!(metadata.removed_message_count, 4);
      assert!(metadata.summary.contains("summarized"));
  }
  /// Forks a session under the branch name "investigation", saves and reloads the fork,
  /// and checks that it has a new session id, fork metadata pointing at the parent, and
  /// the same messages as the in-memory fork.
  #[test]
  fn forks_sessions_with_branch_metadata_and_persists_it() {
      let file = temp_session_path("fork");

      let mut parent = Session::new();
      parent
          .push_user_text("before fork")
          .expect("message should append");

      let branch = Some(String::from("investigation"));
      let child = parent.fork(branch).with_persistence_path(file.clone());
      child
          .save_to_path(&file)
          .expect("forked session should save");

      let reloaded = Session::load_from_path(&file).expect("forked session should load");
      // Remove the temp file before asserting so a failed assertion leaves nothing behind.
      fs::remove_file(&file).expect("temp file should be removable");

      assert_ne!(reloaded.session_id, parent.session_id);
      // `parent.session_id` is moved here, so `parent` must not be used afterwards.
      let expected_fork = SessionFork {
          parent_session_id: parent.session_id,
          branch_name: Some(String::from("investigation")),
      };
      assert_eq!(reloaded.fork, Some(expected_fork));
      assert_eq!(reloaded.messages, child.messages);
  }
  /// Exercises log rotation and pruning end to end.
  ///
  /// Writes a file 10 bytes larger than `ROTATE_AFTER_BYTES`, checks that `path` no
  /// longer exists after `rotate_session_file_if_needed`, then writes to
  /// `rotated_log_path(&path)` five times and checks that `cleanup_rotated_logs` leaves
  /// at most `MAX_ROTATED_FILES` rotated files behind.
  #[test]
  fn rotates_and_cleans_up_large_session_logs() {
      let path = temp_session_path("rotation");
      fs::write(&path, "x".repeat((super::ROTATE_AFTER_BYTES + 10) as usize))
          .expect("oversized file should write");
      rotate_session_file_if_needed(&path).expect("rotation should succeed");
      assert!(
          !path.exists(),
          "original path should be rotated away before rewrite"
      );

      for _ in 0..5 {
          let rotated = super::rotated_log_path(&path);
          fs::write(&rotated, "old").expect("rotated file should write");
      }
      cleanup_rotated_logs(&path).expect("cleanup should succeed");

      // Scan the directory once, then delete the surviving files before asserting so a
      // failed assertion does not leave them in the temp directory (the other tests in
      // this module also remove their files before asserting).
      let survivors = rotation_files(&path);
      let rotated_count = survivors.len();
      for rotated in survivors {
          fs::remove_file(rotated).expect("rotated file should be removable");
      }
      assert!(rotated_count <= super::MAX_ROTATED_FILES);
  }
  /// Builds a fresh path inside the system temp directory for a test's session file.
  ///
  /// The file name has the form `runtime-session-{label}-{nanos}-{pid}-{sequence}.json`.
  /// The nanosecond wall-clock stamp can repeat on a coarse clock or across back-to-back
  /// calls, so the process id and a per-process counter are appended to keep every path
  /// returned within one test run distinct.
  ///
  /// # Panics
  ///
  /// Panics if the system clock reports a time before the Unix epoch.
  fn temp_session_path(label: &str) -> PathBuf {
      // Per-process counter: no two calls in this process observe the same value.
      static NEXT_SEQUENCE: std::sync::atomic::AtomicUsize =
          std::sync::atomic::AtomicUsize::new(0);

      let nanos = SystemTime::now()
          .duration_since(UNIX_EPOCH)
          .expect("system time should be after epoch")
          .as_nanos();
      let pid = std::process::id();
      let sequence = NEXT_SEQUENCE.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
      std::env::temp_dir().join(format!(
          "runtime-session-{label}-{nanos}-{pid}-{sequence}.json"
      ))
  }
  /// Lists the rotated log files that sit in the same directory as `path`.
  ///
  /// A directory entry counts as a rotated log when its file name starts with
  /// `{stem}.rot-` (where `stem` is the file stem of `path`) and ends with `.jsonl`.
  /// Entries that fail to read, or whose names are not valid UTF-8, are skipped. The
  /// paths are returned in the order `read_dir` yields them.
  ///
  /// Accepts `&Path`, so existing callers that pass `&PathBuf` keep working through
  /// deref coercion.
  ///
  /// # Panics
  ///
  /// Panics if `path` has no UTF-8 file stem, has no parent, or if the parent directory
  /// cannot be read.
  fn rotation_files(path: &std::path::Path) -> Vec<PathBuf> {
      let stem = path
          .file_stem()
          .and_then(|value| value.to_str())
          .expect("temp path should have file stem");
      // Built once up front so the filter closure does not allocate per directory entry.
      let prefix = format!("{stem}.rot-");
      fs::read_dir(path.parent().expect("temp path should have parent"))
          .expect("temp dir should read")
          .filter_map(Result::ok)
          .map(|entry| entry.path())
          .filter(|entry_path| {
              entry_path
                  .file_name()
                  .and_then(|value| value.to_str())
                  .is_some_and(|name| name.starts_with(&prefix) && name.ends_with(".jsonl"))
          })
          .collect()
  }
  1017. }