# tools.py (3.2 KB)
  1. from __future__ import annotations
  2. import json
  3. from dataclasses import dataclass
  4. from functools import lru_cache
  5. from pathlib import Path
  6. from .models import PortingBacklog, PortingModule
  7. from .permissions import ToolPermissionContext
# Absolute path of the JSON tool snapshot, resolved relative to this module's
# own directory (<module dir>/reference_data/tools_snapshot.json).
SNAPSHOT_PATH = Path(__file__).resolve().parent / 'reference_data' / 'tools_snapshot.json'
  9. @dataclass(frozen=True)
  10. class ToolExecution:
  11. name: str
  12. source_hint: str
  13. payload: str
  14. handled: bool
  15. message: str
  16. @lru_cache(maxsize=1)
  17. def load_tool_snapshot() -> tuple[PortingModule, ...]:
  18. raw_entries = json.loads(SNAPSHOT_PATH.read_text())
  19. return tuple(
  20. PortingModule(
  21. name=entry['name'],
  22. responsibility=entry['responsibility'],
  23. source_hint=entry['source_hint'],
  24. status='mirrored',
  25. )
  26. for entry in raw_entries
  27. )
# Tool snapshot loaded once, when this module is imported; the helper
# functions in this module read from this tuple.
PORTED_TOOLS = load_tool_snapshot()
  29. def build_tool_backlog() -> PortingBacklog:
  30. return PortingBacklog(title='Tool surface', modules=list(PORTED_TOOLS))
  31. def tool_names() -> list[str]:
  32. return [module.name for module in PORTED_TOOLS]
  33. def get_tool(name: str) -> PortingModule | None:
  34. needle = name.lower()
  35. for module in PORTED_TOOLS:
  36. if module.name.lower() == needle:
  37. return module
  38. return None
  39. def filter_tools_by_permission_context(tools: tuple[PortingModule, ...], permission_context: ToolPermissionContext | None = None) -> tuple[PortingModule, ...]:
  40. if permission_context is None:
  41. return tools
  42. return tuple(module for module in tools if not permission_context.blocks(module.name))
  43. def get_tools(
  44. simple_mode: bool = False,
  45. include_mcp: bool = True,
  46. permission_context: ToolPermissionContext | None = None,
  47. ) -> tuple[PortingModule, ...]:
  48. tools = list(PORTED_TOOLS)
  49. if simple_mode:
  50. tools = [module for module in tools if module.name in {'BashTool', 'FileReadTool', 'FileEditTool'}]
  51. if not include_mcp:
  52. tools = [module for module in tools if 'mcp' not in module.name.lower() and 'mcp' not in module.source_hint.lower()]
  53. return filter_tools_by_permission_context(tuple(tools), permission_context)
  54. def find_tools(query: str, limit: int = 20) -> list[PortingModule]:
  55. needle = query.lower()
  56. matches = [module for module in PORTED_TOOLS if needle in module.name.lower() or needle in module.source_hint.lower()]
  57. return matches[:limit]
  58. def execute_tool(name: str, payload: str = '') -> ToolExecution:
  59. module = get_tool(name)
  60. if module is None:
  61. return ToolExecution(name=name, source_hint='', payload=payload, handled=False, message=f'Unknown mirrored tool: {name}')
  62. action = f"Mirrored tool '{module.name}' from {module.source_hint} would handle payload {payload!r}."
  63. return ToolExecution(name=module.name, source_hint=module.source_hint, payload=payload, handled=True, message=action)
  64. def render_tool_index(limit: int = 20, query: str | None = None) -> str:
  65. modules = find_tools(query, limit) if query else list(PORTED_TOOLS[:limit])
  66. lines = [f'Tool entries: {len(PORTED_TOOLS)}', '']
  67. if query:
  68. lines.append(f'Filtered by: {query}')
  69. lines.append('')
  70. lines.extend(f'- {module.name} — {module.source_hint}' for module in modules)
  71. return '\n'.join(lines)