From 2227d566b56484a0ea795e363126734601e9867c Mon Sep 17 00:00:00 2001 From: mohiit1502 Date: Tue, 21 Apr 2026 00:54:31 +0530 Subject: [PATCH] =?UTF-8?q?fix:=20agent=20visibility=20=E2=80=94=20user=20?= =?UTF-8?q?sees=20own=20+=20published=20agents=20(is=5Ftemplate=3Dtrue)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Root causes: 1. _fetch_agents queried WHERE user_email=$1 but agents table has owner_id (no user_email column) → silent SQL error → empty DATA REGISTRY → LLM reports 'no agents exist' 2. tasks._fetch_tasks used user_email which is always NULL → empty tasks list 3. Silent except blocks swallowed all DB errors invisibly Fixes: - _get_system_state: add user_id param; pass identity.user_id at call site - _fetch_agents: query owner_id=user_id OR is_template=true; dev fallback shows is_template=true only - _fetch_tasks: query created_by=user_id; dev fallback shows all tasks - list_agents tool: same visibility rule as _fetch_agents - All except blocks now log errors (observability rule) Data: - Published 24 seeded agents (is_template=true) so all users see them - User-created agents only visible to their owner (owner_id match) --- src/api/routers/chat.py | 72 +++++-- src/api/routers/chat_tools/agent_tools.py | 243 ++++++++++++++++++++++ 2 files changed, 297 insertions(+), 18 deletions(-) create mode 100644 src/api/routers/chat_tools/agent_tools.py diff --git a/src/api/routers/chat.py b/src/api/routers/chat.py index e01fdc89..57848d67 100644 --- a/src/api/routers/chat.py +++ b/src/api/routers/chat.py @@ -223,7 +223,11 @@ def _get_default_model(request: Request) -> str: return os.getenv("NEBULA_LLM_DEFAULT_MODEL", "gpt-4o-mini") -async def _get_system_state(db, user_email: Optional[str] = None) -> Dict[str, Any]: +async def _get_system_state( + db, + user_email: Optional[str] = None, + user_id: Optional[str] = None, +) -> Dict[str, Any]: """Fetch live system state for grounding the LLM (anti-hallucination). Returns two top-level registries: @@ -234,29 +238,40 @@ async def _get_system_state(db, user_email: Optional[str] = None) -> Dict[str, A All DB queries run in parallel. Each is independently wrapped so a single failure does not affect the others. - Security: every query that touches user-owned entities filters by - user_email. integration_instances has no owner column yet — marked - TODO for migration 044. + Column notes: + agents.owner_id — stores user UUID (not email); visibility = own agents + OR is_template=true (platform-published agents). + tasks.created_by — stores user UUID; tasks.user_email is never populated. + plugins.user_email — populated correctly; user-scoped filter applies. """ async def _fetch_agents(): + """User sees their own agents + platform-published agents (is_template=true).""" try: - if user_email: + if user_id and user_id not in ("local-admin", "system", "anon"): rows = await db.fetch( "SELECT id, name, status, model FROM agents " - "WHERE user_email = $1 ORDER BY created_at DESC LIMIT 20", - user_email, + "WHERE deleted_at IS NULL " + "AND (owner_id = $1 OR is_template = true) " + "ORDER BY is_template, created_at DESC LIMIT 20", + user_id, ) else: rows = await db.fetch( - "SELECT id, name, status, model " - "FROM agents ORDER BY created_at DESC LIMIT 20" + "SELECT id, name, status, model FROM agents " + "WHERE deleted_at IS NULL AND is_template = true " + "ORDER BY created_at DESC LIMIT 20" ) return [ {"id": r["id"], "name": r["name"], "state": r["status"], "model": r["model"]} for r in rows ] - except Exception: + except Exception as exc: + log.error("system_state_fetch_agents_failed", { + "component": "api.chat", "operation": "_fetch_agents", + "entity_id": user_id or "anon", "correlation_id": "system_state", + "metadata": {"error": str(exc)}, + }) return [] async def _fetch_models(): @@ -269,7 +284,12 @@ async def _get_system_state(db, user_email: Optional[str] = None) -> Dict[str, A {"id": r["model_id"], "name": r["display_name"], "provider": r["provider_id"]} for r in rows ] - except Exception: + except Exception as exc: + log.error("system_state_fetch_models_failed", { + "component": "api.chat", "operation": "_fetch_models", + "entity_id": user_id or "anon", "correlation_id": "system_state", + "metadata": {"error": str(exc)}, + }) return [] async def _fetch_plugins(): @@ -289,7 +309,12 @@ async def _get_system_state(db, user_email: Optional[str] = None) -> Dict[str, A "WHERE status IN ('installed', 'published') LIMIT 20" ) return [{"id": r["id"], "name": r["name"]} for r in rows] - except Exception: + except Exception as exc: + log.error("system_state_fetch_plugins_failed", { + "component": "api.chat", "operation": "_fetch_plugins", + "entity_id": user_id or "anon", "correlation_id": "system_state", + "metadata": {"error": str(exc)}, + }) return [] async def _fetch_ext_providers(): @@ -315,16 +340,21 @@ async def _get_system_state(db, user_email: Optional[str] = None) -> Dict[str, A } for r in rows ] - except Exception: + except Exception as exc: + log.error("system_state_fetch_ext_providers_failed", { + "component": "api.chat", "operation": "_fetch_ext_providers", + "entity_id": user_id or "anon", "correlation_id": "system_state", + "metadata": {"error": str(exc)}, + }) return [] async def _fetch_tasks(): try: - if user_email: + if user_id and user_id not in ("local-admin", "system", "anon"): rows = await db.fetch( "SELECT id, description, status, agent_id FROM tasks " - "WHERE user_email = $1 ORDER BY created_at DESC LIMIT 10", - user_email, + "WHERE created_by = $1 ORDER BY created_at DESC LIMIT 10", + user_id, ) else: rows = await db.fetch( @@ -340,7 +370,12 @@ async def _get_system_state(db, user_email: Optional[str] = None) -> Dict[str, A } for r in rows ] - except Exception: + except Exception as exc: + log.error("system_state_fetch_tasks_failed", { + "component": "api.chat", "operation": "_fetch_tasks", + "entity_id": user_id or "anon", "correlation_id": "system_state", + "metadata": {"error": str(exc)}, + }) return [] agents, models, plugins, ext_providers, tasks = await asyncio.gather( @@ -828,7 +863,8 @@ async def chat( # ── 2. Build messages (with live system state) ───────────────────────── user_email = getattr(identity, "email", None) - system_state = await _get_system_state(db, user_email=user_email) + user_id_val = getattr(identity, "user_id", None) + system_state = await _get_system_state(db, user_email=user_email, user_id=user_id_val) system_prompt = _build_system_prompt(context_chunks, system_state) messages: List[Dict[str, Any]] = [ {"role": "system", "content": system_prompt}, diff --git a/src/api/routers/chat_tools/agent_tools.py b/src/api/routers/chat_tools/agent_tools.py new file mode 100644 index 00000000..4d7c093a --- /dev/null +++ b/src/api/routers/chat_tools/agent_tools.py @@ -0,0 +1,243 @@ +"""Agent-stack chat tools: create_agent, run_agent, list_agents.""" + +from __future__ import annotations + +import json +import os +import secrets +import time +from typing import Any, Dict + +from libs.logging import get_logger +from src.api.routers.chat_tools import ( + ChatTool, ToolContext, activate_agent_for_task, registry, +) + +_MAX_AGENT_NAME_LEN = 128 +_MAX_SYSTEM_PROMPT_LEN = 16_000 + +log = get_logger("api.chat.tools.agents") + + +@registry.register +class CreateAgentTool(ChatTool): + name = "create_agent" + definition = { + "type": "function", + "function": { + "name": "create_agent", + "description": ( + "Create a new agent in NebulaOS. Use when the user wants to " + "create, build, or set up an agent, bot, skill, or assistant — " + "even if they describe it in a long multi-sentence paragraph. " + "Extract the name and purpose from whatever they say." + ), + "parameters": { + "type": "object", + "properties": { + "name": { + "type": "string", + "description": "Short slug-friendly agent name, e.g. 'gmail-digest'", + }, + "system_prompt": { + "type": "string", + "description": ( + "Full agent system prompt capturing purpose, " + "data sources, output format, and any integrations to use." + ), + }, + "model": { + "type": "string", + "description": "Model ID (leave empty for platform default)", + }, + }, + "required": ["name", "system_prompt"], + }, + }, + } + + async def execute(self, args: Dict[str, Any], ctx: ToolContext) -> Dict[str, Any]: + agent_id = f"agent_{int(time.time() * 1000)}_{secrets.token_hex(3)}" + name = (args.get("name") or "nebula-agent").strip()[:_MAX_AGENT_NAME_LEN] + system_prompt = (args.get("system_prompt") or "").strip()[:_MAX_SYSTEM_PROMPT_LEN] + model = args.get("model") or os.getenv("NEBULA_LLM_DEFAULT_MODEL", "gpt-4o-mini") + await ctx.db.execute( + """ + INSERT INTO agents + (id, name, status, model, system_prompt, owner_id, + agent_type, configuration, capabilities, created_at, updated_at) + VALUES ($1, $2, 'created', $3, $4, $5, 'workflow', '{}', '{}', + NOW(), NOW()) + ON CONFLICT DO NOTHING + """, + agent_id, name, model, system_prompt, ctx.user_id or "system", + ) + log.info("chat_agent_created", { + "component": "api.chat.tools", "operation": "create_agent", + "entity_id": agent_id, "correlation_id": ctx.cid, + "metadata": {"name": name, "model": model}, + }) + return { + "success": True, "agent_id": agent_id, "name": name, + "message": f"Agent '{name}' created with ID {agent_id}.", + "action_url": f"/agents/{agent_id}", + } + + +@registry.register +class RunAgentTool(ChatTool): + name = "run_agent" + definition = { + "type": "function", + "function": { + "name": "run_agent", + "description": ( + "Run a task on an existing agent by its ID. Use when the user " + "wants to execute, run, or dispatch a task to a named agent. " + "Use list_agents first if unsure which agent to target." + ), + "parameters": { + "type": "object", + "properties": { + "agent_id": { + "type": "string", + "description": "Agent ID from the SYSTEM STATE agent list", + }, + "goal": { + "type": "string", + "description": "Full task goal or instruction", + }, + }, + "required": ["agent_id", "goal"], + }, + }, + } + + async def execute(self, args: Dict[str, Any], ctx: ToolContext) -> Dict[str, Any]: + agent_id = args.get("agent_id", "") + goal = args.get("goal", "") + agent_row = await ctx.db.fetchrow( + "SELECT id, name FROM agents WHERE id = $1", agent_id + ) + if not agent_row: + log.warn("chat_run_agent_not_found", { + "component": "api.chat.tools", "operation": "run_agent", + "entity_id": agent_id, "correlation_id": ctx.cid, + "metadata": {"agent_id": agent_id}, + }) + return {"success": False, "error": f"Agent {agent_id} not found."} + task_id = f"task_{int(time.time() * 1000)}_{secrets.token_hex(3)}" + await ctx.db.execute( + """ + INSERT INTO tasks + (id, agent_id, description, task_type, status, + priority, input, max_retries, timeout_seconds, + created_by, created_at, updated_at) + VALUES ($1, $2, $3, 'workflow_step', 'pending', + 100, $4, 3, 300, $5, NOW(), NOW()) + ON CONFLICT DO NOTHING + """, + task_id, agent_id, goal, + json.dumps({"description": goal, "prompt": goal}), + ctx.user_id or "system", + ) + if ctx.ctrl is not None: + await activate_agent_for_task( + ctx.ctrl, ctx.db, agent_id, task_id, goal, ctx.cid, + operation="run_agent", + ) + log.info("chat_task_created", { + "component": "api.chat.tools", "operation": "run_agent", + "entity_id": task_id, "correlation_id": ctx.cid, + "metadata": {"agent_id": agent_id, "goal": goal}, + }) + return { + "success": True, "task_id": task_id, "agent_id": agent_id, + "message": ( + f"Task dispatched to agent '{agent_row['name']}'. " + f"Task ID: {task_id}." + ), + "action_url": f"/tasks/{task_id}", + } + + +@registry.register +class ListAgentsTool(ChatTool): + name = "list_agents" + definition = { + "type": "function", + "function": { + "name": "list_agents", + "description": ( + "List agents in the system with their names, IDs, states, and " + "direct page links. Use before run_agent to find the right agent. " + "When the user asks to 'find', 'search', or 'show' agents by " + "purpose or name, pass a query to filter results." + ), + "parameters": { + "type": "object", + "properties": { + "query": { + "type": "string", + "description": ( + "Optional keyword to filter agents by name or purpose, " + "e.g. 'blog', 'slack', 'monitor'" + ), + }, + }, + "required": [], + }, + }, + } + + _DEV_IDS = frozenset({"local-admin", "system", "anon"}) + + async def execute(self, args: Dict[str, Any], ctx: ToolContext) -> Dict[str, Any]: + query = args.get("query", "").lower().strip() + uid = ctx.user_id or "" + is_real_user = bool(uid) and uid not in self._DEV_IDS + + if query: + if is_real_user: + rows = await ctx.db.fetch( + "SELECT id, name, status, model, agent_type FROM agents " + "WHERE deleted_at IS NULL " + "AND (owner_id = $1 OR is_template = true) " + "AND (LOWER(name) LIKE $2 OR LOWER(agent_type) LIKE $2) " + "ORDER BY is_template, created_at DESC LIMIT 20", + uid, f"%{query}%", + ) + else: + rows = await ctx.db.fetch( + "SELECT id, name, status, model, agent_type FROM agents " + "WHERE deleted_at IS NULL AND is_template = true " + "AND (LOWER(name) LIKE $1 OR LOWER(agent_type) LIKE $1) " + "ORDER BY created_at DESC LIMIT 20", + f"%{query}%", + ) + else: + if is_real_user: + rows = await ctx.db.fetch( + "SELECT id, name, status, model, agent_type FROM agents " + "WHERE deleted_at IS NULL " + "AND (owner_id = $1 OR is_template = true) " + "ORDER BY is_template, created_at DESC LIMIT 20", + uid, + ) + else: + rows = await ctx.db.fetch( + "SELECT id, name, status, model, agent_type FROM agents " + "WHERE deleted_at IS NULL AND is_template = true " + "ORDER BY created_at DESC LIMIT 20" + ) + agents = [ + { + "id": r["id"], + "name": r["name"], + "state": r["status"], + "type": r["agent_type"] or "workflow", + "agent_url": f"/agents/{r['id']}", + } + for r in rows + ] + return {"agents": agents, "count": len(agents), "query": query}