| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192 |
- from __future__ import annotations
- from dataclasses import dataclass
- from .commands import PORTED_COMMANDS
- from .context import PortContext, build_port_context, render_context
- from .history import HistoryLog
- from .models import PermissionDenial, PortingModule
- from .query_engine import QueryEngineConfig, QueryEnginePort, TurnResult
- from .setup import SetupReport, WorkspaceSetup, run_setup
- from .system_init import build_system_init_message
- from .tools import PORTED_TOOLS
- from .execution_registry import build_execution_registry
@dataclass(frozen=True)
class RoutedMatch:
    """One command or tool selected for a prompt by the router.

    Instances are immutable (frozen dataclass) and compare by field values.
    """

    # Category of the matched module; the router passes 'command' or 'tool'.
    kind: str
    # Name copied from the matched porting module.
    name: str
    # Source hint copied from the matched porting module.
    source_hint: str
    # Number of prompt tokens found in the module's searchable text.
    score: int
@dataclass
class RuntimeSession:
    """Record of one bootstrapped runtime session, renderable as Markdown.

    Bundles the prompt together with the workspace context and setup, the
    routing outcome, the execution messages, the stream events, the turn
    result, the persisted session path and the history log.
    """

    prompt: str
    context: PortContext
    setup: WorkspaceSetup
    setup_report: SetupReport
    system_init_message: str
    history: HistoryLog
    routed_matches: list[RoutedMatch]
    turn_result: TurnResult
    command_execution_messages: tuple[str, ...]
    tool_execution_messages: tuple[str, ...]
    stream_events: tuple[dict[str, object], ...]
    persisted_session_path: str

    def as_markdown(self) -> str:
        """Render the whole session as a Markdown report.

        Sections appear in a fixed order: prompt, context, setup, startup
        steps, system init, routed matches, command execution, tool
        execution, stream events, turn result, persisted session path and
        finally the history log. Empty list-like sections render a
        placeholder instead of being left blank.

        Returns:
            The report as a single newline-joined string.
        """
        lines = [
            '# Runtime Session',
            '',
            f'Prompt: {self.prompt}',
            '',
            '## Context',
            render_context(self.context),
            '',
            '## Setup',
            f'- Python: {self.setup.python_version} ({self.setup.implementation})',
            f'- Platform: {self.setup.platform_name}',
            f'- Test command: {self.setup.test_command}',
            '',
            '## Startup Steps',
            *(f'- {step}' for step in self.setup.startup_steps()),
            '',
            '## System Init',
            self.system_init_message,
            '',
            '## Routed Matches',
        ]
        if self.routed_matches:
            lines.extend(
                f'- [{match.kind}] {match.name} ({match.score}) — {match.source_hint}'
                for match in self.routed_matches
            )
        else:
            lines.append('- none')
        lines.extend([
            '',
            '## Command Execution',
            *(self.command_execution_messages or ('none',)),
            '',
            '## Tool Execution',
            *(self.tool_execution_messages or ('none',)),
            '',
            '## Stream Events',
            *self._stream_event_lines(),
            '',
            '## Turn Result',
            self.turn_result.output,
            '',
            f'Persisted session path: {self.persisted_session_path}',
            '',
            self.history.as_markdown(),
        ])
        return '\n'.join(lines)

    def _stream_event_lines(self) -> list[str]:
        """Return one Markdown bullet per stream event, or '- none' if empty."""
        if not self.stream_events:
            # Keep the section non-empty, like the other list sections.
            return ['- none']
        # .get() keeps one event without a 'type' key from aborting the
        # whole report with a KeyError.
        return [
            f"- {event.get('type', 'unknown')}: {event}"
            for event in self.stream_events
        ]
class PortRuntime:
    """Routes prompts to ported commands/tools and drives engine sessions."""

    def route_prompt(self, prompt: str, limit: int = 5) -> list[RoutedMatch]:
        """Return up to ``limit`` commands/tools that mention prompt tokens.

        The prompt is split on whitespace, '/' and '-' and lower-cased. The
        best-scoring command and the best-scoring tool (when any matched) are
        placed first, so both kinds are represented; the remaining slots are
        filled with the highest-scoring leftovers of either kind.

        Args:
            prompt: Free-form prompt text.
            limit: Maximum number of matches to return. Zero or a negative
                value yields an empty list.

        Returns:
            The selected matches, at most ``limit`` of them.
        """
        if limit <= 0:
            # A negative limit would otherwise slice from the END of the
            # selection below and return an arbitrary subset.
            return []
        tokens = {
            token.lower()
            for token in prompt.replace('/', ' ').replace('-', ' ').split()
        }
        # Insertion order matters: the top command goes before the top tool.
        by_kind = {
            'command': self._collect_matches(tokens, PORTED_COMMANDS, 'command'),
            'tool': self._collect_matches(tokens, PORTED_TOOLS, 'tool'),
        }
        # Each per-kind list is already sorted best-first.
        selected = [matches[0] for matches in by_kind.values() if matches]
        leftovers = sorted(
            (match for matches in by_kind.values() for match in matches[1:]),
            key=lambda item: (-item.score, item.kind, item.name),
        )
        selected.extend(leftovers[: max(0, limit - len(selected))])
        # With limit == 1 the two guaranteed picks still need trimming.
        return selected[:limit]

    def bootstrap_session(self, prompt: str, limit: int = 5) -> RuntimeSession:
        """Run one full session for ``prompt`` and return its record.

        Builds the port context and trusted setup report, routes the prompt,
        executes every routed command/tool that the execution registry
        knows, submits the prompt to the query engine, persists the engine
        session and logs each stage to a fresh history log.

        Args:
            prompt: Prompt to route and submit.
            limit: Maximum number of routed matches (see ``route_prompt``).

        Returns:
            A ``RuntimeSession`` holding everything produced along the way.
        """
        context = build_port_context()
        setup_report = run_setup(trusted=True)
        setup = setup_report.setup
        history = HistoryLog()
        engine = QueryEnginePort.from_workspace()
        history.add(
            'context',
            f'python_files={context.python_file_count}, archive_available={context.archive_available}',
        )
        history.add('registry', f'commands={len(PORTED_COMMANDS)}, tools={len(PORTED_TOOLS)}')

        matches = self.route_prompt(prompt, limit=limit)
        command_names = tuple(match.name for match in matches if match.kind == 'command')
        tool_names = tuple(match.name for match in matches if match.kind == 'tool')

        registry = build_execution_registry()
        # Look each entry up once; names the registry does not know (falsy
        # lookup result) are skipped instead of executed.
        command_execs = tuple(
            entry.execute(prompt)
            for entry in map(registry.command, command_names)
            if entry
        )
        tool_execs = tuple(
            entry.execute(prompt)
            for entry in map(registry.tool, tool_names)
            if entry
        )
        denials = tuple(self._infer_permission_denials(matches))

        # NOTE(review): the prompt reaches the engine twice, first through
        # the streaming API and then through submit_message. Confirm that
        # this double submission is intended.
        stream_events = tuple(engine.stream_submit_message(
            prompt,
            matched_commands=command_names,
            matched_tools=tool_names,
            denied_tools=denials,
        ))
        turn_result = engine.submit_message(
            prompt,
            matched_commands=command_names,
            matched_tools=tool_names,
            denied_tools=denials,
        )
        persisted_session_path = engine.persist_session()

        history.add('routing', f'matches={len(matches)} for prompt={prompt!r}')
        history.add('execution', f'command_execs={len(command_execs)} tool_execs={len(tool_execs)}')
        history.add(
            'turn',
            f'commands={len(turn_result.matched_commands)} tools={len(turn_result.matched_tools)} '
            f'denials={len(turn_result.permission_denials)} stop={turn_result.stop_reason}',
        )
        history.add('session_store', persisted_session_path)
        return RuntimeSession(
            prompt=prompt,
            context=context,
            setup=setup,
            setup_report=setup_report,
            system_init_message=build_system_init_message(trusted=True),
            history=history,
            routed_matches=matches,
            turn_result=turn_result,
            command_execution_messages=command_execs,
            tool_execution_messages=tool_execs,
            stream_events=stream_events,
            persisted_session_path=persisted_session_path,
        )

    def run_turn_loop(
        self,
        prompt: str,
        limit: int = 5,
        max_turns: int = 3,
        structured_output: bool = False,
    ) -> list[TurnResult]:
        """Submit ``prompt`` for up to ``max_turns`` turns and collect results.

        The prompt is routed once. The first turn submits the prompt as-is
        and later turns append a ``[turn N]`` suffix. The loop ends early
        after the first result whose stop reason is not ``'completed'``
        (that result is still included).

        Args:
            prompt: Prompt to submit on every turn.
            limit: Maximum number of routed matches (see ``route_prompt``).
            max_turns: Upper bound on submitted turns, also stored in the
                engine configuration.
            structured_output: Forwarded to the engine configuration.

        Returns:
            The turn results in submission order.
        """
        engine = QueryEnginePort.from_workspace()
        engine.config = QueryEngineConfig(max_turns=max_turns, structured_output=structured_output)
        matches = self.route_prompt(prompt, limit=limit)
        command_names = tuple(match.name for match in matches if match.kind == 'command')
        tool_names = tuple(match.name for match in matches if match.kind == 'tool')
        results: list[TurnResult] = []
        for turn in range(max_turns):
            turn_prompt = prompt if turn == 0 else f'{prompt} [turn {turn + 1}]'
            # No permission denials are passed in the loop (empty tuple).
            result = engine.submit_message(turn_prompt, command_names, tool_names, ())
            results.append(result)
            if result.stop_reason != 'completed':
                break
        return results

    def _infer_permission_denials(self, matches: list[RoutedMatch]) -> list[PermissionDenial]:
        """Return a denial for every routed tool whose name contains 'bash'."""
        return [
            PermissionDenial(
                tool_name=match.name,
                reason='destructive shell execution remains gated in the Python port',
            )
            for match in matches
            if match.kind == 'tool' and 'bash' in match.name.lower()
        ]

    def _collect_matches(
        self,
        tokens: set[str],
        modules: tuple[PortingModule, ...],
        kind: str,
    ) -> list[RoutedMatch]:
        """Score ``modules`` against ``tokens`` and return the positive hits.

        Returns:
            Matches tagged with ``kind``, sorted by descending score and
            then by name.
        """
        scored = ((module, self._score(tokens, module)) for module in modules)
        matches = [
            RoutedMatch(kind=kind, name=module.name, source_hint=module.source_hint, score=score)
            for module, score in scored
            if score > 0
        ]
        matches.sort(key=lambda item: (-item.score, item.name))
        return matches

    @staticmethod
    def _score(tokens: set[str], module: PortingModule) -> int:
        """Count the tokens that occur as substrings in the module's text.

        The lower-cased name, source hint and responsibility are searched.
        Tokens are compared as given, so callers must lower-case them.
        """
        haystacks = [
            module.name.lower(),
            module.source_hint.lower(),
            module.responsibility.lower(),
        ]
        return sum(
            1
            for token in tokens
            if any(token in haystack for haystack in haystacks)
        )
|