diff --git a/db/migrations/043_integration_provider_is_implemented.sql b/db/migrations/043_integration_provider_is_implemented.sql new file mode 100644 index 00000000..ff324b3a --- /dev/null +++ b/db/migrations/043_integration_provider_is_implemented.sql @@ -0,0 +1,12 @@ +-- Migration 043: add is_implemented flag to integration_providers +-- Only providers with is_implemented = true AND an active instance +-- are surfaced in the AI provider registry. Catalog entries with +-- is_implemented = false remain discoverable via list_integrations tool +-- but are NOT injected into the system prompt. + +ALTER TABLE integration_providers + ADD COLUMN IF NOT EXISTS is_implemented BOOLEAN NOT NULL DEFAULT FALSE; + +-- Nebula itself is the only implemented provider at bootstrap. +-- External provider rows (slack, github, etc.) stay FALSE until +-- their tool code is merged and this flag is flipped. diff --git a/src/api/routers/chat.py b/src/api/routers/chat.py index 39e1df3a..82b0f3fe 100644 --- a/src/api/routers/chat.py +++ b/src/api/routers/chat.py @@ -9,11 +9,9 @@ Endpoints: GET /chat/corpus — returns corpus_id of the nebula-system KB (or null) """ -import ast -import hashlib +import asyncio import json import os -import re import secrets import time from typing import Any, Dict, List, Optional @@ -35,406 +33,31 @@ router = APIRouter(prefix="/chat", tags=["chat"]) NEBULA_SYSTEM_CORPUS_SLUG = "nebula-system" NEBULA_SYSTEM_OWNER = "system" -# ── Nebula chat tool registry (MCP-like stacks) ─────────────────────────────── -# Organised into functional stacks. The LLM selects tools based on intent. -# Simple single-sentence or complex multi-paragraph intents are handled equally. 
-# -# Stack layout: -# agent_stack — create, run, list agents -# task_stack — create tasks directly from a goal description -# integration_stack — list, navigate, connect integrations -# navigation_stack — deep-link to any UI page +# ── Tool registry (data-driven) ────────────────────────────────────────────── +# Tools are defined in src/api/routers/chat_tools/. Adding a new tool requires +# creating one file there — no edits to this file. +# The registry auto-discovers tools on import. -NEBULA_CHAT_TOOLS: List[Dict[str, Any]] = [ - # ── agent_stack ──────────────────────────────────────────────────────── - { - "type": "function", - "function": { - "name": "create_agent", - "description": ( - "Create a new agent in NebulaOS. Use when the user wants to " - "create, build, or set up an agent, bot, skill, or assistant — " - "even if they describe it in a long multi-sentence paragraph. " - "Extract the name and purpose from whatever they say." - ), - "parameters": { - "type": "object", - "properties": { - "name": { - "type": "string", - "description": "Short slug-friendly agent name, e.g. 'gmail-digest'", - }, - "system_prompt": { - "type": "string", - "description": ( - "Full agent system prompt capturing purpose, " - "data sources, output format, and any integrations to use." - ), - }, - "model": { - "type": "string", - "description": "Model ID (leave empty for platform default)", - }, - }, - "required": ["name", "system_prompt"], - }, - }, - }, - { - "type": "function", - "function": { - "name": "run_agent", - "description": ( - "Run a task on an existing agent by its ID. Use when the user " - "wants to execute, run, or dispatch a task to a named agent. " - "Use list_agents first if unsure which agent to target." 
- ), - "parameters": { - "type": "object", - "properties": { - "agent_id": { - "type": "string", - "description": "Agent ID from the SYSTEM STATE agent list", - }, - "goal": { - "type": "string", - "description": "Full task goal or instruction", - }, - }, - "required": ["agent_id", "goal"], - }, - }, - }, - { - "type": "function", - "function": { - "name": "list_agents", - "description": ( - "List all agents currently in the system with their names, IDs, " - "and states. Use before run_agent to find the right agent." - ), - "parameters": {"type": "object", "properties": {}, "required": []}, - }, - }, - # ── integration_stack ───────────────────────────────────────────────── - { - "type": "function", - "function": { - "name": "list_integrations", - "description": ( - "List all integration providers (Slack, GitHub, Jira, Telegram, " - "Notion, Discord, Trello, etc.) available in NebulaOS, optionally " - "filtered by category or search term. Also returns which ones the " - "user has already connected (instances)." - ), - "parameters": { - "type": "object", - "properties": { - "query": { - "type": "string", - "description": "Optional filter: service name or category", - }, - }, - "required": [], - }, - }, - }, - { - "type": "function", - "function": { - "name": "search_plugins", - "description": ( - "Search installed plugins by name or capability. Plugins are " - "tool stacks — each plugin exposes a set of actions (like an " - "MCP server). Use this when asking about installed tool stacks." - ), - "parameters": { - "type": "object", - "properties": { - "query": { - "type": "string", - "description": "Plugin name or capability keyword", - }, - }, - "required": ["query"], - }, - }, - }, - { - "type": "function", - "function": { - "name": "connect_integration", - "description": ( - "Open the integration setup page for a specific provider, so the " - "user can connect it. 
Use when user says 'connect my Slack', " - "'integrate Telegram', 'set up GitHub', or mentions wanting to " - "link a specific external service." - ), - "parameters": { - "type": "object", - "properties": { - "provider_slug": { - "type": "string", - "description": ( - "Slug of the integration provider: slack, github, jira, " - "linear, notion, telegram, whatsapp, pagerduty, ado" - ), - }, - }, - "required": ["provider_slug"], - }, - }, - }, - # ── demo_stack ───────────────────────────────────────────────────── - { - "type": "function", - "function": { - "name": "try_it_out", - "description": ( - "Run a live demo task using the Nebula demo agent to showcase " - "multi-step agent orchestration. Use when the user says 'try it', " - "'show me', 'demo', 'tryit', or asks to see Nebula in action. " - "Dispatches a real agent task and returns the task ID for live streaming." - ), - "parameters": { - "type": "object", - "properties": { - "template_id": { - "type": "string", - "enum": [ - "runtime_audit", - "dag_planner", - "mini_benchmark", - "self_portrait", - "multi_researcher", - ], - "description": ( - "Which demo template to run. Defaults to runtime_audit " - "if the user hasn't specified a preference." - ), - }, - "topic": { - "type": "string", - "description": ( - "Optional topic or subject for templates that need one " - "(e.g. multi_researcher, dag_planner)." - ), - }, - }, - "required": ["template_id"], - }, - }, - }, - # ── memory_stack ───────────────────────────────────────────────────── - { - "type": "function", - "function": { - "name": "delete_memory", - "description": ( - "Soft-delete one or more memories from an agent's memory bank. " - "Always run with dry_run=true first so the user can confirm " - "before the actual deletion. Use when the user says 'forget', " - "'delete memory', 'remove what you know about', or explicitly " - "asks to clear specific stored facts." 
- ), - "parameters": { - "type": "object", - "properties": { - "agent_id": { - "type": "string", - "description": ( - "ID of the agent whose memories to delete. " - "Use list_agents to find the right agent." - ), - }, - "memory_ids": { - "type": "array", - "items": {"type": "string"}, - "description": ( - "Optional list of specific memory IDs to delete. " - "When omitted, filter by tags instead." - ), - }, - "tags": { - "type": "array", - "items": {"type": "string"}, - "description": ( - "Delete all memories that contain ANY of these tags. " - "Ignored when memory_ids is supplied." - ), - }, - "dry_run": { - "type": "boolean", - "description": ( - "When true (default), return matching memories " - "without deleting. Set false only after the user " - "has confirmed the preview." - ), - }, - }, - "required": ["agent_id"], - }, - }, - }, - # ── plugin_stack ────────────────────────────────────────────────────── - { - "type": "function", - "function": { - "name": "create_plugin", - "description": ( - "Generate and save a new plugin (runnable Python tool) in NebulaOS. " - "Use when the user asks to build, create, write, or code a plugin, " - "tool, capability, or automation script for agents. " - "ALWAYS call with confirm=false first to show the draft for review. " - "Only re-call with confirm=true when the user explicitly approves." - ), - "parameters": { - "type": "object", - "properties": { - "name": { - "type": "string", - "description": "Slug-safe plugin name, lowercase, hyphens only. e.g. 'fetch-github-prs'", - }, - "version": { - "type": "string", - "description": "Semantic version, e.g. '1.0.0'", - }, - "description": { - "type": "string", - "description": "One-sentence description of what the plugin does.", - }, - "tags": { - "type": "array", - "items": {"type": "string"}, - "description": "2-5 lowercase tags describing the plugin, e.g. 
['github', 'prs', 'integration']", - }, - "license": { - "type": "string", - "enum": ["MIT", "Apache-2.0", "GPL-3.0", "BSD-2-Clause", "Proprietary"], - "description": "License for the plugin source code.", - }, - "scopes": { - "type": "array", - "items": { - "type": "string", - "enum": [ - "nebula:agents:read", "nebula:agents:write", - "nebula:tasks:read", "nebula:tasks:write", - "nebula:plugins:read", "nebula:memory:read", - "nebula:memory:write", "nebula:network:outbound", - "nebula:files:read", "nebula:files:write", - "nebula:db:read", - ], - }, - "description": "Permissions this plugin needs — pick only what is strictly required.", - }, - "code": { - "type": "string", - "description": ( - "Complete Python source code. MUST define: " - "def run(params: dict) -> dict" - ), - }, - "confirm": { - "type": "boolean", - "description": ( - "Set true ONLY when the user has explicitly approved the draft " - "(said 'save it', 'looks good', 'confirm', or similar). " - "Default is false — always show the draft first." - ), - }, - }, - "required": ["name", "version", "description", "tags", "license", "scopes", "code"], - }, - }, - }, - { - "type": "function", - "function": { - "name": "publish_plugin", - "description": ( - "Publish an existing plugin to the Nebula marketplace so other users " - "can discover and install it. Use after a plugin has been saved. " - "ALWAYS call with confirm=false first to show a preview. " - "Only call with confirm=true when the user explicitly approves." - ), - "parameters": { - "type": "object", - "properties": { - "plugin_id": { - "type": "string", - "description": "The ID of the plugin to publish (e.g. plugin_abc123).", - }, - "confirm": { - "type": "boolean", - "description": ( - "Set true only after user explicitly confirms they want to publish. " - "Default is false — show preview first." 
- ), - }, - }, - "required": ["plugin_id"], - }, - }, - }, - # ── task_stack ──────────────────────────────────────────────────────── - { - "type": "function", - "function": { - "name": "list_tasks", - "description": ( - "List recent tasks in the system with their status and goal. " - "Use when user asks 'my tasks', 'recent tasks', 'what tasks are running', " - "'what did my agents do', or any question about task history." - ), - "parameters": { - "type": "object", - "properties": { - "status_filter": { - "type": "string", - "enum": ["running", "completed", "failed", "pending", "all"], - "description": "Filter by status. Defaults to 'all'.", - }, - "agent_id": { - "type": "string", - "description": "Optional: filter to tasks for a specific agent.", - }, - }, - "required": [], - }, - }, - }, - # ── navigation_stack ───────────────────────────────────────────────── - { - "type": "function", - "function": { - "name": "navigate_to", - "description": ( - "Navigate the user to a specific page in the NebulaOS UI. Use " - "when user asks to go to, open, show, or view a specific section: " - "agents, tasks, settings, integrations, plugins, knowledge, logs." - ), - "parameters": { - "type": "object", - "properties": { - "page": { - "type": "string", - "enum": [ - "agents", "tasks", "settings", "integrations", - "plugins", "knowledge", "logs", "models", "quota", - ], - "description": "Page to navigate to", - }, - "filter": { - "type": "string", - "description": "Optional search/filter to pre-fill on the page", - }, - }, - "required": ["page"], - }, - }, - }, -] +from src.api.routers.chat_tools import registry as _tool_registry # noqa: E402 +from src.api.routers.chat_tools import ToolContext # noqa: E402 + +# Single source of truth: tool definitions live in chat_tools/*.py (registry). +# NEBULA_CHAT_TOOLS() returns the live list — kept as a callable alias for +# external callers (e.g. bridge mode, tests). 
+NEBULA_CHAT_TOOLS = _tool_registry.get_definitions + +# Maximum number of tools (across all providers combined) that are inlined +# into the system prompt as compact notation: {slug:{t1,t2,...}}. +# Once exceeded, only counts are sent: {slug:N} and the LLM must call +# list_provider_tools(slug) to get the names before invoking any tool. +# Compact set notation is ~3x more token-efficient than verbose lines, +# so this threshold is set high — adjust if token budget becomes a concern. +_TOOLS_INLINE_LIMIT: int = 80 + + +def _get_tools() -> List[Dict[str, Any]]: + """Return live tool definitions from registry (may include dynamic plugin tools).""" + return _tool_registry.get_definitions() def _cid() -> str: @@ -506,6 +129,7 @@ class ToolAction(BaseModel): class ChatResponse(BaseModel): reply: str + session_id: Optional[str] = None corpus_id: Optional[str] chunks_used: int model_used: str @@ -515,6 +139,7 @@ class ChatResponse(BaseModel): # Bridge mode: when set, frontend should call local bridge with these messages bridge_required: bool = Field(default=False) bridge_messages: Optional[List[Dict[str, Any]]] = Field(default=None) + bridge_tools: Optional[List[Dict[str, Any]]] = Field(default=None) # ── Message persistence ─────────────────────────────────────────────────────── @@ -577,103 +202,172 @@ def _get_rag_provider(request: Request): def _get_model_provider(request: Request): - """Platform LLM first, fall back to offered AI.""" + """Resolution chain: Platform AI → Offered AI (Bridge) → GitHub Models.""" provider = getattr(request.app.state, "model_provider", None) if provider is not None: return provider - return getattr(request.app.state, "offered_model_provider", None) + offered = getattr(request.app.state, "offered_model_provider", None) + if offered is not None: + return offered + return getattr(request.app.state, "github_model_provider", None) -async def _get_system_state(db) -> Dict[str, Any]: - """Fetch live system state for grounding the LLM 
def _get_default_model(request: Request) -> str:
    """Return the default model ID for the provider stream that is active.

    Follows the same precedence as ``_get_model_provider``: platform AI,
    then offered AI (bridge), then GitHub Models. The platform and offered
    streams share ``NEBULA_LLM_DEFAULT_MODEL``; only the GitHub Models
    stream reads ``NEBULA_GITHUB_MODELS_DEFAULT_MODEL``.

    Args:
        request: Incoming request; provider handles are read from
            ``request.app.state``.

    Returns:
        The configured model ID, or the hard-coded fallback for that stream.
    """
    app_state = request.app.state
    has_platform_or_offered = any(
        getattr(app_state, attr, None) is not None
        for attr in ("model_provider", "offered_model_provider")
    )
    # GitHub Models is only the active stream when neither higher-priority
    # provider is configured.
    if (
        not has_platform_or_offered
        and getattr(app_state, "github_model_provider", None) is not None
    ):
        return os.getenv("NEBULA_GITHUB_MODELS_DEFAULT_MODEL", "gpt-4.1")
    return os.getenv("NEBULA_LLM_DEFAULT_MODEL", "gpt-4o-mini")


def _normalize_capabilities(raw: Any) -> List[Any]:
    """Coerce a provider ``capabilities`` value into a list of tool names.

    NOTE(review): the column type is not visible from this module. A
    JSON-encoded string is decoded first, because ``list("...")`` on such a
    value would explode it into single characters.

    Args:
        raw: Value read from ``integration_providers.capabilities``.

    Returns:
        A list; empty when ``raw`` is falsy or decodes to JSON ``null``.
    """
    if not raw:
        return []
    if isinstance(raw, str):
        try:
            decoded = json.loads(raw)
        except ValueError:
            # Not JSON: treat the whole string as one capability name.
            return [raw]
        if isinstance(decoded, (list, tuple, dict)):
            return list(decoded)
        return [] if decoded is None else [raw]
    return list(raw)


async def _get_system_state(db, user_email: Optional[str] = None) -> Dict[str, Any]:
    """Fetch live system state for grounding the LLM (anti-hallucination).

    Returns two top-level registries:
      provider_registry — Nebula always first; external providers only when
                          they have an active instance AND is_implemented=true.
      data_registry     — agents, models, plugins and tasks; agents, plugins
                          and tasks are scoped to ``user_email`` when given.

    All DB queries are scheduled together with ``asyncio.gather``. Each one
    is independently guarded, so a single failure yields an empty list for
    that section only; the failure is logged rather than silently dropped.

    Security: queries on user-owned entities filter by ``user_email`` when it
    is provided. integration_instances has no owner column yet — marked
    TODO for migration 044.
    NOTE(review): when ``user_email`` is falsy the agent/task/plugin queries
    are NOT scoped to a user — confirm callers always pass it for
    authenticated requests.

    Args:
        db: Database handle exposing an awaitable ``fetch(query, *args)``.
        user_email: Owner used to scope user-owned rows; ``None`` disables
            scoping.

    Returns:
        ``{"provider_registry": [...], "data_registry": {...}}``.
    """

    async def _guarded(label: str, fetcher) -> List[Dict[str, Any]]:
        # Isolate each section: an error in one query (or in mapping its
        # rows) must not take down the others.
        try:
            return await fetcher()
        except Exception as exc:
            log.warn("chat_system_state_fetch_error", {
                "component": "api.chat",
                "operation": "get_system_state",
                "entity_id": label,
                "metadata": {"section": label, "error": str(exc)},
            })
            return []

    async def _fetch_agents():
        if user_email:
            rows = await db.fetch(
                "SELECT id, name, status, model FROM agents "
                "WHERE user_email = $1 ORDER BY created_at DESC LIMIT 20",
                user_email,
            )
        else:
            rows = await db.fetch(
                "SELECT id, name, status, model "
                "FROM agents ORDER BY created_at DESC LIMIT 20"
            )
        return [
            {"id": r["id"], "name": r["name"], "state": r["status"], "model": r["model"]}
            for r in rows
        ]

    async def _fetch_models():
        rows = await db.fetch(
            "SELECT model_id, display_name, provider_id "
            "FROM registered_models WHERE is_active = true LIMIT 20"
        )
        return [
            {"id": r["model_id"], "name": r["display_name"], "provider": r["provider_id"]}
            for r in rows
        ]

    async def _fetch_plugins():
        # With a user: that user's plugins (any status) plus every published
        # plugin, so other users' drafts are excluded. Without a user: only
        # installed/published plugins.
        # NOTE(review): `status` is selected but not returned, so the caller
        # cannot tell the user's drafts from published plugins.
        if user_email:
            rows = await db.fetch(
                "SELECT id, name, status FROM plugins "
                "WHERE status = 'published' OR user_email = $1 "
                "ORDER BY created_at DESC LIMIT 20",
                user_email,
            )
        else:
            rows = await db.fetch(
                "SELECT id, name, status FROM plugins "
                "WHERE status IN ('installed', 'published') LIMIT 20"
            )
        return [{"id": r["id"], "name": r["name"]} for r in rows]

    async def _fetch_ext_providers():
        # External providers that are BOTH connected (active instance) AND
        # implemented (is_implemented=true). Unimplemented catalog entries
        # are intentionally excluded.
        rows = await db.fetch(
            """
            SELECT DISTINCT ip.slug, ip.name, ip.category, ip.capabilities
            FROM integration_instances ii
            JOIN integration_providers ip ON ip.id = ii.provider_id
            WHERE ii.status = 'active' AND ip.is_implemented = true
            ORDER BY ip.name LIMIT 20
            """
        )
        return [
            {
                "slug": r["slug"],
                "name": r["name"],
                "tools": _normalize_capabilities(r["capabilities"]),
            }
            for r in rows
        ]

    async def _fetch_tasks():
        if user_email:
            rows = await db.fetch(
                "SELECT id, description, status, agent_id FROM tasks "
                "WHERE user_email = $1 ORDER BY created_at DESC LIMIT 10",
                user_email,
            )
        else:
            rows = await db.fetch(
                "SELECT id, description, status, agent_id "
                "FROM tasks ORDER BY created_at DESC LIMIT 10"
            )
        return [
            {
                "id": r["id"],
                # Truncated to keep the grounding payload small.
                "description": (r["description"] or "")[:80],
                "status": r["status"],
                "agent_id": r["agent_id"],
            }
            for r in rows
        ]

    # NOTE(review): if `db` is a single connection rather than a pool,
    # overlapping queries may be rejected by the driver; those failures now
    # show up in the log instead of as silently empty sections. Confirm `db`
    # supports concurrent use.
    agents, models, plugins, ext_providers, tasks = await asyncio.gather(
        _guarded("agents", _fetch_agents),
        _guarded("models", _fetch_models),
        _guarded("plugins", _fetch_plugins),
        _guarded("ext_providers", _fetch_ext_providers),
        _guarded("tasks", _fetch_tasks),
    )

    # Nebula is always provider[0] — its tools come from the live registry.
    nebula_tools = [d["function"]["name"] for d in _tool_registry.get_definitions()]
    provider_registry = [
        {"slug": "nebula", "name": "NebulaOS", "tools": nebula_tools}
    ] + ext_providers

    data_registry = {
        "agents": agents,
        "models": models,
        "plugins": plugins,
        "tasks": tasks,
    }

    return {
        "provider_registry": provider_registry,
        "data_registry": data_registry,
    }
"description": (r["description"] or "")[:80], - "status": r["status"], - "agent_id": r["agent_id"], - } - for r in rows - ] - except Exception: - pass - # Only list tools that are actually implemented - state["tools"] = ["read_file", "write_file", "http_request"] - return state async def _activate_agent_for_task( @@ -738,521 +432,6 @@ async def _activate_agent_for_task( }) -_PLUGIN_SCOPE_ALLOWLIST = frozenset([ - "nebula:agents:read", "nebula:agents:write", - "nebula:tasks:read", "nebula:tasks:write", - "nebula:plugins:read", "nebula:memory:read", - "nebula:memory:write", "nebula:network:outbound", - "nebula:files:read", "nebula:files:write", - "nebula:db:read", -]) - -_PLUGIN_LICENSE_ALLOWLIST = frozenset([ - "MIT", "Apache-2.0", "GPL-3.0", "BSD-2-Clause", "Proprietary", -]) - -_SLUG_RE = re.compile(r"^[a-z0-9][a-z0-9\-]{0,63}$") -_SEMVER_RE = re.compile(r"^\d+\.\d+\.\d+$") - - -def _validate_plugin_code(code: str) -> Optional[str]: - """AST-check that the code is valid Python and declares def run(params).""" - try: - tree = ast.parse(code) - except SyntaxError as exc: - return f"Syntax error in generated code: {exc}" - for node in ast.walk(tree): - if isinstance(node, ast.FunctionDef) and node.name == "run": - return None - return "Plugin code must define: def run(params: dict) -> dict" - - -_MAX_PLUGIN_NAME_LEN = 64 -_MAX_PLUGIN_DESC_LEN = 1000 -_MAX_PLUGIN_CODE_BYTES = 256 * 1024 # 256 KB -_MAX_PLUGIN_TAGS = 10 -_MAX_PLUGIN_SCOPES = 8 - - -async def _handle_create_plugin( - tool_args: Dict[str, Any], - db, - user_id: Optional[str], - cid: str, - bus=None, -) -> str: - """Handle the create_plugin tool — two-phase: draft then confirm.""" - name = tool_args.get("name", "").strip() - version = tool_args.get("version", "1.0.0").strip() - description = tool_args.get("description", "").strip() - tags = [t.strip().lower() for t in (tool_args.get("tags") or []) if t.strip()] - license_val = tool_args.get("license", "Proprietary") - scopes = list(tool_args.get("scopes") 
or []) - code = tool_args.get("code", "").strip() - confirm = bool(tool_args.get("confirm", False)) - - log.info("chat_create_plugin", { - "component": "api.chat", - "operation": "create_plugin", - "entity_id": user_id or "anon", - "correlation_id": cid, - "metadata": { - "name": name, "version": version, - "confirm": confirm, "scopes": scopes, - "tags": tags, "license": license_val, - }, - }) - - def _validation_error(field: str, msg: str) -> str: - log.warn("chat_create_plugin_validation_error", { - "component": "api.chat", - "operation": "create_plugin_validate", - "entity_id": user_id or "anon", - "correlation_id": cid, - "metadata": {"field": field, "error": msg, "name": name}, - }) - return json.dumps({"success": False, "error": msg}) - - # ── Validation ──────────────────────────────────────────────────────────── - if not name: - return _validation_error("name", "Plugin name is required.") - if len(name) > _MAX_PLUGIN_NAME_LEN: - return _validation_error( - "name", - f"Plugin name must be {_MAX_PLUGIN_NAME_LEN} characters or fewer." - ) - if not _SLUG_RE.match(name): - return _validation_error( - "name", - f"Plugin name '{name}' is invalid. Use lowercase letters, " - "digits, and hyphens only (e.g. 'fetch-github-prs').", - ) - if not _SEMVER_RE.match(version): - return _validation_error( - "version", - f"Version '{version}' must be semver format (e.g. '1.0.0').", - ) - if len(description) > _MAX_PLUGIN_DESC_LEN: - return _validation_error( - "description", - f"Description must be {_MAX_PLUGIN_DESC_LEN} characters or fewer.", - ) - if license_val not in _PLUGIN_LICENSE_ALLOWLIST: - return _validation_error( - "license", - f"License '{license_val}' is not allowed. " - f"Choose from: {', '.join(sorted(_PLUGIN_LICENSE_ALLOWLIST))}", - ) - if len(scopes) > _MAX_PLUGIN_SCOPES: - return _validation_error( - "scopes", - f"Too many scopes declared ({len(scopes)}). 
" - f"Maximum is {_MAX_PLUGIN_SCOPES}.", - ) - invalid_scopes = [s for s in scopes if s not in _PLUGIN_SCOPE_ALLOWLIST] - if invalid_scopes: - return _validation_error( - "scopes", - f"Unknown scopes: {invalid_scopes}. Use the predefined allowlist.", - ) - if len(tags) > _MAX_PLUGIN_TAGS: - return _validation_error( - "tags", - f"Too many tags ({len(tags)}). Maximum is {_MAX_PLUGIN_TAGS}.", - ) - code_bytes_raw = code.encode("utf-8") - if len(code_bytes_raw) > _MAX_PLUGIN_CODE_BYTES: - return _validation_error( - "code", - f"Plugin code exceeds maximum size of " - f"{_MAX_PLUGIN_CODE_BYTES // 1024} KB.", - ) - code_error = _validate_plugin_code(code) - if code_error: - return _validation_error("code", code_error) - - code_bytes = code_bytes_raw - checksum = hashlib.sha256(code_bytes).hexdigest() - code_size = len(code_bytes) - manifest = { - "name": name, - "version": version, - "author": user_id or "ai", - "description": description, - "permissions": scopes, - "tags": tags, - "license": license_val, - "entry_point": "run", - } - - if not confirm: - log.info("chat_create_plugin_draft", { - "component": "api.chat", - "operation": "create_plugin_draft", - "entity_id": user_id or "anon", - "correlation_id": cid, - "metadata": { - "name": name, "code_size": code_size, "checksum": checksum, - }, - }) - return json.dumps({ - "draft": True, - "success": None, - "plugin_id": None, - "name": name, - "version": version, - "description": description, - "tags": tags, - "plugin_license": license_val, - "scopes": scopes, - "code": code, - "checksum": checksum, - "code_size_bytes": code_size, - "manifest": manifest, - "message": ( - f"Plugin '{name}' draft ready. Review the code and scopes above. " - "Say 'save it' or 'looks good' to save this plugin." 
- ), - }) - - # ── Confirm path: persist to DB ─────────────────────────────────────────── - try: - existing = await db.fetchrow( - "SELECT id FROM plugins WHERE name = $1", name - ) - except Exception as exc: - log.error("chat_create_plugin_db_check_error", { - "component": "api.chat", - "operation": "create_plugin_check_exists", - "entity_id": user_id or "anon", - "correlation_id": cid, - "metadata": {"error": str(exc)}, - }) - return json.dumps({ - "success": False, - "error": "Database error while checking for duplicate plugin.", - }) - if existing: - log.warn("chat_create_plugin_duplicate", { - "component": "api.chat", - "operation": "create_plugin_check_exists", - "entity_id": user_id or "anon", - "correlation_id": cid, - "metadata": {"name": name, "existing_id": str(existing["id"])}, - }) - return json.dumps({ - "success": False, - "error": ( - f"A plugin named '{name}' already exists. " - "Choose a different name." - ), - }) - - plugin_id = f"plugin_{int(time.time() * 1000)}_{secrets.token_hex(4)}" - plugin_upload_dir = os.getenv("PLUGIN_UPLOAD_DIR", "/tmp/nebula_plugins") - - file_path: Optional[str] = None - try: - os.makedirs(plugin_upload_dir, exist_ok=True) - file_path = os.path.join(plugin_upload_dir, f"{plugin_id}.py") - with open(file_path, "w", encoding="utf-8") as fh: - fh.write(code) - except OSError as exc: - log.error("chat_create_plugin_file_error", { - "component": "api.chat", - "operation": "create_plugin_save_file", - "entity_id": plugin_id, - "correlation_id": cid, - "metadata": {"error": str(exc), "path": plugin_upload_dir}, - }) - - version_id = f"pv_{int(time.time() * 1000)}_{secrets.token_hex(3)}" - try: - async with db.transaction(): - await db.execute( - """ - INSERT INTO plugins - (id, name, version, author, description, manifest, - trust_level, status, scan_status, - tags, license, source_type, code, generated_by, - code_size_bytes, is_published, - created_at, updated_at) - VALUES ( - $1, $2, $3, $4, $5, $6, - 'experimental', 
'installed', 'pending', - $7, $8, 'ai_generated', $9, $10, - $11, false, - NOW(), NOW() - ) - """, - plugin_id, - name, - version, - user_id or "ai", - description, - json.dumps(manifest), - tags, - license_val, - code, - user_id, - code_size, - ) - await db.execute( - """ - INSERT INTO plugin_versions - (id, plugin_id, version, file_path, checksum, - file_size, scan_status, is_current, - source_type, code, created_at) - VALUES ( - $1, $2, $3, $4, $5, $6, - 'pending', true, 'ai_generated', $7, NOW() - ) - """, - version_id, plugin_id, version, - file_path or "", checksum, code_size, code, - ) - except Exception as exc: - log.error("chat_create_plugin_db_error", { - "component": "api.chat", - "operation": "create_plugin_persist", - "entity_id": plugin_id, - "correlation_id": cid, - "metadata": {"error": str(exc), "name": name}, - }) - return json.dumps({ - "success": False, - "error": "Failed to persist plugin. The transaction was rolled back.", - }) - - log.info("chat_plugin_saved", { - "component": "api.chat", - "operation": "create_plugin_confirm", - "entity_id": plugin_id, - "correlation_id": cid, - "metadata": { - "name": name, "version": version, - "tags": tags, "license": license_val, - "code_size": code_size, "checksum": checksum, - "generated_by": user_id, - }, - }) - - if bus is not None: - try: - from src.events import PluginEvents - await _emit(bus, PluginEvents.created( - event_id=f"evt_{cid}_plugin", - plugin_id=plugin_id, - name=name, - source_type="ai_generated", - tags=tags, - user_id=user_id, - correlation_id=cid, - )) - except Exception as exc: - log.error("chat_create_plugin_event_error", { - "component": "api.chat", - "operation": "create_plugin_emit", - "entity_id": plugin_id, - "correlation_id": cid, - "metadata": {"error": str(exc)}, - }) - - return json.dumps({ - "draft": False, - "success": True, - "plugin_id": plugin_id, - "name": name, - "version": version, - "description": description, - "tags": tags, - "plugin_license": license_val, - 
"scopes": scopes, - "code_size_bytes": code_size, - "checksum": checksum, - "action_url": f"/plugins/{plugin_id}", - "message": ( - f"Plugin '{name}' v{version} has been saved with {code_size} bytes " - f"of code. It runs in the sandbox and is pending security scan. " - f"Would you like to publish it to the marketplace?" - ), - }) - - -async def _handle_publish_plugin( - tool_args: Dict[str, Any], - db, - user_id: Optional[str], - cid: str, - bus=None, -) -> str: - """Handle the publish_plugin tool — two-phase: preview then confirm.""" - plugin_id = tool_args.get("plugin_id", "").strip() - confirm = bool(tool_args.get("confirm", False)) - - if not plugin_id: - return json.dumps({"success": False, "error": "plugin_id is required."}) - - try: - row = await db.fetchrow( - "SELECT id, name, version, description, tags, license, " - "source_type, is_published " - "FROM plugins WHERE id = $1", - plugin_id, - ) - except Exception as exc: - log.error("chat_publish_plugin_db_lookup_error", { - "component": "api.chat", - "operation": "publish_plugin_lookup", - "entity_id": plugin_id, - "correlation_id": cid, - "metadata": {"error": str(exc)}, - }) - return json.dumps({ - "success": False, - "error": "Database error while looking up plugin.", - }) - if row is None: - log.warn("chat_publish_plugin_not_found", { - "component": "api.chat", - "operation": "publish_plugin_lookup", - "entity_id": plugin_id, - "correlation_id": cid, - "metadata": {"plugin_id": plugin_id}, - }) - return json.dumps({ - "success": False, - "error": f"Plugin '{plugin_id}' not found.", - }) - if row["is_published"]: - log.info("chat_publish_plugin_already_published", { - "component": "api.chat", - "operation": "publish_plugin_check", - "entity_id": plugin_id, - "correlation_id": cid, - "metadata": {"name": row["name"]}, - }) - return json.dumps({ - "success": False, - "message": f"Plugin '{row['name']}' is already published.", - }) - - log.info("chat_publish_plugin", { - "component": "api.chat", - 
"operation": "publish_plugin", - "entity_id": plugin_id, - "correlation_id": cid, - "metadata": { - "name": row["name"], "confirm": confirm, - }, - }) - - if not confirm: - return json.dumps({ - "draft": True, - "success": None, - "plugin_id": plugin_id, - "name": row["name"], - "version": row["version"], - "description": row["description"], - "tags": list(row["tags"] or []), - "plugin_license": row["license"], - "message": ( - f"Publish '{row['name']}' v{row['version']} to the marketplace? " - "It will be listed as community-trust. " - "Say 'yes publish it' to confirm." - ), - }) - - # ── Confirm publish ──────────────────────────────────────────────────────── - marketplace_id = f"mkt_{int(time.time() * 1000)}_{secrets.token_hex(4)}" - tags = list(row["tags"] or []) - try: - async with db.transaction(): - await db.execute( - """ - INSERT INTO marketplace_items - (id, item_type, name, description, author, - trust_level, tags, payload, source_entity_id, - version, is_active, published_at, created_at, updated_at) - VALUES ($1, 'plugin', $2, $3, $4, - 'community', $5, $6, $7, - $8, true, NOW(), NOW(), NOW()) - """, - marketplace_id, - row["name"], - row["description"] or "", - user_id or "ai", - tags, - json.dumps({"plugin_id": plugin_id, "license": row["license"]}), - plugin_id, - row["version"], - ) - await db.execute( - "UPDATE plugins " - "SET is_published = true, updated_at = NOW() " - "WHERE id = $1", - plugin_id, - ) - except Exception as exc: - log.error("chat_publish_plugin_db_error", { - "component": "api.chat", - "operation": "publish_plugin_persist", - "entity_id": plugin_id, - "correlation_id": cid, - "metadata": { - "error": str(exc), - "marketplace_id": marketplace_id, - "name": row["name"], - }, - }) - return json.dumps({ - "success": False, - "error": "Failed to publish plugin. 
The transaction was rolled back.", - }) - - log.info("chat_plugin_published", { - "component": "api.chat", - "operation": "publish_plugin_confirm", - "entity_id": plugin_id, - "correlation_id": cid, - "metadata": { - "marketplace_id": marketplace_id, - "name": row["name"], - }, - }) - - if bus is not None: - try: - from src.events import PluginEvents - await _emit(bus, PluginEvents.published( - event_id=f"evt_{cid}_publish", - plugin_id=plugin_id, - marketplace_item_id=marketplace_id, - correlation_id=cid, - )) - except Exception as exc: - log.error("chat_publish_plugin_event_error", { - "component": "api.chat", - "operation": "publish_plugin_emit", - "entity_id": plugin_id, - "correlation_id": cid, - "metadata": {"error": str(exc)}, - }) - - return json.dumps({ - "draft": False, - "success": True, - "plugin_id": plugin_id, - "marketplace_id": marketplace_id, - "action_url": f"/plugins/{plugin_id}", - "message": ( - f"Plugin '{row['name']}' is now published to the marketplace " - f"under a {row['license']} license." 
- ), - }) - - async def _execute_tool_call( tool_name: str, tool_args: Dict[str, Any], @@ -1262,483 +441,9 @@ async def _execute_tool_call( ctrl=None, bus=None, ) -> str: - """Execute a tool call from the LLM and return a result string.""" - log.info("chat_tool_call", { - "component": "api.chat", - "operation": "tool_call", - "entity_id": user_id or "anon", - "correlation_id": cid, - "metadata": {"tool": tool_name, "args": tool_args}, - }) - try: - if tool_name == "create_agent": - agent_id = f"agent_{int(time.time() * 1000)}_{secrets.token_hex(3)}" - name = tool_args.get("name", "nebula-agent") - system_prompt = tool_args.get("system_prompt", "") - model = tool_args.get("model") or os.getenv("NEBULA_LLM_DEFAULT_MODEL", "gpt-4o-mini") - await db.execute( - """ - INSERT INTO agents - (id, name, status, model, system_prompt, owner_id, - configuration, capabilities, created_at, updated_at) - VALUES ($1, $2, 'created', $3, $4, $5, '{}', '{}', - NOW(), NOW()) - ON CONFLICT DO NOTHING - """, - agent_id, name, model, system_prompt, user_id or "system", - ) - log.info("chat_agent_created", { - "component": "api.chat", "operation": "create_agent", - "entity_id": agent_id, "correlation_id": cid, - "metadata": {"name": name, "model": model}, - }) - return json.dumps({ - "success": True, "agent_id": agent_id, "name": name, - "message": f"Agent '{name}' created with ID {agent_id}.", - "action_url": f"/agents/{agent_id}", - }) - - if tool_name == "run_agent": - agent_id = tool_args.get("agent_id", "") - goal = tool_args.get("goal", "") - agent_row = await db.fetchrow( - "SELECT id, name FROM agents WHERE id = $1", agent_id - ) - if not agent_row: - return json.dumps({"success": False, "error": f"Agent {agent_id} not found."}) - task_id = f"task_{int(time.time() * 1000)}_{secrets.token_hex(3)}" - await db.execute( - """ - INSERT INTO tasks - (id, agent_id, description, task_type, status, - priority, input, max_retries, timeout_seconds, - created_by, created_at, updated_at) - 
VALUES ($1, $2, $3, 'workflow_step', 'pending', - 100, $4, 3, 300, $5, NOW(), NOW()) - ON CONFLICT DO NOTHING - """, - task_id, agent_id, goal, - json.dumps({"description": goal, "prompt": goal}), - user_id or "system", - ) - if ctrl is not None: - await _activate_agent_for_task( - ctrl, db, agent_id, task_id, goal, cid, - operation="run_agent", - ) - log.info("chat_task_created", { - "component": "api.chat", "operation": "run_agent", - "entity_id": task_id, "correlation_id": cid, - "metadata": {"agent_id": agent_id, "goal": goal}, - }) - return json.dumps({ - "success": True, "task_id": task_id, "agent_id": agent_id, - "message": f"Task dispatched to agent '{agent_row['name']}'. Task ID: {task_id}.", - "action_url": f"/tasks/{task_id}", - }) - - if tool_name == "list_agents": - rows = await db.fetch( - "SELECT id, name, status, model FROM agents ORDER BY created_at DESC LIMIT 20" - ) - agents = [{"id": r["id"], "name": r["name"], "state": r["status"]} for r in rows] - return json.dumps({"agents": agents, "count": len(agents)}) - - if tool_name == "search_plugins": - query = tool_args.get("query", "").lower() - rows = await db.fetch( - "SELECT id, name, status FROM plugins WHERE LOWER(name) LIKE $1 LIMIT 10", - f"%{query}%", - ) - plugins = [{"id": r["id"], "name": r["name"], "status": r["status"]} for r in rows] - return json.dumps({"plugins": plugins, "query": query}) - - if tool_name == "list_tasks": - status_filter = tool_args.get("status_filter", "all") - agent_id_filter = tool_args.get("agent_id", "") - query = "SELECT id, description, status, agent_id, created_at FROM tasks" - params: list = [] - clauses = [] - if status_filter and status_filter != "all": - clauses.append(f"status = ${len(params) + 1}") - params.append(status_filter) - if agent_id_filter: - clauses.append(f"agent_id = ${len(params) + 1}") - params.append(agent_id_filter) - if clauses: - query += " WHERE " + " AND ".join(clauses) - query += " ORDER BY created_at DESC LIMIT 15" - rows = await 
db.fetch(query, *params) - tasks = [ - { - "id": r["id"], - "description": (r["description"] or "")[:100], - "status": r["status"], - "agent_id": r["agent_id"], - } - for r in rows - ] - return json.dumps({ - "tasks": tasks, - "count": len(tasks), - "action_url": "/tasks", - }) - - if tool_name == "navigate_to": - page = tool_args.get("page", "") - page_routes = { - "agents": "/agents", - "tasks": "/tasks", - "settings": "/settings", - "integrations": "/integrations", - "plugins": "/plugins", - "knowledge": "/knowledge", - "logs": "/logs", - "models": "/models", - "quota": "/quota", - } - route = page_routes.get(page, f"/{page}") - filt = tool_args.get("filter") - if filt: - route = f"{route}?search={filt}" - log.info("chat_navigate", { - "component": "api.chat", "operation": "navigate_to", - "entity_id": user_id or "anon", "correlation_id": cid, - "metadata": {"page": page, "filter": filt}, - }) - return json.dumps({ - "success": True, - "message": f"Opening {page} page.", - "action_url": route, - }) - - if tool_name == "list_integrations": - query = tool_args.get("query", "").lower() - q_filter = f"%{query}%" if query else "%" - rows = await db.fetch( - """ - SELECT ip.id, ip.name, ip.slug, ip.category, ip.capabilities, - COUNT(ii.id) AS connected_count - FROM integration_providers ip - LEFT JOIN integration_instances ii - ON ii.provider_id = ip.id AND ii.is_active = true - WHERE (LOWER(ip.name) LIKE $1 OR LOWER(ip.category) LIKE $1) - GROUP BY ip.id, ip.name, ip.slug, ip.category, ip.capabilities - ORDER BY ip.name - LIMIT 20 - """, - q_filter, - ) - integrations = [ - { - "name": r["name"], - "slug": r["slug"], - "category": r["category"], - "capabilities": list(r["capabilities"] or []), - "connected": r["connected_count"] > 0, - } - for r in rows - ] - return json.dumps({ - "integrations": integrations, - "count": len(integrations), - "action_url": "/integrations", - }) - - if tool_name == "try_it_out": - template_id = tool_args.get("template_id", 
"runtime_audit") - topic = tool_args.get("topic", "") - _DEMO_TEMPLATES: Dict[str, Dict[str, str]] = { - "runtime_audit": { - "name": "Runtime Health Audit", - "goal": ( - "Inspect the current agent runtime: list all agents and their states, " - "check active tasks, identify any failures or anomalies, " - "and produce a structured health report with a summary and recommendations." - ), - }, - "dag_planner": { - "name": "Project DAG Planner", - "goal": ( - f"Decompose the following goal into a multi-step execution DAG: " - f"'{topic or 'build a production-ready data pipeline'}'. " - "For each node in the DAG assign: step name, estimated effort, " - "required tools, dependencies on other steps, and success criteria." - ), - }, - "mini_benchmark": { - "name": "Mini LLM Benchmark", - "goal": ( - "Run a mini benchmark: send 3 different test prompts to the configured model, " - "measure response latency for each, evaluate output quality on a 1-5 scale, " - "and produce an aggregate benchmark report with latency stats and quality scores." - ), - }, - "self_portrait": { - "name": "Platform Self-Portrait", - "goal": ( - "Introspect this NebulaOS instance: list all configured agents, " - "available tool categories, active integrations, and enforced policies. " - "Produce a structured capability card showing what this platform " - "can do right now — as if explaining it to a new user." - ), - }, - "multi_researcher": { - "name": "Multi-step Researcher", - "goal": ( - f"Research the topic: '{topic or 'the future of AI agent orchestration'}'. " - "Step 1: Break the topic into 3 research questions. " - "Step 2: For each question, gather key points from your knowledge. " - "Step 3: Synthesise into a structured report with an executive summary, " - "key findings per question, and cited reasoning." 
- ), - }, - } - template = _DEMO_TEMPLATES.get(template_id, _DEMO_TEMPLATES["runtime_audit"]) - template_name = template["name"] - goal = template["goal"] - - # Find or create the nebula-demo-agent - demo_row = await db.fetchrow( - "SELECT id FROM agents WHERE name = 'nebula-demo-agent' LIMIT 1" - ) - if demo_row: - demo_agent_id = demo_row["id"] - else: - demo_agent_id = f"agent_demo_{secrets.token_hex(6)}" - model = os.getenv("NEBULA_LLM_DEFAULT_MODEL", "gpt-4o-mini") - await db.execute( - """ - INSERT INTO agents - (id, name, status, model, system_prompt, owner_id, - configuration, capabilities, created_at, updated_at) - VALUES ($1, 'nebula-demo-agent', 'created', $2, - 'You are the Nebula demo agent. Execute tasks step-by-step, ' - 'showing your reasoning and tool calls clearly. ' - 'Always produce structured, well-formatted output.', - 'system', '{}', '{}', NOW(), NOW()) - ON CONFLICT DO NOTHING - """, - demo_agent_id, model, - ) - log.info("chat_demo_agent_created", { - "component": "api.chat", "operation": "try_it_out", - "entity_id": demo_agent_id, "correlation_id": cid, - "metadata": {"model": model}, - }) - - task_id = f"task_demo_{int(time.time() * 1000)}_{secrets.token_hex(3)}" - await db.execute( - """ - INSERT INTO tasks - (id, agent_id, description, task_type, status, - priority, input, max_retries, timeout_seconds, - created_by, created_at, updated_at) - VALUES ($1, $2, $3, 'workflow_step', 'pending', - 100, $4, 3, 300, 'system', NOW(), NOW()) - ON CONFLICT DO NOTHING - """, - task_id, - demo_agent_id, - goal, - json.dumps({"description": goal, "prompt": goal, "is_demo": True, "template_id": template_id}), - ) - if ctrl is not None: - await _activate_agent_for_task( - ctrl, db, demo_agent_id, task_id, goal, cid, - operation="try_it_out", - ) - log.info("chat_try_it_out", { - "component": "api.chat", "operation": "try_it_out", - "entity_id": task_id, "correlation_id": cid, - "metadata": { - "template_id": template_id, - "template_name": 
template_name, - "agent_id": demo_agent_id, - }, - }) - return json.dumps({ - "success": True, - "task_id": task_id, - "agent_id": demo_agent_id, - "template_id": template_id, - "template_name": template_name, - "message": ( - f"Demo task '{template_name}' is now running. " - f"Watch the live execution below." - ), - "action_url": f"/tasks/{task_id}", - }) - - if tool_name == "connect_integration": - slug = tool_args.get("provider_slug", "").lower() - row = await db.fetchrow( - "SELECT id, name, auth_type FROM integration_providers " - "WHERE slug = $1", - slug, - ) - if not row: - return json.dumps({ - "success": False, - "error": f"Integration '{slug}' not found.", - }) - log.info("chat_connect_integration", { - "component": "api.chat", - "operation": "connect_integration", - "entity_id": user_id or "anon", - "correlation_id": cid, - "metadata": {"slug": slug, "provider": row["name"]}, - }) - return json.dumps({ - "success": True, - "message": f"Opening setup page for {row['name']}.", - "action_url": f"/integrations?connect={slug}", - }) - - if tool_name == "delete_memory": - agent_id = tool_args.get("agent_id", "") - memory_ids: List[str] = tool_args.get("memory_ids") or [] - tags: List[str] = tool_args.get("tags") or [] - dry_run: bool = tool_args.get("dry_run", True) - - if not agent_id: - return json.dumps({ - "success": False, - "error": "agent_id is required.", - }) - - # Resolve candidates - if memory_ids: - rows = await db.fetch( - """ - SELECT id, summary, memory_type, score, tags - FROM agent_memories - WHERE agent_id = $1 - AND id = ANY($2) - AND is_deleted = false - ORDER BY score DESC - """, - agent_id, - memory_ids, - ) - elif tags: - rows = await db.fetch( - """ - SELECT id, summary, memory_type, score, tags - FROM agent_memories - WHERE agent_id = $1 - AND tags && $2 - AND is_deleted = false - ORDER BY score DESC - LIMIT 50 - """, - agent_id, - tags, - ) - else: - return json.dumps({ - "success": False, - "error": ( - "Provide either memory_ids or 
tags to target " - "memories for deletion." - ), - }) - - candidates = [ - { - "id": r["id"], - "summary": r["summary"] or "", - "memory_type": r["memory_type"], - "score": r["score"], - "tags": list(r["tags"] or []), - } - for r in rows - ] - - if dry_run: - log.info("chat_delete_memory_dry_run", { - "component": "api.chat", - "operation": "delete_memory", - "entity_id": agent_id, - "correlation_id": cid, - "metadata": { - "candidate_count": len(candidates), - "filter_tags": tags, - "filter_ids": memory_ids, - }, - }) - return json.dumps({ - "success": True, - "dry_run": True, - "candidates": candidates, - "message": ( - f"Found {len(candidates)} memories matching your " - f"criteria. Re-run with dry_run=false to confirm " - f"deletion." - ), - }) - - # Actual soft-delete - ids_to_delete = [r["id"] for r in candidates] - if ids_to_delete: - await db.execute( - """ - UPDATE agent_memories - SET is_deleted = true, updated_at = now() - WHERE id = ANY($1) - """, - ids_to_delete, - ) - log.info("chat_delete_memory_executed", { - "component": "api.chat", - "operation": "delete_memory", - "entity_id": agent_id, - "correlation_id": cid, - "metadata": { - "deleted_count": len(ids_to_delete), - "deleted_ids": ids_to_delete, - }, - }) - if bus is not None: - try: - from src.events import MemoryEvents - await _emit(bus, MemoryEvents.deleted( - event_id=f"evt_{cid}_memdel", - agent_id=agent_id, - deleted_count=len(ids_to_delete), - correlation_id=cid, - )) - except Exception: - pass - return json.dumps({ - "success": True, - "dry_run": False, - "deleted_count": len(ids_to_delete), - "message": ( - f"{len(ids_to_delete)} memories deleted from " - f"agent {agent_id}." 
- ), - }) - - if tool_name == "create_plugin": - return await _handle_create_plugin( - tool_args, db, user_id, cid, bus - ) - - if tool_name == "publish_plugin": - return await _handle_publish_plugin( - tool_args, db, user_id, cid, bus - ) - - return json.dumps({"error": f"Unknown tool: {tool_name}"}) - - except Exception as exc: - log.error("chat_tool_call_error", { - "component": "api.chat", "operation": "tool_call", - "entity_id": user_id or "anon", "correlation_id": cid, - "metadata": {"tool": tool_name, "error": str(exc)}, - }) - return json.dumps({"error": str(exc)}) + """Execute a tool call via the registry and return a JSON string result.""" + ctx = ToolContext(db=db, user_id=user_id, cid=cid, ctrl=ctrl, bus=bus) + return await _tool_registry.execute(tool_name, tool_args, ctx) def _get_event_bus(request: Request): @@ -1808,7 +513,10 @@ def _build_system_prompt( "- Use `create_agent` when user wants to build/create/set up an agent.\n" "- Use `run_agent` when user wants to execute/run a task on an agent.\n" "- Use `list_agents` when user asks what agents exist.\n" - "- Use `search_plugins` when user asks about integrations or connections.\n" + "- Use `list_integrations` when user asks what integrations are available, " + "supported, or can be connected. This returns the full catalog with categories.\n" + "- Use `connect_integration` when user says 'connect Slack', 'integrate GitHub', " + "or wants to link a specific service. Pass the provider_slug.\n" "- Use `create_plugin` when user asks to build/write/create a plugin or " "tool for agents. ALWAYS call with confirm=false first to show the draft. " "Only re-call with confirm=true when the user message contains " @@ -1820,6 +528,25 @@ def _build_system_prompt( "ALWAYS call with confirm=false first to show a preview. 
" "Only re-call with confirm=true when user explicitly approves publishing.\n" "- Answer in markdown for all other questions.\n\n" + "Response formatting rules:\n" + "- ALWAYS embed inline links when you mention any NebulaOS page or feature. " + "Use [link:Label:/route] — this renders as a clickable link in the UI.\n" + "- Route pattern: entity pages follow / (e.g. /agents, /tasks, /plugins, " + "/models, /integrations, /knowledge, /marketplace, /settings, /logs). " + "Specific items: /agents/, /tasks/, /plugins/.\n" + "- Use agent_url / task_url returned by tool results for direct entity links.\n" + "- When list_agents returns multiple matches, LIST ALL OF THEM — " + "never pick just one. Show each agent name as a link to its agent_url, " + "with its state and type. Format as a markdown list.\n" + "- When your reply CREATES something or suggests an immediate action, " + "embed a button BELOW the text: " + "[button:Label:/route] — e.g. [button:View Agent:/agents/]\n" + "- Only add buttons for concrete, immediate actions — not for general navigation.\n" + "- At the very end of your reply, on a new line, add up to 3 relevant " + "follow-up questions or actions the user might want next, using: " + "||SUGGESTIONS: question one | question two | question three\n" + "- Only include the ||SUGGESTIONS: line when you have genuinely useful " + "follow-ups; omit it for simple factual answers.\n\n" "Anti-hallucination rules:\n" "- ONLY reference agents, plugins, models, and tasks listed in SYSTEM STATE below.\n" "- NEVER invent agent names, plugin names, or capabilities not in SYSTEM STATE.\n" @@ -1829,60 +556,59 @@ def _build_system_prompt( ) if system_state: - agents = system_state.get("agents", []) - models = system_state.get("models", []) - plugins = system_state.get("plugins", []) - integrations = system_state.get("integrations", []) - tools = system_state.get("tools", []) + provider_registry = system_state.get("provider_registry", []) + data_reg = 
system_state.get("data_registry", {}) + agents = data_reg.get("agents", []) + models = data_reg.get("models", []) + plugins = data_reg.get("plugins", []) + tasks = data_reg.get("tasks", []) - base += "## SYSTEM STATE (live, ground truth)\n" + total_tools = sum(len(p.get("tools", [])) for p in provider_registry) + names_only = total_tools > _TOOLS_INLINE_LIMIT - tasks = system_state.get("tasks", []) + # Compact set notation: {nebula:{t1,t2},slack:{t1,t2}} + # Falls back to count-only {nebula:15} when total exceeds limit. + parts = [] + for p in provider_registry: + tools = p.get("tools", []) + if names_only: + parts.append(f"{p['slug']}:{len(tools)}") + else: + parts.append(f"{p['slug']}:{{{','.join(tools)}}}" if tools else f"{p['slug']}:{{}}") + registry_str = "{" + ",".join(parts) + "}" if parts else "{}" - if agents: - agent_lines = ", ".join( - f"{a['name']} ({a['id']}, {a['state']})" - for a in agents + base += f"## PROVIDER REGISTRY (live)\n{registry_str}\n" + if names_only: + base += ( + "NOTE: Tool names omitted (>" + str(_TOOLS_INLINE_LIMIT) + " total). 
" + "Call list_provider_tools(slug) before invoking any tool.\n" ) - base += f"Agents: {agent_lines}\n" - else: - base += "Agents: none created yet\n" - - if models: - model_lines = ", ".join(m['name'] or m['id'] for m in models) - base += f"Models: {model_lines}\n" - else: - base += "Models: none registered\n" - - if integrations: - int_lines = ", ".join( - f"{i['provider']} ({', '.join(i['capabilities'][:3])})" - for i in integrations - ) - base += f"Connected integrations (tool stacks): {int_lines}\n" - else: - base += "Connected integrations: none\n" - - if plugins: - plugin_lines = ", ".join(p['name'] for p in plugins) - base += f"Active plugins: {plugin_lines}\n" - else: - base += "Active plugins: none\n" - - if tasks: - task_lines = ", ".join( - f"{t['description'][:40] or t['id']} ({t['status']})" - for t in tasks - ) - base += f"Recent tasks: {task_lines}\n" - else: - base += "Recent tasks: none\n" - - if tools: - base += f"Builtin tools: {', '.join(tools)}\n" - base += "\n" + # DATA REGISTRY — compact: {key:{entry,...}} — call tools for full detail + def _compact_agents(lst): + if not lst: + return "{}" + return "{" + ",".join(f"{a['name']}({a['id']}:{a['state']})" for a in lst) + "}" + + def _compact_simple(lst, key="name"): + if not lst: + return "{}" + return "{" + ",".join(str(i.get(key) or i.get("id", "?")) for i in lst) + "}" + + def _compact_tasks(lst): + if not lst: + return "{}" + return "{" + ",".join(f"{t['id']}:{t['status']}" for t in lst) + "}" + + base += ( + "## DATA REGISTRY (live — call tools for full detail)\n" + f"agents:{_compact_agents(agents)}\n" + f"models:{_compact_simple(models, 'name')}\n" + f"plugins:{_compact_simple(plugins, 'name')}\n" + f"tasks:{_compact_tasks(tasks)}\n\n" + ) + if context_chunks: context_block = "\n\n---\n\n".join(context_chunks) base += ( @@ -2001,6 +727,7 @@ async def chat( cid = _cid() ctx = ExecutionContext(correlation_id=cid) start_ms = time.monotonic() + session_id = body.session_id or 
f"sess_{int(time.time() * 1000)}_{secrets.token_hex(4)}" log.info("chat_request", { "component": "api.chat", @@ -2011,7 +738,7 @@ async def chat( "message_len": len(body.message), "history_turns": len(body.history), "top_k": body.top_k, - "session_id": body.session_id, + "session_id": session_id, }, }) @@ -2020,21 +747,21 @@ async def chat( bus = _get_event_bus(request) user_id = getattr(identity, "user_id", None) + # Persist user message first, then emit event (ordering guarantee) + await _persist_message( + db, session_id, "user", body.message, mode="nebula" + ) + # Emit chat channel event — chat is a first-class runtime channel await _emit(bus, ChatEvents.message_sent( event_id=f"evt_{cid}", - chat_session_id=body.session_id, + chat_session_id=session_id, user_id=user_id, correlation_id=cid, message_len=len(body.message), mode="nebula", )) - # Persist user message early (non-fatal) - await _persist_message( - db, body.session_id, "user", body.message, mode="nebula" - ) - # Bridge mode: when a client model override is set but no backend provider # is configured, fall through to RAG+context assembly and return the # assembled messages for the client (browser extension) to call locally. @@ -2093,7 +820,8 @@ async def chat( # Non-fatal — continue without context # ── 2. Build messages (with live system state) ───────────────────────── - system_state = await _get_system_state(db) + user_email = getattr(identity, "email", None) + system_state = await _get_system_state(db, user_email=user_email) system_prompt = _build_system_prompt(context_chunks, system_state) messages: List[Dict[str, Any]] = [ {"role": "system", "content": system_prompt}, @@ -2122,6 +850,7 @@ async def chat( }) return { "reply": "", + "session_id": session_id, "corpus_id": corpus_id, "chunks_used": chunks_used, "model_used": model_name, @@ -2130,15 +859,16 @@ async def chat( "tool_actions": [], "bridge_required": True, "bridge_messages": messages, + "bridge_tools": _get_tools(), } # ── 3. 
LLM call (with tool-calling enabled) ───────────────────────────── - model = body.model or os.getenv("NEBULA_LLM_DEFAULT_MODEL", "gpt-4o-mini") + model = body.model or _get_default_model(request) try: result = await model_provider.complete( model=model, messages=messages, - tools=NEBULA_CHAT_TOOLS, + tools=_get_tools(), context=ctx, temperature=0.4, max_tokens=2048, @@ -2182,52 +912,48 @@ async def chat( # ── Handle tool_calls from LLM ───────────────────────────────────── tool_calls = data.get("tool_calls") or [] if tool_calls: - # Execute each tool call and build a follow-up completion + # Execute all tool calls in parallel (sequential where dependencies exist) messages.append({"role": "assistant", "content": None, "tool_calls": tool_calls}) - for tc in tool_calls: - fn = tc.get("function", {}) - tool_name = fn.get("name", "") - try: - tool_args = json.loads(fn.get("arguments", "{}")) - except Exception: - tool_args = {} - try: - _ctrl = get_controller(request) - except Exception: - _ctrl = None - tool_result = await _execute_tool_call( - tool_name, tool_args, db, - user_id, cid, ctrl=_ctrl, bus=bus, - ) + try: + _ctrl = get_controller(request) + except Exception: + _ctrl = None + tool_ctx = ToolContext(db=db, user_id=user_id, cid=cid, ctrl=_ctrl, bus=bus) + parallel_results = await _tool_registry.execute_parallel(tool_calls, tool_ctx) + for pr in parallel_results: + tool_name = pr["tool_name"] + tool_result = pr["result_str"] result_data = json.loads(tool_result) if tool_result else {} - if ( - result_data.get("action_url") - or result_data.get("draft") is True - or result_data.get("plugin_id") - or result_data.get("marketplace_id") - or result_data.get("success") is False - ): - tool_actions.append({ - "tool": tool_name, - "result": result_data, - }) + tool_actions.append({ + "tool": tool_name, + "result": result_data, + }) messages.append({ "role": "tool", - "tool_call_id": tc.get("id", ""), + "tool_call_id": pr["tool_call_id"], "content": tool_result, }) # 
Second LLM call to produce human-readable reply from tool results - follow_up = await model_provider.complete( - model=model, - messages=messages, - tools=None, - context=ctx, - temperature=0.4, - max_tokens=1024, - ) - if follow_up.success and follow_up.data: - reply_text = follow_up.data.get("content") or "" - model_used = follow_up.data.get("model", model_used) + try: + follow_up = await model_provider.complete( + model=model, + messages=messages, + tools=None, + context=ctx, + temperature=0.4, + max_tokens=1024, + ) + if follow_up.success and follow_up.data: + reply_text = follow_up.data.get("content") or "" + model_used = follow_up.data.get("model", model_used) + except Exception as follow_up_exc: + log.error("chat_followup_llm_error", { + "component": "api.chat", + "operation": "chat_followup", + "entity_id": "nebula-assistant", + "correlation_id": cid, + "metadata": {"error": str(follow_up_exc)}, + }) if not reply_text and tool_actions: # Fallback: build reply from tool results directly lines = [] @@ -2282,7 +1008,7 @@ async def chat( # Persist assistant reply await _persist_message( - db, body.session_id, "assistant", reply_text, + db, session_id, "assistant", reply_text, mode="nebula", meta={"chunks_used": chunks_used, "latency_ms": latency_ms, "model": model_used}, ) @@ -2290,7 +1016,7 @@ async def chat( # Emit reply event — allows runtime subscribers to react to chat replies await _emit(bus, ChatEvents.message_replied( event_id=f"evt_{cid}_reply", - chat_session_id=body.session_id, + chat_session_id=session_id, user_id=user_id, correlation_id=cid, chunks_used=chunks_used, @@ -2301,6 +1027,7 @@ async def chat( return { "reply": reply_text, + "session_id": session_id, "corpus_id": corpus_id, "chunks_used": chunks_used, "model_used": model_used, diff --git a/src/api/routers/chat_tools/integration_tools.py b/src/api/routers/chat_tools/integration_tools.py new file mode 100644 index 00000000..4f3c51d5 --- /dev/null +++ 
b/src/api/routers/chat_tools/integration_tools.py @@ -0,0 +1,235 @@ +"""Integration-stack chat tools: list_integrations, connect_integration, list_provider_tools.""" + +from __future__ import annotations + +from typing import Any, Dict + +from libs.logging import get_logger +from src.api.routers.chat_tools import ChatTool, ToolContext, registry + +log = get_logger("api.chat.tools.integrations") + + +@registry.register +class ListIntegrationsTool(ChatTool): + name = "list_integrations" + definition = { + "type": "function", + "function": { + "name": "list_integrations", + "description": ( + "List all integration providers (Slack, GitHub, Jira, Telegram, " + "Notion, Discord, Trello, etc.) available in NebulaOS, optionally " + "filtered by category or search term. Also returns which ones the " + "user has already connected (instances)." + ), + "parameters": { + "type": "object", + "properties": { + "query": { + "type": "string", + "description": "Optional filter: service name or category", + }, + }, + "required": [], + }, + }, + } + + async def execute(self, args: Dict[str, Any], ctx: ToolContext) -> Dict[str, Any]: + query = args.get("query", "").lower() + q_filter = f"%{query}%" if query else "%" + try: + rows = await ctx.db.fetch( + """ + SELECT ip.id, ip.name, ip.slug, ip.category, ip.capabilities, + COUNT(ii.id) AS connected_count + FROM integration_providers ip + LEFT JOIN integration_instances ii + ON ii.provider_id = ip.id AND ii.status = 'active' + WHERE (LOWER(ip.name) LIKE $1 OR LOWER(ip.category) LIKE $1) + GROUP BY ip.id, ip.name, ip.slug, ip.category, ip.capabilities + ORDER BY ip.name + LIMIT 20 + """, + q_filter, + ) + except Exception as exc: + log.error("chat_list_integrations_db_error", { + "component": "api.chat.tools", + "operation": "list_integrations", + "entity_id": ctx.user_id or "anon", + "correlation_id": ctx.cid, + "metadata": {"error": str(exc)}, + }) + return {"integrations": [], "count": 0, "action_url": "/integrations"} + integrations 
= [ + { + "name": r["name"], + "slug": r["slug"], + "category": r["category"], + "capabilities": list(r["capabilities"] or []), + "connected": r["connected_count"] > 0, + } + for r in rows + ] + return { + "integrations": integrations, + "count": len(integrations), + "action_url": "/integrations", + } + + +@registry.register +class ConnectIntegrationTool(ChatTool): + name = "connect_integration" + definition = { + "type": "function", + "function": { + "name": "connect_integration", + "description": ( + "Open the integration setup page for a specific provider, so the " + "user can connect it. Use when user says 'connect my Slack', " + "'integrate Telegram', 'set up GitHub', or mentions wanting to " + "link a specific external service." + ), + "parameters": { + "type": "object", + "properties": { + "provider_slug": { + "type": "string", + "description": ( + "Slug of the integration provider: slack, github, jira, " + "linear, notion, telegram, whatsapp, pagerduty, ado" + ), + }, + }, + "required": ["provider_slug"], + }, + }, + } + + async def execute(self, args: Dict[str, Any], ctx: ToolContext) -> Dict[str, Any]: + slug = args.get("provider_slug", "").lower() + try: + row = await ctx.db.fetchrow( + "SELECT id, name, auth_type FROM integration_providers WHERE slug = $1", + slug, + ) + except Exception as exc: + log.error("chat_connect_integration_db_error", { + "component": "api.chat.tools", + "operation": "connect_integration", + "entity_id": ctx.user_id or "anon", + "correlation_id": ctx.cid, + "metadata": {"slug": slug, "error": str(exc)}, + }) + return {"success": False, "error": "Database error looking up integration."} + if not row: + log.warn("chat_connect_integration_not_found", { + "component": "api.chat.tools", + "operation": "connect_integration", + "entity_id": ctx.user_id or "anon", + "correlation_id": ctx.cid, + "metadata": {"slug": slug}, + }) + return {"success": False, "error": f"Integration '{slug}' not found."} + 
log.info("chat_connect_integration", { + "component": "api.chat.tools", + "operation": "connect_integration", + "entity_id": ctx.user_id or "anon", + "correlation_id": ctx.cid, + "metadata": {"slug": slug, "provider": row["name"]}, + }) + return { + "success": True, + "message": f"Opening setup page for {row['name']}.", + "action_url": f"/integrations?connect={slug}", + } + + +@registry.register +class ListProviderToolsTool(ChatTool): + name = "list_provider_tools" + definition = { + "type": "function", + "function": { + "name": "list_provider_tools", + "description": ( + "Return the list of callable tools for a specific provider. " + "Use this when the PROVIDER REGISTRY shows tool counts but not names " + "(compact mode is active because total tools exceed the inline limit). " + "Pass slug='nebula' to get NebulaOS native tools. " + "Pass any other connected provider's slug for its tool list." + ), + "parameters": { + "type": "object", + "properties": { + "slug": { + "type": "string", + "description": "Provider slug from PROVIDER REGISTRY (e.g. 
'nebula', 'slack').", + }, + }, + "required": ["slug"], + }, + }, + } + + async def execute(self, args: Dict[str, Any], ctx: ToolContext) -> Dict[str, Any]: + slug = (args.get("slug") or "").strip().lower() + if not slug: + return {"success": False, "message": "slug is required."} + + if slug == "nebula": + tools = [d["function"]["name"] for d in registry.get_definitions()] + log.info("chat_list_provider_tools", { + "component": "api.chat.tools.integrations", + "operation": "list_provider_tools", + "entity_id": ctx.user_id or "anon", + "correlation_id": ctx.cid, + "metadata": {"slug": "nebula", "count": len(tools)}, + }) + return {"success": True, "slug": "nebula", "name": "NebulaOS", "tools": tools, "count": len(tools)} + + try: + row = await ctx.db.fetchrow( + "SELECT name, capabilities, is_implemented FROM integration_providers WHERE slug = $1", + slug, + ) + except Exception as exc: + log.error("chat_list_provider_tools_error", { + "component": "api.chat.tools.integrations", + "operation": "list_provider_tools", + "entity_id": ctx.user_id or "anon", + "correlation_id": ctx.cid, + "metadata": {"slug": slug, "error": str(exc)}, + }) + return {"success": False, "message": f"Failed to look up provider '{slug}'."} + + if not row: + return { + "success": False, + "message": ( + f"Unknown provider slug: '{slug}'. " + "Use list_integrations to browse available providers." + ), + } + + if not row["is_implemented"]: + return { + "success": False, + "message": ( + f"Provider '{row['name']}' is in the catalog but not yet implemented. " + f"Use connect_integration('{slug}') to set it up when it becomes available." 
+ ), + } + + tools = list(row["capabilities"] or []) + log.info("chat_list_provider_tools", { + "component": "api.chat.tools.integrations", + "operation": "list_provider_tools", + "entity_id": ctx.user_id or "anon", + "correlation_id": ctx.cid, + "metadata": {"slug": slug, "name": row["name"], "count": len(tools)}, + }) + return {"success": True, "slug": slug, "name": row["name"], "tools": tools, "count": len(tools)} diff --git a/tests/unit/test_create_plugin_tool.py b/tests/unit/test_create_plugin_tool.py index ccc7dcda..b633c03b 100644 --- a/tests/unit/test_create_plugin_tool.py +++ b/tests/unit/test_create_plugin_tool.py @@ -7,10 +7,20 @@ Tests cover: - Validation rejections: bad slug, bad semver, disallowed license, disallowed scope """ -import json import pytest from unittest.mock import AsyncMock, MagicMock, patch +from src.api.routers.chat_tools.plugin_tools import ( + _validate_plugin_code, + _SLUG_RE, + _SEMVER_RE, + _PLUGIN_LICENSE_ALLOWLIST, + _PLUGIN_SCOPE_ALLOWLIST, + CreatePluginTool, + PublishPluginTool, +) +from src.api.routers.chat_tools import ToolContext + class _AsyncContextManagerMock: """Minimal async context manager that satisfies `async with` usage.""" @@ -37,17 +47,6 @@ def _make_db_mock(fetchrow_value=None, fetchval_value=None) -> AsyncMock: return mock -# ── Import helpers from chat.py directly ────────────────────────────────────── - -from src.api.routers.chat import ( - _validate_plugin_code, - _SLUG_RE, - _SEMVER_RE, - _PLUGIN_LICENSE_ALLOWLIST, - _PLUGIN_SCOPE_ALLOWLIST, -) - - # ── _validate_plugin_code ───────────────────────────────────────────────────── class TestValidatePluginCode: @@ -128,87 +127,60 @@ VALID_ARGS = { } +def _make_ctx(db=None, bus=None): + return ToolContext(db=db or AsyncMock(), user_id=None, cid="test_corr", bus=bus) + + @pytest.mark.asyncio class TestHandleCreatePlugin: async def test_draft_returns_draft_flag(self): - from src.api.routers.chat import _handle_create_plugin - mock_conn = AsyncMock() - mock_bus = 
MagicMock() - - raw = await _handle_create_plugin( - VALID_ARGS, mock_conn, None, "test_corr", bus=mock_bus - ) - result = json.loads(raw) + ctx = _make_ctx(bus=MagicMock()) + result = await CreatePluginTool().execute(VALID_ARGS, ctx) assert result["draft"] is True assert result["name"] == "my-plugin" assert result["version"] == "1.0.0" assert result["code"] == VALID_PLUGIN_CODE.strip() assert result["checksum"] is not None assert result["plugin_id"] is None - assert result["success"] is None + assert result.get("success") is None async def test_draft_invalid_slug_returns_error(self): - from src.api.routers.chat import _handle_create_plugin bad_args = {**VALID_ARGS, "name": "my plugin!"} - raw = await _handle_create_plugin( - bad_args, AsyncMock(), None, "c1" - ) - result = json.loads(raw) + result = await CreatePluginTool().execute(bad_args, _make_ctx()) assert result.get("success") is False assert "name" in (result.get("error") or "").lower() async def test_draft_invalid_semver_returns_error(self): - from src.api.routers.chat import _handle_create_plugin bad_args = {**VALID_ARGS, "version": "1.0"} - raw = await _handle_create_plugin( - bad_args, AsyncMock(), None, "c2" - ) - result = json.loads(raw) + result = await CreatePluginTool().execute(bad_args, _make_ctx()) assert result.get("success") is False assert "version" in (result.get("error") or "").lower() async def test_draft_disallowed_license_returns_error(self): - from src.api.routers.chat import _handle_create_plugin bad_args = {**VALID_ARGS, "license": "WTFPL"} - raw = await _handle_create_plugin( - bad_args, AsyncMock(), None, "c3" - ) - result = json.loads(raw) + result = await CreatePluginTool().execute(bad_args, _make_ctx()) assert result.get("success") is False assert "license" in (result.get("error") or "").lower() async def test_draft_disallowed_scope_returns_error(self): - from src.api.routers.chat import _handle_create_plugin bad_args = {**VALID_ARGS, "scopes": ["read:everything"]} - raw = await 
_handle_create_plugin( - bad_args, AsyncMock(), None, "c4" - ) - result = json.loads(raw) + result = await CreatePluginTool().execute(bad_args, _make_ctx()) assert result.get("success") is False assert "scope" in (result.get("error") or "").lower() async def test_draft_bad_code_returns_error(self): - from src.api.routers.chat import _handle_create_plugin bad_args = {**VALID_ARGS, "code": "x = 1"} - raw = await _handle_create_plugin( - bad_args, AsyncMock(), None, "c5" - ) - result = json.loads(raw) + result = await CreatePluginTool().execute(bad_args, _make_ctx()) assert result.get("success") is False async def test_confirm_true_calls_db_and_emits_event(self): - from src.api.routers.chat import _handle_create_plugin confirm_args = {**VALID_ARGS, "confirm": True} - - mock_conn = _make_db_mock(fetchrow_value=None) mock_bus = AsyncMock() + ctx = _make_ctx(db=_make_db_mock(fetchrow_value=None), bus=mock_bus) - with patch("src.api.routers.chat.os.makedirs"), \ + with patch("src.api.routers.chat_tools.plugin_tools.os.makedirs"), \ patch("builtins.open", MagicMock()): - raw = await _handle_create_plugin( - confirm_args, mock_conn, None, "c6", bus=mock_bus - ) - result = json.loads(raw) + result = await CreatePluginTool().execute(confirm_args, ctx) assert result.get("success") is True assert result.get("draft") is False assert result.get("plugin_id") is not None @@ -220,7 +192,6 @@ class TestHandleCreatePlugin: @pytest.mark.asyncio class TestHandlePublishPlugin: async def test_preview_returns_preview_info(self): - from src.api.routers.chat import _handle_publish_plugin args = {"plugin_id": "plugin_abc123", "confirm": False} mock_conn = AsyncMock() mock_conn.fetchrow.return_value = { @@ -232,55 +203,40 @@ class TestHandlePublishPlugin: "tags": [], "license": "MIT", } - - raw = await _handle_publish_plugin( - args, mock_conn, None, "p1" - ) - result = json.loads(raw) + result = await PublishPluginTool().execute(args, _make_ctx(db=mock_conn)) assert result.get("success") is 
None assert result.get("name") == "my-plugin" async def test_confirm_true_publishes_and_emits(self): - from src.api.routers.chat import _handle_publish_plugin args = {"plugin_id": "plugin_abc123", "confirm": True} - mock_conn = _make_db_mock( - fetchrow_value={ - "name": "my-plugin", - "description": "Test", - "version": "1.0.0", - "trust_level": "community", - "is_published": False, - "tags": ["test"], - "license": "MIT", - }, - fetchval_value="mkt_test_001", - ) mock_bus = AsyncMock() - - raw = await _handle_publish_plugin( - args, mock_conn, None, "p2", bus=mock_bus + ctx = _make_ctx( + db=_make_db_mock( + fetchrow_value={ + "name": "my-plugin", + "description": "Test", + "version": "1.0.0", + "trust_level": "community", + "is_published": False, + "tags": ["test"], + "license": "MIT", + }, + fetchval_value="mkt_test_001", + ), + bus=mock_bus, ) - result = json.loads(raw) + result = await PublishPluginTool().execute(args, ctx) assert result.get("success") is True assert result.get("marketplace_id") is not None mock_bus.emit.assert_called_once() async def test_missing_plugin_id_returns_error(self): - from src.api.routers.chat import _handle_publish_plugin - raw = await _handle_publish_plugin( - {"confirm": True}, AsyncMock(), None, "p3" - ) - result = json.loads(raw) + result = await PublishPluginTool().execute({"confirm": True}, _make_ctx()) assert result.get("success") is False async def test_plugin_not_found_returns_error(self): - from src.api.routers.chat import _handle_publish_plugin args = {"plugin_id": "plugin_nonexistent", "confirm": True} mock_conn = AsyncMock() mock_conn.fetchrow.return_value = None - - raw = await _handle_publish_plugin( - args, mock_conn, None, "p4" - ) - result = json.loads(raw) + result = await PublishPluginTool().execute(args, _make_ctx(db=mock_conn)) assert result.get("success") is False