| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596 |
- from __future__ import annotations
- import json
- from dataclasses import dataclass
- from functools import lru_cache
- from pathlib import Path
- from .models import PortingBacklog, PortingModule
- from .permissions import ToolPermissionContext
- SNAPSHOT_PATH = Path(__file__).resolve().parent / 'reference_data' / 'tools_snapshot.json'
- @dataclass(frozen=True)
- class ToolExecution:
- name: str
- source_hint: str
- payload: str
- handled: bool
- message: str
- @lru_cache(maxsize=1)
- def load_tool_snapshot() -> tuple[PortingModule, ...]:
- raw_entries = json.loads(SNAPSHOT_PATH.read_text())
- return tuple(
- PortingModule(
- name=entry['name'],
- responsibility=entry['responsibility'],
- source_hint=entry['source_hint'],
- status='mirrored',
- )
- for entry in raw_entries
- )
- PORTED_TOOLS = load_tool_snapshot()
- def build_tool_backlog() -> PortingBacklog:
- return PortingBacklog(title='Tool surface', modules=list(PORTED_TOOLS))
- def tool_names() -> list[str]:
- return [module.name for module in PORTED_TOOLS]
- def get_tool(name: str) -> PortingModule | None:
- needle = name.lower()
- for module in PORTED_TOOLS:
- if module.name.lower() == needle:
- return module
- return None
- def filter_tools_by_permission_context(tools: tuple[PortingModule, ...], permission_context: ToolPermissionContext | None = None) -> tuple[PortingModule, ...]:
- if permission_context is None:
- return tools
- return tuple(module for module in tools if not permission_context.blocks(module.name))
- def get_tools(
- simple_mode: bool = False,
- include_mcp: bool = True,
- permission_context: ToolPermissionContext | None = None,
- ) -> tuple[PortingModule, ...]:
- tools = list(PORTED_TOOLS)
- if simple_mode:
- tools = [module for module in tools if module.name in {'BashTool', 'FileReadTool', 'FileEditTool'}]
- if not include_mcp:
- tools = [module for module in tools if 'mcp' not in module.name.lower() and 'mcp' not in module.source_hint.lower()]
- return filter_tools_by_permission_context(tuple(tools), permission_context)
- def find_tools(query: str, limit: int = 20) -> list[PortingModule]:
- needle = query.lower()
- matches = [module for module in PORTED_TOOLS if needle in module.name.lower() or needle in module.source_hint.lower()]
- return matches[:limit]
- def execute_tool(name: str, payload: str = '') -> ToolExecution:
- module = get_tool(name)
- if module is None:
- return ToolExecution(name=name, source_hint='', payload=payload, handled=False, message=f'Unknown mirrored tool: {name}')
- action = f"Mirrored tool '{module.name}' from {module.source_hint} would handle payload {payload!r}."
- return ToolExecution(name=module.name, source_hint=module.source_hint, payload=payload, handled=True, message=action)
- def render_tool_index(limit: int = 20, query: str | None = None) -> str:
- modules = find_tools(query, limit) if query else list(PORTED_TOOLS[:limit])
- lines = [f'Tool entries: {len(PORTED_TOOLS)}', '']
- if query:
- lines.append(f'Filtered by: {query}')
- lines.append('')
- lines.extend(f'- {module.name} — {module.source_hint}' for module in modules)
- return '\n'.join(lines)
|