| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193 |
- from __future__ import annotations
- import json
- from dataclasses import dataclass, field
- from uuid import uuid4
- from .commands import build_command_backlog
- from .models import PermissionDenial, UsageSummary
- from .port_manifest import PortManifest, build_port_manifest
- from .session_store import StoredSession, load_session, save_session
- from .tools import build_tool_backlog
- from .transcript import TranscriptStore
- @dataclass(frozen=True)
- class QueryEngineConfig:
- max_turns: int = 8
- max_budget_tokens: int = 2000
- compact_after_turns: int = 12
- structured_output: bool = False
- structured_retry_limit: int = 2
- @dataclass(frozen=True)
- class TurnResult:
- prompt: str
- output: str
- matched_commands: tuple[str, ...]
- matched_tools: tuple[str, ...]
- permission_denials: tuple[PermissionDenial, ...]
- usage: UsageSummary
- stop_reason: str
- @dataclass
- class QueryEnginePort:
- manifest: PortManifest
- config: QueryEngineConfig = field(default_factory=QueryEngineConfig)
- session_id: str = field(default_factory=lambda: uuid4().hex)
- mutable_messages: list[str] = field(default_factory=list)
- permission_denials: list[PermissionDenial] = field(default_factory=list)
- total_usage: UsageSummary = field(default_factory=UsageSummary)
- transcript_store: TranscriptStore = field(default_factory=TranscriptStore)
- @classmethod
- def from_workspace(cls) -> 'QueryEnginePort':
- return cls(manifest=build_port_manifest())
- @classmethod
- def from_saved_session(cls, session_id: str) -> 'QueryEnginePort':
- stored = load_session(session_id)
- transcript = TranscriptStore(entries=list(stored.messages), flushed=True)
- return cls(
- manifest=build_port_manifest(),
- session_id=stored.session_id,
- mutable_messages=list(stored.messages),
- total_usage=UsageSummary(stored.input_tokens, stored.output_tokens),
- transcript_store=transcript,
- )
- def submit_message(
- self,
- prompt: str,
- matched_commands: tuple[str, ...] = (),
- matched_tools: tuple[str, ...] = (),
- denied_tools: tuple[PermissionDenial, ...] = (),
- ) -> TurnResult:
- if len(self.mutable_messages) >= self.config.max_turns:
- output = f'Max turns reached before processing prompt: {prompt}'
- return TurnResult(
- prompt=prompt,
- output=output,
- matched_commands=matched_commands,
- matched_tools=matched_tools,
- permission_denials=denied_tools,
- usage=self.total_usage,
- stop_reason='max_turns_reached',
- )
- summary_lines = [
- f'Prompt: {prompt}',
- f'Matched commands: {", ".join(matched_commands) if matched_commands else "none"}',
- f'Matched tools: {", ".join(matched_tools) if matched_tools else "none"}',
- f'Permission denials: {len(denied_tools)}',
- ]
- output = self._format_output(summary_lines)
- projected_usage = self.total_usage.add_turn(prompt, output)
- stop_reason = 'completed'
- if projected_usage.input_tokens + projected_usage.output_tokens > self.config.max_budget_tokens:
- stop_reason = 'max_budget_reached'
- self.mutable_messages.append(prompt)
- self.transcript_store.append(prompt)
- self.permission_denials.extend(denied_tools)
- self.total_usage = projected_usage
- self.compact_messages_if_needed()
- return TurnResult(
- prompt=prompt,
- output=output,
- matched_commands=matched_commands,
- matched_tools=matched_tools,
- permission_denials=denied_tools,
- usage=self.total_usage,
- stop_reason=stop_reason,
- )
- def stream_submit_message(
- self,
- prompt: str,
- matched_commands: tuple[str, ...] = (),
- matched_tools: tuple[str, ...] = (),
- denied_tools: tuple[PermissionDenial, ...] = (),
- ):
- yield {'type': 'message_start', 'session_id': self.session_id, 'prompt': prompt}
- if matched_commands:
- yield {'type': 'command_match', 'commands': matched_commands}
- if matched_tools:
- yield {'type': 'tool_match', 'tools': matched_tools}
- if denied_tools:
- yield {'type': 'permission_denial', 'denials': [denial.tool_name for denial in denied_tools]}
- result = self.submit_message(prompt, matched_commands, matched_tools, denied_tools)
- yield {'type': 'message_delta', 'text': result.output}
- yield {
- 'type': 'message_stop',
- 'usage': {'input_tokens': result.usage.input_tokens, 'output_tokens': result.usage.output_tokens},
- 'stop_reason': result.stop_reason,
- 'transcript_size': len(self.transcript_store.entries),
- }
- def compact_messages_if_needed(self) -> None:
- if len(self.mutable_messages) > self.config.compact_after_turns:
- self.mutable_messages[:] = self.mutable_messages[-self.config.compact_after_turns :]
- self.transcript_store.compact(self.config.compact_after_turns)
- def replay_user_messages(self) -> tuple[str, ...]:
- return self.transcript_store.replay()
- def flush_transcript(self) -> None:
- self.transcript_store.flush()
- def persist_session(self) -> str:
- self.flush_transcript()
- path = save_session(
- StoredSession(
- session_id=self.session_id,
- messages=tuple(self.mutable_messages),
- input_tokens=self.total_usage.input_tokens,
- output_tokens=self.total_usage.output_tokens,
- )
- )
- return str(path)
- def _format_output(self, summary_lines: list[str]) -> str:
- if self.config.structured_output:
- payload = {
- 'summary': summary_lines,
- 'session_id': self.session_id,
- }
- return self._render_structured_output(payload)
- return '\n'.join(summary_lines)
- def _render_structured_output(self, payload: dict[str, object]) -> str:
- last_error: Exception | None = None
- for _ in range(self.config.structured_retry_limit):
- try:
- return json.dumps(payload, indent=2)
- except (TypeError, ValueError) as exc: # pragma: no cover - defensive branch
- last_error = exc
- payload = {'summary': ['structured output retry'], 'session_id': self.session_id}
- raise RuntimeError('structured output rendering failed') from last_error
- def render_summary(self) -> str:
- command_backlog = build_command_backlog()
- tool_backlog = build_tool_backlog()
- sections = [
- '# Python Porting Workspace Summary',
- '',
- self.manifest.to_markdown(),
- '',
- f'Command surface: {len(command_backlog.modules)} mirrored entries',
- *command_backlog.summary_lines()[:10],
- '',
- f'Tool surface: {len(tool_backlog.modules)} mirrored entries',
- *tool_backlog.summary_lines()[:10],
- '',
- f'Session id: {self.session_id}',
- f'Conversation turns stored: {len(self.mutable_messages)}',
- f'Permission denials tracked: {len(self.permission_denials)}',
- f'Usage totals: in={self.total_usage.input_tokens} out={self.total_usage.output_tokens}',
- f'Max turns: {self.config.max_turns}',
- f'Max budget tokens: {self.config.max_budget_tokens}',
- f'Transcript flushed: {self.transcript_store.flushed}',
- ]
- return '\n'.join(sections)
|