fix: agent visibility — user sees own + published agents (is_template=true)
Some checks failed
Stuffle/nebula-os/pipeline/head There was a failure building this commit
Stuffle/nebula-os/pipeline/pr-main Build started...

Root causes:
1. _fetch_agents queried WHERE user_email=$1 but agents table has owner_id
   (no user_email column) → silent SQL error → empty DATA REGISTRY → LLM
   reports 'no agents exist'
2. tasks._fetch_tasks used user_email which is always NULL → empty tasks list
3. Silent except blocks swallowed all DB errors invisibly

Fixes:
- _get_system_state: add user_id param; pass identity.user_id at call site
- _fetch_agents: query owner_id=user_id OR is_template=true; dev fallback
  shows is_template=true only
- _fetch_tasks: query created_by=user_id; dev fallback shows all tasks
- list_agents tool: same visibility rule as _fetch_agents
- All except blocks now log errors (observability rule)

Data:
- Published 24 seeded agents (is_template=true) so all users see them
- User-created agents only visible to their owner (owner_id match)
This commit is contained in:
2026-04-21 00:54:31 +05:30
parent 689776fed8
commit 2227d566b5
2 changed files with 297 additions and 18 deletions

View File

@@ -223,7 +223,11 @@ def _get_default_model(request: Request) -> str:
return os.getenv("NEBULA_LLM_DEFAULT_MODEL", "gpt-4o-mini")
async def _get_system_state(db, user_email: Optional[str] = None) -> Dict[str, Any]:
async def _get_system_state(
db,
user_email: Optional[str] = None,
user_id: Optional[str] = None,
) -> Dict[str, Any]:
"""Fetch live system state for grounding the LLM (anti-hallucination).
Returns two top-level registries:
@@ -234,29 +238,40 @@ async def _get_system_state(db, user_email: Optional[str] = None) -> Dict[str, A
All DB queries run in parallel. Each is independently wrapped so a
single failure does not affect the others.
Security: every query that touches user-owned entities filters by
user_email. integration_instances has no owner column yet — marked
TODO for migration 044.
Column notes:
agents.owner_id — stores user UUID (not email); visibility = own agents
OR is_template=true (platform-published agents).
tasks.created_by — stores user UUID; tasks.user_email is never populated.
plugins.user_email — populated correctly; user-scoped filter applies.
"""
async def _fetch_agents():
"""User sees their own agents + platform-published agents (is_template=true)."""
try:
if user_email:
if user_id and user_id not in ("local-admin", "system", "anon"):
rows = await db.fetch(
"SELECT id, name, status, model FROM agents "
"WHERE user_email = $1 ORDER BY created_at DESC LIMIT 20",
user_email,
"WHERE deleted_at IS NULL "
"AND (owner_id = $1 OR is_template = true) "
"ORDER BY is_template, created_at DESC LIMIT 20",
user_id,
)
else:
rows = await db.fetch(
"SELECT id, name, status, model "
"FROM agents ORDER BY created_at DESC LIMIT 20"
"SELECT id, name, status, model FROM agents "
"WHERE deleted_at IS NULL AND is_template = true "
"ORDER BY created_at DESC LIMIT 20"
)
return [
{"id": r["id"], "name": r["name"], "state": r["status"], "model": r["model"]}
for r in rows
]
except Exception:
except Exception as exc:
log.error("system_state_fetch_agents_failed", {
"component": "api.chat", "operation": "_fetch_agents",
"entity_id": user_id or "anon", "correlation_id": "system_state",
"metadata": {"error": str(exc)},
})
return []
async def _fetch_models():
@@ -269,7 +284,12 @@ async def _get_system_state(db, user_email: Optional[str] = None) -> Dict[str, A
{"id": r["model_id"], "name": r["display_name"], "provider": r["provider_id"]}
for r in rows
]
except Exception:
except Exception as exc:
log.error("system_state_fetch_models_failed", {
"component": "api.chat", "operation": "_fetch_models",
"entity_id": user_id or "anon", "correlation_id": "system_state",
"metadata": {"error": str(exc)},
})
return []
async def _fetch_plugins():
@@ -289,7 +309,12 @@ async def _get_system_state(db, user_email: Optional[str] = None) -> Dict[str, A
"WHERE status IN ('installed', 'published') LIMIT 20"
)
return [{"id": r["id"], "name": r["name"]} for r in rows]
except Exception:
except Exception as exc:
log.error("system_state_fetch_plugins_failed", {
"component": "api.chat", "operation": "_fetch_plugins",
"entity_id": user_id or "anon", "correlation_id": "system_state",
"metadata": {"error": str(exc)},
})
return []
async def _fetch_ext_providers():
@@ -315,16 +340,21 @@ async def _get_system_state(db, user_email: Optional[str] = None) -> Dict[str, A
}
for r in rows
]
except Exception:
except Exception as exc:
log.error("system_state_fetch_ext_providers_failed", {
"component": "api.chat", "operation": "_fetch_ext_providers",
"entity_id": user_id or "anon", "correlation_id": "system_state",
"metadata": {"error": str(exc)},
})
return []
async def _fetch_tasks():
try:
if user_email:
if user_id and user_id not in ("local-admin", "system", "anon"):
rows = await db.fetch(
"SELECT id, description, status, agent_id FROM tasks "
"WHERE user_email = $1 ORDER BY created_at DESC LIMIT 10",
user_email,
"WHERE created_by = $1 ORDER BY created_at DESC LIMIT 10",
user_id,
)
else:
rows = await db.fetch(
@@ -340,7 +370,12 @@ async def _get_system_state(db, user_email: Optional[str] = None) -> Dict[str, A
}
for r in rows
]
except Exception:
except Exception as exc:
log.error("system_state_fetch_tasks_failed", {
"component": "api.chat", "operation": "_fetch_tasks",
"entity_id": user_id or "anon", "correlation_id": "system_state",
"metadata": {"error": str(exc)},
})
return []
agents, models, plugins, ext_providers, tasks = await asyncio.gather(
@@ -828,7 +863,8 @@ async def chat(
# ── 2. Build messages (with live system state) ─────────────────────────
user_email = getattr(identity, "email", None)
system_state = await _get_system_state(db, user_email=user_email)
user_id_val = getattr(identity, "user_id", None)
system_state = await _get_system_state(db, user_email=user_email, user_id=user_id_val)
system_prompt = _build_system_prompt(context_chunks, system_state)
messages: List[Dict[str, Any]] = [
{"role": "system", "content": system_prompt},

View File

@@ -0,0 +1,243 @@
"""Agent-stack chat tools: create_agent, run_agent, list_agents."""
from __future__ import annotations
import json
import os
import secrets
import time
from typing import Any, Dict
from libs.logging import get_logger
from src.api.routers.chat_tools import (
ChatTool, ToolContext, activate_agent_for_task, registry,
)
_MAX_AGENT_NAME_LEN = 128
_MAX_SYSTEM_PROMPT_LEN = 16_000
log = get_logger("api.chat.tools.agents")
@registry.register
class CreateAgentTool(ChatTool):
name = "create_agent"
definition = {
"type": "function",
"function": {
"name": "create_agent",
"description": (
"Create a new agent in NebulaOS. Use when the user wants to "
"create, build, or set up an agent, bot, skill, or assistant — "
"even if they describe it in a long multi-sentence paragraph. "
"Extract the name and purpose from whatever they say."
),
"parameters": {
"type": "object",
"properties": {
"name": {
"type": "string",
"description": "Short slug-friendly agent name, e.g. 'gmail-digest'",
},
"system_prompt": {
"type": "string",
"description": (
"Full agent system prompt capturing purpose, "
"data sources, output format, and any integrations to use."
),
},
"model": {
"type": "string",
"description": "Model ID (leave empty for platform default)",
},
},
"required": ["name", "system_prompt"],
},
},
}
async def execute(self, args: Dict[str, Any], ctx: ToolContext) -> Dict[str, Any]:
agent_id = f"agent_{int(time.time() * 1000)}_{secrets.token_hex(3)}"
name = (args.get("name") or "nebula-agent").strip()[:_MAX_AGENT_NAME_LEN]
system_prompt = (args.get("system_prompt") or "").strip()[:_MAX_SYSTEM_PROMPT_LEN]
model = args.get("model") or os.getenv("NEBULA_LLM_DEFAULT_MODEL", "gpt-4o-mini")
await ctx.db.execute(
"""
INSERT INTO agents
(id, name, status, model, system_prompt, owner_id,
agent_type, configuration, capabilities, created_at, updated_at)
VALUES ($1, $2, 'created', $3, $4, $5, 'workflow', '{}', '{}',
NOW(), NOW())
ON CONFLICT DO NOTHING
""",
agent_id, name, model, system_prompt, ctx.user_id or "system",
)
log.info("chat_agent_created", {
"component": "api.chat.tools", "operation": "create_agent",
"entity_id": agent_id, "correlation_id": ctx.cid,
"metadata": {"name": name, "model": model},
})
return {
"success": True, "agent_id": agent_id, "name": name,
"message": f"Agent '{name}' created with ID {agent_id}.",
"action_url": f"/agents/{agent_id}",
}
@registry.register
class RunAgentTool(ChatTool):
name = "run_agent"
definition = {
"type": "function",
"function": {
"name": "run_agent",
"description": (
"Run a task on an existing agent by its ID. Use when the user "
"wants to execute, run, or dispatch a task to a named agent. "
"Use list_agents first if unsure which agent to target."
),
"parameters": {
"type": "object",
"properties": {
"agent_id": {
"type": "string",
"description": "Agent ID from the SYSTEM STATE agent list",
},
"goal": {
"type": "string",
"description": "Full task goal or instruction",
},
},
"required": ["agent_id", "goal"],
},
},
}
async def execute(self, args: Dict[str, Any], ctx: ToolContext) -> Dict[str, Any]:
agent_id = args.get("agent_id", "")
goal = args.get("goal", "")
agent_row = await ctx.db.fetchrow(
"SELECT id, name FROM agents WHERE id = $1", agent_id
)
if not agent_row:
log.warn("chat_run_agent_not_found", {
"component": "api.chat.tools", "operation": "run_agent",
"entity_id": agent_id, "correlation_id": ctx.cid,
"metadata": {"agent_id": agent_id},
})
return {"success": False, "error": f"Agent {agent_id} not found."}
task_id = f"task_{int(time.time() * 1000)}_{secrets.token_hex(3)}"
await ctx.db.execute(
"""
INSERT INTO tasks
(id, agent_id, description, task_type, status,
priority, input, max_retries, timeout_seconds,
created_by, created_at, updated_at)
VALUES ($1, $2, $3, 'workflow_step', 'pending',
100, $4, 3, 300, $5, NOW(), NOW())
ON CONFLICT DO NOTHING
""",
task_id, agent_id, goal,
json.dumps({"description": goal, "prompt": goal}),
ctx.user_id or "system",
)
if ctx.ctrl is not None:
await activate_agent_for_task(
ctx.ctrl, ctx.db, agent_id, task_id, goal, ctx.cid,
operation="run_agent",
)
log.info("chat_task_created", {
"component": "api.chat.tools", "operation": "run_agent",
"entity_id": task_id, "correlation_id": ctx.cid,
"metadata": {"agent_id": agent_id, "goal": goal},
})
return {
"success": True, "task_id": task_id, "agent_id": agent_id,
"message": (
f"Task dispatched to agent '{agent_row['name']}'. "
f"Task ID: {task_id}."
),
"action_url": f"/tasks/{task_id}",
}
@registry.register
class ListAgentsTool(ChatTool):
name = "list_agents"
definition = {
"type": "function",
"function": {
"name": "list_agents",
"description": (
"List agents in the system with their names, IDs, states, and "
"direct page links. Use before run_agent to find the right agent. "
"When the user asks to 'find', 'search', or 'show' agents by "
"purpose or name, pass a query to filter results."
),
"parameters": {
"type": "object",
"properties": {
"query": {
"type": "string",
"description": (
"Optional keyword to filter agents by name or purpose, "
"e.g. 'blog', 'slack', 'monitor'"
),
},
},
"required": [],
},
},
}
_DEV_IDS = frozenset({"local-admin", "system", "anon"})
async def execute(self, args: Dict[str, Any], ctx: ToolContext) -> Dict[str, Any]:
query = args.get("query", "").lower().strip()
uid = ctx.user_id or ""
is_real_user = bool(uid) and uid not in self._DEV_IDS
if query:
if is_real_user:
rows = await ctx.db.fetch(
"SELECT id, name, status, model, agent_type FROM agents "
"WHERE deleted_at IS NULL "
"AND (owner_id = $1 OR is_template = true) "
"AND (LOWER(name) LIKE $2 OR LOWER(agent_type) LIKE $2) "
"ORDER BY is_template, created_at DESC LIMIT 20",
uid, f"%{query}%",
)
else:
rows = await ctx.db.fetch(
"SELECT id, name, status, model, agent_type FROM agents "
"WHERE deleted_at IS NULL AND is_template = true "
"AND (LOWER(name) LIKE $1 OR LOWER(agent_type) LIKE $1) "
"ORDER BY created_at DESC LIMIT 20",
f"%{query}%",
)
else:
if is_real_user:
rows = await ctx.db.fetch(
"SELECT id, name, status, model, agent_type FROM agents "
"WHERE deleted_at IS NULL "
"AND (owner_id = $1 OR is_template = true) "
"ORDER BY is_template, created_at DESC LIMIT 20",
uid,
)
else:
rows = await ctx.db.fetch(
"SELECT id, name, status, model, agent_type FROM agents "
"WHERE deleted_at IS NULL AND is_template = true "
"ORDER BY created_at DESC LIMIT 20"
)
agents = [
{
"id": r["id"],
"name": r["name"],
"state": r["status"],
"type": r["agent_type"] or "workflow",
"agent_url": f"/agents/{r['id']}",
}
for r in rows
]
return {"agents": agents, "count": len(agents), "query": query}