|
@@ -1,11 +1,19 @@
|
|
|
use std::collections::BTreeMap;
|
|
use std::collections::BTreeMap;
|
|
|
use std::fmt::{Display, Formatter};
|
|
use std::fmt::{Display, Formatter};
|
|
|
-use std::fs;
|
|
|
|
|
-use std::path::Path;
|
|
|
|
|
|
|
+use std::fs::{self, OpenOptions};
|
|
|
|
|
+use std::io::Write;
|
|
|
|
|
+use std::path::{Path, PathBuf};
|
|
|
|
|
+use std::sync::atomic::{AtomicU64, Ordering};
|
|
|
|
|
+use std::time::{SystemTime, UNIX_EPOCH};
|
|
|
|
|
|
|
|
use crate::json::{JsonError, JsonValue};
|
|
use crate::json::{JsonError, JsonValue};
|
|
|
use crate::usage::TokenUsage;
|
|
use crate::usage::TokenUsage;
|
|
|
|
|
|
|
|
|
|
+const SESSION_VERSION: u32 = 1;
|
|
|
|
|
+const ROTATE_AFTER_BYTES: u64 = 256 * 1024;
|
|
|
|
|
+const MAX_ROTATED_FILES: usize = 3;
|
|
|
|
|
+static SESSION_ID_COUNTER: AtomicU64 = AtomicU64::new(0);
|
|
|
|
|
+
|
|
|
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
|
|
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
|
|
|
pub enum MessageRole {
|
|
pub enum MessageRole {
|
|
|
System,
|
|
System,
|
|
@@ -40,11 +48,41 @@ pub struct ConversationMessage {
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
#[derive(Debug, Clone, PartialEq, Eq)]
|
|
#[derive(Debug, Clone, PartialEq, Eq)]
|
|
|
|
|
+pub struct SessionCompaction {
|
|
|
|
|
+ pub count: u32,
|
|
|
|
|
+ pub removed_message_count: usize,
|
|
|
|
|
+ pub summary: String,
|
|
|
|
|
+}
|
|
|
|
|
+
|
|
|
|
|
+#[derive(Debug, Clone, PartialEq, Eq)]
|
|
|
|
|
+struct SessionPersistence {
|
|
|
|
|
+ path: PathBuf,
|
|
|
|
|
+}
|
|
|
|
|
+
|
|
|
|
|
+#[derive(Debug, Clone)]
|
|
|
pub struct Session {
|
|
pub struct Session {
|
|
|
pub version: u32,
|
|
pub version: u32,
|
|
|
|
|
+ pub session_id: String,
|
|
|
|
|
+ pub created_at_ms: u64,
|
|
|
|
|
+ pub updated_at_ms: u64,
|
|
|
pub messages: Vec<ConversationMessage>,
|
|
pub messages: Vec<ConversationMessage>,
|
|
|
|
|
+ pub compaction: Option<SessionCompaction>,
|
|
|
|
|
+ persistence: Option<SessionPersistence>,
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
|
|
+impl PartialEq for Session {
|
|
|
|
|
+ fn eq(&self, other: &Self) -> bool {
|
|
|
|
|
+ self.version == other.version
|
|
|
|
|
+ && self.session_id == other.session_id
|
|
|
|
|
+ && self.created_at_ms == other.created_at_ms
|
|
|
|
|
+ && self.updated_at_ms == other.updated_at_ms
|
|
|
|
|
+ && self.messages == other.messages
|
|
|
|
|
+ && self.compaction == other.compaction
|
|
|
|
|
+ }
|
|
|
|
|
+}
|
|
|
|
|
+
|
|
|
|
|
+impl Eq for Session {}
|
|
|
|
|
+
|
|
|
#[derive(Debug)]
|
|
#[derive(Debug)]
|
|
|
pub enum SessionError {
|
|
pub enum SessionError {
|
|
|
Io(std::io::Error),
|
|
Io(std::io::Error),
|
|
@@ -79,20 +117,65 @@ impl From<JsonError> for SessionError {
|
|
|
impl Session {
|
|
impl Session {
|
|
|
#[must_use]
|
|
#[must_use]
|
|
|
pub fn new() -> Self {
|
|
pub fn new() -> Self {
|
|
|
|
|
+ let now = current_time_millis();
|
|
|
Self {
|
|
Self {
|
|
|
- version: 1,
|
|
|
|
|
|
|
+ version: SESSION_VERSION,
|
|
|
|
|
+ session_id: generate_session_id(),
|
|
|
|
|
+ created_at_ms: now,
|
|
|
|
|
+ updated_at_ms: now,
|
|
|
messages: Vec::new(),
|
|
messages: Vec::new(),
|
|
|
|
|
+ compaction: None,
|
|
|
|
|
+ persistence: None,
|
|
|
}
|
|
}
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
|
|
+ #[must_use]
|
|
|
|
|
+ pub fn with_persistence_path(mut self, path: impl Into<PathBuf>) -> Self {
|
|
|
|
|
+ self.persistence = Some(SessionPersistence { path: path.into() });
|
|
|
|
|
+ self
|
|
|
|
|
+ }
|
|
|
|
|
+
|
|
|
|
|
+ #[must_use]
|
|
|
|
|
+ pub fn persistence_path(&self) -> Option<&Path> {
|
|
|
|
|
+ self.persistence.as_ref().map(|value| value.path.as_path())
|
|
|
|
|
+ }
|
|
|
|
|
+
|
|
|
pub fn save_to_path(&self, path: impl AsRef<Path>) -> Result<(), SessionError> {
|
|
pub fn save_to_path(&self, path: impl AsRef<Path>) -> Result<(), SessionError> {
|
|
|
- fs::write(path, self.to_json().render())?;
|
|
|
|
|
|
|
+ let path = path.as_ref();
|
|
|
|
|
+ rotate_session_file_if_needed(path)?;
|
|
|
|
|
+ write_atomic(path, &self.render_jsonl_snapshot())?;
|
|
|
|
|
+ cleanup_rotated_logs(path)?;
|
|
|
Ok(())
|
|
Ok(())
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
pub fn load_from_path(path: impl AsRef<Path>) -> Result<Self, SessionError> {
|
|
pub fn load_from_path(path: impl AsRef<Path>) -> Result<Self, SessionError> {
|
|
|
|
|
+ let path = path.as_ref();
|
|
|
let contents = fs::read_to_string(path)?;
|
|
let contents = fs::read_to_string(path)?;
|
|
|
- Self::from_json(&JsonValue::parse(&contents)?)
|
|
|
|
|
|
|
+ let session = match JsonValue::parse(&contents) {
|
|
|
|
|
+ Ok(value) => Self::from_json(&value)?,
|
|
|
|
|
+ Err(_) => Self::from_jsonl(&contents)?,
|
|
|
|
|
+ };
|
|
|
|
|
+ Ok(session.with_persistence_path(path.to_path_buf()))
|
|
|
|
|
+ }
|
|
|
|
|
+
|
|
|
|
|
+ pub fn push_message(&mut self, message: ConversationMessage) -> Result<(), SessionError> {
|
|
|
|
|
+ self.touch();
|
|
|
|
|
+ self.messages.push(message.clone());
|
|
|
|
|
+ self.append_persisted_message(&message)
|
|
|
|
|
+ }
|
|
|
|
|
+
|
|
|
|
|
+ pub fn push_user_text(&mut self, text: impl Into<String>) -> Result<(), SessionError> {
|
|
|
|
|
+ self.push_message(ConversationMessage::user_text(text))
|
|
|
|
|
+ }
|
|
|
|
|
+
|
|
|
|
|
+ pub fn record_compaction(&mut self, summary: impl Into<String>, removed_message_count: usize) {
|
|
|
|
|
+ self.touch();
|
|
|
|
|
+ let count = self.compaction.as_ref().map_or(1, |value| value.count + 1);
|
|
|
|
|
+ self.compaction = Some(SessionCompaction {
|
|
|
|
|
+ count,
|
|
|
|
|
+ removed_message_count,
|
|
|
|
|
+ summary: summary.into(),
|
|
|
|
|
+ });
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
#[must_use]
|
|
#[must_use]
|
|
@@ -102,6 +185,18 @@ impl Session {
|
|
|
"version".to_string(),
|
|
"version".to_string(),
|
|
|
JsonValue::Number(i64::from(self.version)),
|
|
JsonValue::Number(i64::from(self.version)),
|
|
|
);
|
|
);
|
|
|
|
|
+ object.insert(
|
|
|
|
|
+ "session_id".to_string(),
|
|
|
|
|
+ JsonValue::String(self.session_id.clone()),
|
|
|
|
|
+ );
|
|
|
|
|
+ object.insert(
|
|
|
|
|
+ "created_at_ms".to_string(),
|
|
|
|
|
+ JsonValue::Number(i64_from_u64(self.created_at_ms, "created_at_ms")),
|
|
|
|
|
+ );
|
|
|
|
|
+ object.insert(
|
|
|
|
|
+ "updated_at_ms".to_string(),
|
|
|
|
|
+ JsonValue::Number(i64_from_u64(self.updated_at_ms, "updated_at_ms")),
|
|
|
|
|
+ );
|
|
|
object.insert(
|
|
object.insert(
|
|
|
"messages".to_string(),
|
|
"messages".to_string(),
|
|
|
JsonValue::Array(
|
|
JsonValue::Array(
|
|
@@ -111,6 +206,9 @@ impl Session {
|
|
|
.collect(),
|
|
.collect(),
|
|
|
),
|
|
),
|
|
|
);
|
|
);
|
|
|
|
|
+ if let Some(compaction) = &self.compaction {
|
|
|
|
|
+ object.insert("compaction".to_string(), compaction.to_json());
|
|
|
|
|
+ }
|
|
|
JsonValue::Object(object)
|
|
JsonValue::Object(object)
|
|
|
}
|
|
}
|
|
|
|
|
|
|
@@ -131,7 +229,171 @@ impl Session {
|
|
|
.iter()
|
|
.iter()
|
|
|
.map(ConversationMessage::from_json)
|
|
.map(ConversationMessage::from_json)
|
|
|
.collect::<Result<Vec<_>, _>>()?;
|
|
.collect::<Result<Vec<_>, _>>()?;
|
|
|
- Ok(Self { version, messages })
|
|
|
|
|
|
|
+ let now = current_time_millis();
|
|
|
|
|
+ let session_id = object
|
|
|
|
|
+ .get("session_id")
|
|
|
|
|
+ .and_then(JsonValue::as_str)
|
|
|
|
|
+ .map(ToOwned::to_owned)
|
|
|
|
|
+ .unwrap_or_else(generate_session_id);
|
|
|
|
|
+ let created_at_ms = object
|
|
|
|
|
+ .get("created_at_ms")
|
|
|
|
|
+ .map(|value| required_u64_from_value(value, "created_at_ms"))
|
|
|
|
|
+ .transpose()?
|
|
|
|
|
+ .unwrap_or(now);
|
|
|
|
|
+ let updated_at_ms = object
|
|
|
|
|
+ .get("updated_at_ms")
|
|
|
|
|
+ .map(|value| required_u64_from_value(value, "updated_at_ms"))
|
|
|
|
|
+ .transpose()?
|
|
|
|
|
+ .unwrap_or(created_at_ms);
|
|
|
|
|
+ let compaction = object
|
|
|
|
|
+ .get("compaction")
|
|
|
|
|
+ .map(SessionCompaction::from_json)
|
|
|
|
|
+ .transpose()?;
|
|
|
|
|
+ Ok(Self {
|
|
|
|
|
+ version,
|
|
|
|
|
+ session_id,
|
|
|
|
|
+ created_at_ms,
|
|
|
|
|
+ updated_at_ms,
|
|
|
|
|
+ messages,
|
|
|
|
|
+ compaction,
|
|
|
|
|
+ persistence: None,
|
|
|
|
|
+ })
|
|
|
|
|
+ }
|
|
|
|
|
+
|
|
|
|
|
+ fn from_jsonl(contents: &str) -> Result<Self, SessionError> {
|
|
|
|
|
+ let mut version = SESSION_VERSION;
|
|
|
|
|
+ let mut session_id = None;
|
|
|
|
|
+ let mut created_at_ms = None;
|
|
|
|
|
+ let mut updated_at_ms = None;
|
|
|
|
|
+ let mut messages = Vec::new();
|
|
|
|
|
+ let mut compaction = None;
|
|
|
|
|
+
|
|
|
|
|
+ for (line_number, raw_line) in contents.lines().enumerate() {
|
|
|
|
|
+ let line = raw_line.trim();
|
|
|
|
|
+ if line.is_empty() {
|
|
|
|
|
+ continue;
|
|
|
|
|
+ }
|
|
|
|
|
+ let value = JsonValue::parse(line).map_err(|error| {
|
|
|
|
|
+ SessionError::Format(format!(
|
|
|
|
|
+ "invalid JSONL record at line {}: {}",
|
|
|
|
|
+ line_number + 1,
|
|
|
|
|
+ error
|
|
|
|
|
+ ))
|
|
|
|
|
+ })?;
|
|
|
|
|
+ let object = value.as_object().ok_or_else(|| {
|
|
|
|
|
+ SessionError::Format(format!(
|
|
|
|
|
+ "JSONL record at line {} must be an object",
|
|
|
|
|
+ line_number + 1
|
|
|
|
|
+ ))
|
|
|
|
|
+ })?;
|
|
|
|
|
+ match object
|
|
|
|
|
+ .get("type")
|
|
|
|
|
+ .and_then(JsonValue::as_str)
|
|
|
|
|
+ .ok_or_else(|| {
|
|
|
|
|
+ SessionError::Format(format!(
|
|
|
|
|
+ "JSONL record at line {} missing type",
|
|
|
|
|
+ line_number + 1
|
|
|
|
|
+ ))
|
|
|
|
|
+ })? {
|
|
|
|
|
+ "session_meta" => {
|
|
|
|
|
+ version = required_u32(object, "version")?;
|
|
|
|
|
+ session_id = Some(required_string(object, "session_id")?);
|
|
|
|
|
+ created_at_ms = Some(required_u64(object, "created_at_ms")?);
|
|
|
|
|
+ updated_at_ms = Some(required_u64(object, "updated_at_ms")?);
|
|
|
|
|
+ }
|
|
|
|
|
+ "message" => {
|
|
|
|
|
+ let message_value = object.get("message").ok_or_else(|| {
|
|
|
|
|
+ SessionError::Format(format!(
|
|
|
|
|
+ "JSONL record at line {} missing message",
|
|
|
|
|
+ line_number + 1
|
|
|
|
|
+ ))
|
|
|
|
|
+ })?;
|
|
|
|
|
+ messages.push(ConversationMessage::from_json(message_value)?);
|
|
|
|
|
+ }
|
|
|
|
|
+ "compaction" => {
|
|
|
|
|
+ compaction = Some(SessionCompaction::from_json(&JsonValue::Object(
|
|
|
|
|
+ object.clone(),
|
|
|
|
|
+ ))?);
|
|
|
|
|
+ }
|
|
|
|
|
+ other => {
|
|
|
|
|
+ return Err(SessionError::Format(format!(
|
|
|
|
|
+ "unsupported JSONL record type at line {}: {other}",
|
|
|
|
|
+ line_number + 1
|
|
|
|
|
+ )))
|
|
|
|
|
+ }
|
|
|
|
|
+ }
|
|
|
|
|
+ }
|
|
|
|
|
+
|
|
|
|
|
+ let now = current_time_millis();
|
|
|
|
|
+ Ok(Self {
|
|
|
|
|
+ version,
|
|
|
|
|
+ session_id: session_id.unwrap_or_else(generate_session_id),
|
|
|
|
|
+ created_at_ms: created_at_ms.unwrap_or(now),
|
|
|
|
|
+ updated_at_ms: updated_at_ms.unwrap_or(created_at_ms.unwrap_or(now)),
|
|
|
|
|
+ messages,
|
|
|
|
|
+ compaction,
|
|
|
|
|
+ persistence: None,
|
|
|
|
|
+ })
|
|
|
|
|
+ }
|
|
|
|
|
+
|
|
|
|
|
+ fn render_jsonl_snapshot(&self) -> String {
|
|
|
|
|
+ let mut lines = vec![self.meta_record().render()];
|
|
|
|
|
+ if let Some(compaction) = &self.compaction {
|
|
|
|
|
+ lines.push(compaction.to_jsonl_record().render());
|
|
|
|
|
+ }
|
|
|
|
|
+ lines.extend(
|
|
|
|
|
+ self.messages
|
|
|
|
|
+ .iter()
|
|
|
|
|
+ .map(|message| message_record(message).render()),
|
|
|
|
|
+ );
|
|
|
|
|
+ let mut rendered = lines.join("\n");
|
|
|
|
|
+ rendered.push('\n');
|
|
|
|
|
+ rendered
|
|
|
|
|
+ }
|
|
|
|
|
+
|
|
|
|
|
+ fn append_persisted_message(&self, message: &ConversationMessage) -> Result<(), SessionError> {
|
|
|
|
|
+ let Some(path) = self.persistence_path() else {
|
|
|
|
|
+ return Ok(());
|
|
|
|
|
+ };
|
|
|
|
|
+
|
|
|
|
|
+ let needs_bootstrap = !path.exists() || fs::metadata(path)?.len() == 0;
|
|
|
|
|
+ if needs_bootstrap {
|
|
|
|
|
+ self.save_to_path(path)?;
|
|
|
|
|
+ return Ok(());
|
|
|
|
|
+ }
|
|
|
|
|
+
|
|
|
|
|
+ let mut file = OpenOptions::new().append(true).open(path)?;
|
|
|
|
|
+ writeln!(file, "{}", message_record(message).render())?;
|
|
|
|
|
+ Ok(())
|
|
|
|
|
+ }
|
|
|
|
|
+
|
|
|
|
|
+ fn meta_record(&self) -> JsonValue {
|
|
|
|
|
+ let mut object = BTreeMap::new();
|
|
|
|
|
+ object.insert(
|
|
|
|
|
+ "type".to_string(),
|
|
|
|
|
+ JsonValue::String("session_meta".to_string()),
|
|
|
|
|
+ );
|
|
|
|
|
+ object.insert(
|
|
|
|
|
+ "version".to_string(),
|
|
|
|
|
+ JsonValue::Number(i64::from(self.version)),
|
|
|
|
|
+ );
|
|
|
|
|
+ object.insert(
|
|
|
|
|
+ "session_id".to_string(),
|
|
|
|
|
+ JsonValue::String(self.session_id.clone()),
|
|
|
|
|
+ );
|
|
|
|
|
+ object.insert(
|
|
|
|
|
+ "created_at_ms".to_string(),
|
|
|
|
|
+ JsonValue::Number(i64_from_u64(self.created_at_ms, "created_at_ms")),
|
|
|
|
|
+ );
|
|
|
|
|
+ object.insert(
|
|
|
|
|
+ "updated_at_ms".to_string(),
|
|
|
|
|
+ JsonValue::Number(i64_from_u64(self.updated_at_ms, "updated_at_ms")),
|
|
|
|
|
+ );
|
|
|
|
|
+ JsonValue::Object(object)
|
|
|
|
|
+ }
|
|
|
|
|
+
|
|
|
|
|
+ fn touch(&mut self) {
|
|
|
|
|
+ self.updated_at_ms = current_time_millis();
|
|
|
}
|
|
}
|
|
|
}
|
|
}
|
|
|
|
|
|
|
@@ -324,6 +586,61 @@ impl ContentBlock {
|
|
|
}
|
|
}
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
|
|
+impl SessionCompaction {
|
|
|
|
|
+ #[must_use]
|
|
|
|
|
+ pub fn to_json(&self) -> JsonValue {
|
|
|
|
|
+ let mut object = BTreeMap::new();
|
|
|
|
|
+ object.insert(
|
|
|
|
|
+ "count".to_string(),
|
|
|
|
|
+ JsonValue::Number(i64::from(self.count)),
|
|
|
|
|
+ );
|
|
|
|
|
+ object.insert(
|
|
|
|
|
+ "removed_message_count".to_string(),
|
|
|
|
|
+ JsonValue::Number(i64_from_usize(
|
|
|
|
|
+ self.removed_message_count,
|
|
|
|
|
+ "removed_message_count",
|
|
|
|
|
+ )),
|
|
|
|
|
+ );
|
|
|
|
|
+ object.insert(
|
|
|
|
|
+ "summary".to_string(),
|
|
|
|
|
+ JsonValue::String(self.summary.clone()),
|
|
|
|
|
+ );
|
|
|
|
|
+ JsonValue::Object(object)
|
|
|
|
|
+ }
|
|
|
|
|
+
|
|
|
|
|
+ #[must_use]
|
|
|
|
|
+ pub fn to_jsonl_record(&self) -> JsonValue {
|
|
|
|
|
+ let mut object = self
|
|
|
|
|
+ .to_json()
|
|
|
|
|
+ .as_object()
|
|
|
|
|
+ .cloned()
|
|
|
|
|
+ .expect("compaction should render to object");
|
|
|
|
|
+ object.insert(
|
|
|
|
|
+ "type".to_string(),
|
|
|
|
|
+ JsonValue::String("compaction".to_string()),
|
|
|
|
|
+ );
|
|
|
|
|
+ JsonValue::Object(object)
|
|
|
|
|
+ }
|
|
|
|
|
+
|
|
|
|
|
+ fn from_json(value: &JsonValue) -> Result<Self, SessionError> {
|
|
|
|
|
+ let object = value
|
|
|
|
|
+ .as_object()
|
|
|
|
|
+ .ok_or_else(|| SessionError::Format("compaction must be an object".to_string()))?;
|
|
|
|
|
+ Ok(Self {
|
|
|
|
|
+ count: required_u32(object, "count")?,
|
|
|
|
|
+ removed_message_count: required_usize(object, "removed_message_count")?,
|
|
|
|
|
+ summary: required_string(object, "summary")?,
|
|
|
|
|
+ })
|
|
|
|
|
+ }
|
|
|
|
|
+}
|
|
|
|
|
+
|
|
|
|
|
+fn message_record(message: &ConversationMessage) -> JsonValue {
|
|
|
|
|
+ let mut object = BTreeMap::new();
|
|
|
|
|
+ object.insert("type".to_string(), JsonValue::String("message".to_string()));
|
|
|
|
|
+ object.insert("message".to_string(), message.to_json());
|
|
|
|
|
+ JsonValue::Object(object)
|
|
|
|
|
+}
|
|
|
|
|
+
|
|
|
fn usage_to_json(usage: TokenUsage) -> JsonValue {
|
|
fn usage_to_json(usage: TokenUsage) -> JsonValue {
|
|
|
let mut object = BTreeMap::new();
|
|
let mut object = BTreeMap::new();
|
|
|
object.insert(
|
|
object.insert(
|
|
@@ -376,22 +693,144 @@ fn required_u32(object: &BTreeMap<String, JsonValue>, key: &str) -> Result<u32,
|
|
|
u32::try_from(value).map_err(|_| SessionError::Format(format!("{key} out of range")))
|
|
u32::try_from(value).map_err(|_| SessionError::Format(format!("{key} out of range")))
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
|
|
+fn required_u64(object: &BTreeMap<String, JsonValue>, key: &str) -> Result<u64, SessionError> {
|
|
|
|
|
+ let value = object
|
|
|
|
|
+ .get(key)
|
|
|
|
|
+ .ok_or_else(|| SessionError::Format(format!("missing {key}")))?;
|
|
|
|
|
+ required_u64_from_value(value, key)
|
|
|
|
|
+}
|
|
|
|
|
+
|
|
|
|
|
+fn required_u64_from_value(value: &JsonValue, key: &str) -> Result<u64, SessionError> {
|
|
|
|
|
+ let value = value
|
|
|
|
|
+ .as_i64()
|
|
|
|
|
+ .ok_or_else(|| SessionError::Format(format!("missing {key}")))?;
|
|
|
|
|
+ u64::try_from(value).map_err(|_| SessionError::Format(format!("{key} out of range")))
|
|
|
|
|
+}
|
|
|
|
|
+
|
|
|
|
|
+fn required_usize(object: &BTreeMap<String, JsonValue>, key: &str) -> Result<usize, SessionError> {
|
|
|
|
|
+ let value = object
|
|
|
|
|
+ .get(key)
|
|
|
|
|
+ .and_then(JsonValue::as_i64)
|
|
|
|
|
+ .ok_or_else(|| SessionError::Format(format!("missing {key}")))?;
|
|
|
|
|
+ usize::try_from(value).map_err(|_| SessionError::Format(format!("{key} out of range")))
|
|
|
|
|
+}
|
|
|
|
|
+
|
|
|
|
|
+fn i64_from_u64(value: u64, key: &str) -> i64 {
|
|
|
|
|
+ i64::try_from(value).unwrap_or_else(|_| panic!("{key} out of range for JSON number"))
|
|
|
|
|
+}
|
|
|
|
|
+
|
|
|
|
|
+fn i64_from_usize(value: usize, key: &str) -> i64 {
|
|
|
|
|
+ i64::try_from(value).unwrap_or_else(|_| panic!("{key} out of range for JSON number"))
|
|
|
|
|
+}
|
|
|
|
|
+
|
|
|
|
|
+fn current_time_millis() -> u64 {
|
|
|
|
|
+ SystemTime::now()
|
|
|
|
|
+ .duration_since(UNIX_EPOCH)
|
|
|
|
|
+ .map(|duration| duration.as_millis() as u64)
|
|
|
|
|
+ .unwrap_or_default()
|
|
|
|
|
+}
|
|
|
|
|
+
|
|
|
|
|
+fn generate_session_id() -> String {
|
|
|
|
|
+ let millis = current_time_millis();
|
|
|
|
|
+ let counter = SESSION_ID_COUNTER.fetch_add(1, Ordering::Relaxed);
|
|
|
|
|
+ format!("session-{millis}-{counter}")
|
|
|
|
|
+}
|
|
|
|
|
+
|
|
|
|
|
+fn write_atomic(path: &Path, contents: &str) -> Result<(), SessionError> {
|
|
|
|
|
+ if let Some(parent) = path.parent() {
|
|
|
|
|
+ fs::create_dir_all(parent)?;
|
|
|
|
|
+ }
|
|
|
|
|
+ let temp_path = temporary_path_for(path);
|
|
|
|
|
+ fs::write(&temp_path, contents)?;
|
|
|
|
|
+ fs::rename(temp_path, path)?;
|
|
|
|
|
+ Ok(())
|
|
|
|
|
+}
|
|
|
|
|
+
|
|
|
|
|
+fn temporary_path_for(path: &Path) -> PathBuf {
|
|
|
|
|
+ let file_name = path
|
|
|
|
|
+ .file_name()
|
|
|
|
|
+ .and_then(|value| value.to_str())
|
|
|
|
|
+ .unwrap_or("session");
|
|
|
|
|
+ path.with_file_name(format!(
|
|
|
|
|
+ "{file_name}.tmp-{}-{}",
|
|
|
|
|
+ current_time_millis(),
|
|
|
|
|
+ SESSION_ID_COUNTER.fetch_add(1, Ordering::Relaxed)
|
|
|
|
|
+ ))
|
|
|
|
|
+}
|
|
|
|
|
+
|
|
|
|
|
+fn rotate_session_file_if_needed(path: &Path) -> Result<(), SessionError> {
|
|
|
|
|
+ let Ok(metadata) = fs::metadata(path) else {
|
|
|
|
|
+ return Ok(());
|
|
|
|
|
+ };
|
|
|
|
|
+ if metadata.len() < ROTATE_AFTER_BYTES {
|
|
|
|
|
+ return Ok(());
|
|
|
|
|
+ }
|
|
|
|
|
+ let rotated_path = rotated_log_path(path);
|
|
|
|
|
+ fs::rename(path, rotated_path)?;
|
|
|
|
|
+ Ok(())
|
|
|
|
|
+}
|
|
|
|
|
+
|
|
|
|
|
+fn rotated_log_path(path: &Path) -> PathBuf {
|
|
|
|
|
+ let stem = path
|
|
|
|
|
+ .file_stem()
|
|
|
|
|
+ .and_then(|value| value.to_str())
|
|
|
|
|
+ .unwrap_or("session");
|
|
|
|
|
+ path.with_file_name(format!("{stem}.rot-{}.jsonl", current_time_millis()))
|
|
|
|
|
+}
|
|
|
|
|
+
|
|
|
|
|
+fn cleanup_rotated_logs(path: &Path) -> Result<(), SessionError> {
|
|
|
|
|
+ let Some(parent) = path.parent() else {
|
|
|
|
|
+ return Ok(());
|
|
|
|
|
+ };
|
|
|
|
|
+ let stem = path
|
|
|
|
|
+ .file_stem()
|
|
|
|
|
+ .and_then(|value| value.to_str())
|
|
|
|
|
+ .unwrap_or("session");
|
|
|
|
|
+ let prefix = format!("{stem}.rot-");
|
|
|
|
|
+ let mut rotated_paths = fs::read_dir(parent)?
|
|
|
|
|
+ .filter_map(Result::ok)
|
|
|
|
|
+ .map(|entry| entry.path())
|
|
|
|
|
+ .filter(|entry_path| {
|
|
|
|
|
+ entry_path
|
|
|
|
|
+ .file_name()
|
|
|
|
|
+ .and_then(|value| value.to_str())
|
|
|
|
|
+ .is_some_and(|name| name.starts_with(&prefix) && name.ends_with(".jsonl"))
|
|
|
|
|
+ })
|
|
|
|
|
+ .collect::<Vec<_>>();
|
|
|
|
|
+
|
|
|
|
|
+ rotated_paths.sort_by_key(|entry_path| {
|
|
|
|
|
+ fs::metadata(entry_path)
|
|
|
|
|
+ .and_then(|metadata| metadata.modified())
|
|
|
|
|
+ .unwrap_or(UNIX_EPOCH)
|
|
|
|
|
+ });
|
|
|
|
|
+
|
|
|
|
|
+ let remove_count = rotated_paths.len().saturating_sub(MAX_ROTATED_FILES);
|
|
|
|
|
+ for stale_path in rotated_paths.into_iter().take(remove_count) {
|
|
|
|
|
+ fs::remove_file(stale_path)?;
|
|
|
|
|
+ }
|
|
|
|
|
+ Ok(())
|
|
|
|
|
+}
|
|
|
|
|
+
|
|
|
#[cfg(test)]
|
|
#[cfg(test)]
|
|
|
mod tests {
|
|
mod tests {
|
|
|
- use super::{ContentBlock, ConversationMessage, MessageRole, Session};
|
|
|
|
|
|
|
+ use super::{
|
|
|
|
|
+ cleanup_rotated_logs, rotate_session_file_if_needed, ContentBlock, ConversationMessage,
|
|
|
|
|
+ MessageRole, Session,
|
|
|
|
|
+ };
|
|
|
|
|
+ use crate::json::JsonValue;
|
|
|
use crate::usage::TokenUsage;
|
|
use crate::usage::TokenUsage;
|
|
|
use std::fs;
|
|
use std::fs;
|
|
|
|
|
+ use std::path::PathBuf;
|
|
|
use std::time::{SystemTime, UNIX_EPOCH};
|
|
use std::time::{SystemTime, UNIX_EPOCH};
|
|
|
|
|
|
|
|
#[test]
|
|
#[test]
|
|
|
- fn persists_and_restores_session_json() {
|
|
|
|
|
|
|
+ fn persists_and_restores_session_jsonl() {
|
|
|
let mut session = Session::new();
|
|
let mut session = Session::new();
|
|
|
session
|
|
session
|
|
|
- .messages
|
|
|
|
|
- .push(ConversationMessage::user_text("hello"));
|
|
|
|
|
|
|
+ .push_user_text("hello")
|
|
|
|
|
+ .expect("user message should append");
|
|
|
session
|
|
session
|
|
|
- .messages
|
|
|
|
|
- .push(ConversationMessage::assistant_with_usage(
|
|
|
|
|
|
|
+ .push_message(ConversationMessage::assistant_with_usage(
|
|
|
vec![
|
|
vec![
|
|
|
ContentBlock::Text {
|
|
ContentBlock::Text {
|
|
|
text: "thinking".to_string(),
|
|
text: "thinking".to_string(),
|
|
@@ -408,16 +847,15 @@ mod tests {
|
|
|
cache_creation_input_tokens: 1,
|
|
cache_creation_input_tokens: 1,
|
|
|
cache_read_input_tokens: 2,
|
|
cache_read_input_tokens: 2,
|
|
|
}),
|
|
}),
|
|
|
- ));
|
|
|
|
|
- session.messages.push(ConversationMessage::tool_result(
|
|
|
|
|
- "tool-1", "bash", "hi", false,
|
|
|
|
|
- ));
|
|
|
|
|
|
|
+ ))
|
|
|
|
|
+ .expect("assistant message should append");
|
|
|
|
|
+ session
|
|
|
|
|
+ .push_message(ConversationMessage::tool_result(
|
|
|
|
|
+ "tool-1", "bash", "hi", false,
|
|
|
|
|
+ ))
|
|
|
|
|
+ .expect("tool result should append");
|
|
|
|
|
|
|
|
- let nanos = SystemTime::now()
|
|
|
|
|
- .duration_since(UNIX_EPOCH)
|
|
|
|
|
- .expect("system time should be after epoch")
|
|
|
|
|
- .as_nanos();
|
|
|
|
|
- let path = std::env::temp_dir().join(format!("runtime-session-{nanos}.json"));
|
|
|
|
|
|
|
+ let path = temp_session_path("jsonl");
|
|
|
session.save_to_path(&path).expect("session should save");
|
|
session.save_to_path(&path).expect("session should save");
|
|
|
let restored = Session::load_from_path(&path).expect("session should load");
|
|
let restored = Session::load_from_path(&path).expect("session should load");
|
|
|
fs::remove_file(&path).expect("temp file should be removable");
|
|
fs::remove_file(&path).expect("temp file should be removable");
|
|
@@ -428,5 +866,128 @@ mod tests {
|
|
|
restored.messages[1].usage.expect("usage").total_tokens(),
|
|
restored.messages[1].usage.expect("usage").total_tokens(),
|
|
|
17
|
|
17
|
|
|
);
|
|
);
|
|
|
|
|
+ assert_eq!(restored.session_id, session.session_id);
|
|
|
|
|
+ }
|
|
|
|
|
+
|
|
|
|
|
+ #[test]
|
|
|
|
|
+ fn loads_legacy_session_json_object() {
|
|
|
|
|
+ let path = temp_session_path("legacy");
|
|
|
|
|
+ let legacy = JsonValue::Object(
|
|
|
|
|
+ [
|
|
|
|
|
+ ("version".to_string(), JsonValue::Number(1)),
|
|
|
|
|
+ (
|
|
|
|
|
+ "messages".to_string(),
|
|
|
|
|
+ JsonValue::Array(vec![ConversationMessage::user_text("legacy").to_json()]),
|
|
|
|
|
+ ),
|
|
|
|
|
+ ]
|
|
|
|
|
+ .into_iter()
|
|
|
|
|
+ .collect(),
|
|
|
|
|
+ );
|
|
|
|
|
+ fs::write(&path, legacy.render()).expect("legacy file should write");
|
|
|
|
|
+
|
|
|
|
|
+ let restored = Session::load_from_path(&path).expect("legacy session should load");
|
|
|
|
|
+ fs::remove_file(&path).expect("temp file should be removable");
|
|
|
|
|
+
|
|
|
|
|
+ assert_eq!(restored.messages.len(), 1);
|
|
|
|
|
+ assert_eq!(
|
|
|
|
|
+ restored.messages[0],
|
|
|
|
|
+ ConversationMessage::user_text("legacy")
|
|
|
|
|
+ );
|
|
|
|
|
+ assert!(!restored.session_id.is_empty());
|
|
|
|
|
+ }
|
|
|
|
|
+
|
|
|
|
|
+ #[test]
|
|
|
|
|
+ fn appends_messages_to_persisted_jsonl_session() {
|
|
|
|
|
+ let path = temp_session_path("append");
|
|
|
|
|
+ let mut session = Session::new().with_persistence_path(path.clone());
|
|
|
|
|
+ session
|
|
|
|
|
+ .save_to_path(&path)
|
|
|
|
|
+ .expect("initial save should succeed");
|
|
|
|
|
+ session
|
|
|
|
|
+ .push_user_text("hi")
|
|
|
|
|
+ .expect("user append should succeed");
|
|
|
|
|
+ session
|
|
|
|
|
+ .push_message(ConversationMessage::assistant(vec![ContentBlock::Text {
|
|
|
|
|
+ text: "hello".to_string(),
|
|
|
|
|
+ }]))
|
|
|
|
|
+ .expect("assistant append should succeed");
|
|
|
|
|
+
|
|
|
|
|
+ let restored = Session::load_from_path(&path).expect("session should replay from jsonl");
|
|
|
|
|
+ fs::remove_file(&path).expect("temp file should be removable");
|
|
|
|
|
+
|
|
|
|
|
+ assert_eq!(restored.messages.len(), 2);
|
|
|
|
|
+ assert_eq!(restored.messages[0], ConversationMessage::user_text("hi"));
|
|
|
|
|
+ }
|
|
|
|
|
+
|
|
|
|
|
+ #[test]
|
|
|
|
|
+ fn persists_compaction_metadata() {
|
|
|
|
|
+ let path = temp_session_path("compaction");
|
|
|
|
|
+ let mut session = Session::new();
|
|
|
|
|
+ session
|
|
|
|
|
+ .push_user_text("before")
|
|
|
|
|
+ .expect("message should append");
|
|
|
|
|
+ session.record_compaction("summarized earlier work", 4);
|
|
|
|
|
+ session.save_to_path(&path).expect("session should save");
|
|
|
|
|
+
|
|
|
|
|
+ let restored = Session::load_from_path(&path).expect("session should load");
|
|
|
|
|
+ fs::remove_file(&path).expect("temp file should be removable");
|
|
|
|
|
+
|
|
|
|
|
+ let compaction = restored.compaction.expect("compaction metadata");
|
|
|
|
|
+ assert_eq!(compaction.count, 1);
|
|
|
|
|
+ assert_eq!(compaction.removed_message_count, 4);
|
|
|
|
|
+ assert!(compaction.summary.contains("summarized"));
|
|
|
|
|
+ }
|
|
|
|
|
+
|
|
|
|
|
+ #[test]
|
|
|
|
|
+ fn rotates_and_cleans_up_large_session_logs() {
|
|
|
|
|
+ let path = temp_session_path("rotation");
|
|
|
|
|
+ fs::write(&path, "x".repeat((super::ROTATE_AFTER_BYTES + 10) as usize))
|
|
|
|
|
+ .expect("oversized file should write");
|
|
|
|
|
+ rotate_session_file_if_needed(&path).expect("rotation should succeed");
|
|
|
|
|
+ assert!(
|
|
|
|
|
+ !path.exists(),
|
|
|
|
|
+ "original path should be rotated away before rewrite"
|
|
|
|
|
+ );
|
|
|
|
|
+
|
|
|
|
|
+ for _ in 0..5 {
|
|
|
|
|
+ let rotated = super::rotated_log_path(&path);
|
|
|
|
|
+ fs::write(&rotated, "old").expect("rotated file should write");
|
|
|
|
|
+ }
|
|
|
|
|
+ cleanup_rotated_logs(&path).expect("cleanup should succeed");
|
|
|
|
|
+
|
|
|
|
|
+ let rotated_count = rotation_files(&path).len();
|
|
|
|
|
+ assert!(rotated_count <= super::MAX_ROTATED_FILES);
|
|
|
|
|
+ for rotated in rotation_files(&path) {
|
|
|
|
|
+ fs::remove_file(rotated).expect("rotated file should be removable");
|
|
|
|
|
+ }
|
|
|
|
|
+ }
|
|
|
|
|
+
|
|
|
|
|
+ fn temp_session_path(label: &str) -> PathBuf {
|
|
|
|
|
+ let nanos = SystemTime::now()
|
|
|
|
|
+ .duration_since(UNIX_EPOCH)
|
|
|
|
|
+ .expect("system time should be after epoch")
|
|
|
|
|
+ .as_nanos();
|
|
|
|
|
+ std::env::temp_dir().join(format!("runtime-session-{label}-{nanos}.json"))
|
|
|
|
|
+ }
|
|
|
|
|
+
|
|
|
|
|
+ fn rotation_files(path: &PathBuf) -> Vec<PathBuf> {
|
|
|
|
|
+ let stem = path
|
|
|
|
|
+ .file_stem()
|
|
|
|
|
+ .and_then(|value| value.to_str())
|
|
|
|
|
+ .expect("temp path should have file stem")
|
|
|
|
|
+ .to_string();
|
|
|
|
|
+ fs::read_dir(path.parent().expect("temp path should have parent"))
|
|
|
|
|
+ .expect("temp dir should read")
|
|
|
|
|
+ .filter_map(Result::ok)
|
|
|
|
|
+ .map(|entry| entry.path())
|
|
|
|
|
+ .filter(|entry_path| {
|
|
|
|
|
+ entry_path
|
|
|
|
|
+ .file_name()
|
|
|
|
|
+ .and_then(|value| value.to_str())
|
|
|
|
|
+ .is_some_and(|name| {
|
|
|
|
|
+ name.starts_with(&format!("{stem}.rot-")) && name.ends_with(".jsonl")
|
|
|
|
|
+ })
|
|
|
|
|
+ })
|
|
|
|
|
+ .collect()
|
|
|
}
|
|
}
|
|
|
}
|
|
}
|