feat: chat system prompt refactor — provider/data registry, compact tool format, user-scoped data, data leakage prevention
All checks were successful
Stuffle/nebula-os/pipeline/head This commit looks good

- _get_system_state now accepts user_email; all entity queries (agents, tasks,
  plugins) filter to the requesting user. Published plugins remain globally
  visible. integration_instances TODO'd for migration 044 (no owner column yet).

- Provider registry: Nebula always first (live registry); external providers
  only when status='active' AND is_implemented=true.

- Tool inline format changed to compact set notation {slug:{t1,t2,...}}.
  Threshold raised to 80 — compact format is ~3x more token-efficient.
  Over-limit falls back to count-only {slug:N} with list_provider_tools() hint.

- New tool: list_provider_tools(slug) — returns tool list for any provider
  on demand; used when compact/count-only mode is active.

- DATA REGISTRY also uses compact notation {agents:{name(id:state),...}}.
  Hardcoded route table replaced with a terse pattern rule.

- Bug fix: ii.is_active (non-existent) → ii.status='active' in both
  _fetch_ext_providers (chat.py) and list_integrations tool.

- Migration 043: is_implemented BOOLEAN NOT NULL DEFAULT FALSE on
  integration_providers. Applied.

- Test fix: test_create_plugin_tool.py updated to import from
  chat_tools/plugin_tools (refactored location) and use tool classes
  directly instead of removed _handle_* functions.
This commit is contained in:
2026-04-20 22:56:11 +05:30
parent 738c2c5ff0
commit e341406fde
4 changed files with 603 additions and 1673 deletions

View File

@@ -0,0 +1,12 @@
-- Migration 043: add is_implemented flag to integration_providers
--
-- Purpose:
-- Only providers with is_implemented = true AND an active instance
-- are surfaced in the AI provider registry. Catalog entries with
-- is_implemented = false remain discoverable via list_integrations tool
-- but are NOT injected into the system prompt.
--
-- IF NOT EXISTS keeps the statement safe to re-run. NOT NULL DEFAULT FALSE
-- means every existing row, and every row inserted later without an
-- explicit value, starts out as "not implemented".
ALTER TABLE integration_providers
ADD COLUMN IF NOT EXISTS is_implemented BOOLEAN NOT NULL DEFAULT FALSE;
-- Nebula itself is the only implemented provider at bootstrap.
-- External provider rows (slack, github, etc.) stay FALSE until
-- their tool code is merged and this flag is flipped.
-- NOTE(review): this migration does not set any row to TRUE; presumably
-- Nebula's native tools are served from outside this table -- confirm.

File diff suppressed because it is too large Load Diff

View File

@@ -0,0 +1,235 @@
"""Integration-stack chat tools: list_integrations, connect_integration, list_provider_tools."""
from __future__ import annotations
from typing import Any, Dict
from libs.logging import get_logger
from src.api.routers.chat_tools import ChatTool, ToolContext, registry
log = get_logger("api.chat.tools.integrations")
@registry.register
class ListIntegrationsTool(ChatTool):
    """Chat tool that browses the integration provider catalog.

    Returns up to 20 providers whose name or category matches an optional
    search term, each flagged with whether at least one active instance
    of that provider exists.
    """

    name = "list_integrations"
    definition = {
        "type": "function",
        "function": {
            "name": "list_integrations",
            "description": (
                "List all integration providers (Slack, GitHub, Jira, Telegram, "
                "Notion, Discord, Trello, etc.) available in NebulaOS, optionally "
                "filtered by category or search term. Also returns which ones the "
                "user has already connected (instances)."
            ),
            "parameters": {
                "type": "object",
                "properties": {
                    "query": {
                        "type": "string",
                        "description": "Optional filter: service name or category",
                    },
                },
                "required": [],
            },
        },
    }

    async def execute(self, args: Dict[str, Any], ctx: ToolContext) -> Dict[str, Any]:
        """Query the provider catalog and report connection status.

        Args:
            args: Tool arguments. The optional ``query`` is matched
                case-insensitively as a substring of the provider name or
                category; a missing, null or blank value matches everything.
            ctx: Tool context supplying ``db``, ``user_id`` and ``cid``.

        Returns:
            A dict with ``integrations`` (list of provider summaries),
            ``count`` and ``action_url``. On a database error the failure is
            logged and an empty listing is returned instead of raising.
        """
        # dict.get's default only applies when the key is absent; the model
        # may still send "query": null (or a non-string), so normalise
        # defensively before calling string methods.
        query = str(args.get("query") or "").strip().lower()
        q_filter = f"%{query}%" if query else "%"
        try:
            # NOTE(review): connected_count covers every active instance of a
            # provider; the query has no per-user filter. Confirm whether
            # integration_instances can be scoped to the requesting user.
            rows = await ctx.db.fetch(
                """
                SELECT ip.id, ip.name, ip.slug, ip.category, ip.capabilities,
                COUNT(ii.id) AS connected_count
                FROM integration_providers ip
                LEFT JOIN integration_instances ii
                ON ii.provider_id = ip.id AND ii.status = 'active'
                WHERE (LOWER(ip.name) LIKE $1 OR LOWER(ip.category) LIKE $1)
                GROUP BY ip.id, ip.name, ip.slug, ip.category, ip.capabilities
                ORDER BY ip.name
                LIMIT 20
                """,
                q_filter,
            )
        except Exception as exc:
            # Best-effort tool: degrade to an empty listing so the chat turn
            # can continue, but keep the failure visible in the logs.
            log.error("chat_list_integrations_db_error", {
                "component": "api.chat.tools",
                "operation": "list_integrations",
                "entity_id": ctx.user_id or "anon",
                "correlation_id": ctx.cid,
                "metadata": {"error": str(exc)},
            })
            return {"integrations": [], "count": 0, "action_url": "/integrations"}

        integrations = [
            {
                "name": r["name"],
                "slug": r["slug"],
                "category": r["category"],
                # capabilities may be NULL in the database; expose a plain list.
                "capabilities": list(r["capabilities"] or []),
                "connected": r["connected_count"] > 0,
            }
            for r in rows
        ]
        return {
            "integrations": integrations,
            "count": len(integrations),
            "action_url": "/integrations",
        }
@registry.register
class ConnectIntegrationTool(ChatTool):
    """Chat tool that points the user at the setup page for one provider.

    Looks the provider up by slug and, when it exists, returns an
    ``action_url`` for the integrations page with that provider preselected.
    """

    name = "connect_integration"
    definition = {
        "type": "function",
        "function": {
            "name": "connect_integration",
            "description": (
                "Open the integration setup page for a specific provider, so the "
                "user can connect it. Use when user says 'connect my Slack', "
                "'integrate Telegram', 'set up GitHub', or mentions wanting to "
                "link a specific external service."
            ),
            "parameters": {
                "type": "object",
                "properties": {
                    "provider_slug": {
                        "type": "string",
                        "description": (
                            "Slug of the integration provider: slack, github, jira, "
                            "linear, notion, telegram, whatsapp, pagerduty, ado"
                        ),
                    },
                },
                "required": ["provider_slug"],
            },
        },
    }

    async def execute(self, args: Dict[str, Any], ctx: ToolContext) -> Dict[str, Any]:
        """Resolve a provider slug and return the URL of its setup page.

        Args:
            args: Tool arguments; ``provider_slug`` is required and is
                compared after trimming whitespace and lower-casing.
            ctx: Tool context supplying ``db``, ``user_id`` and ``cid``.

        Returns:
            On success, a dict with ``success=True``, a ``message`` and an
            ``action_url``. Otherwise ``success=False`` with an ``error``
            string (missing slug, unknown provider, or database failure).
        """
        # dict.get's default only applies when the key is absent; the model
        # may still send "provider_slug": null, so normalise defensively.
        slug = str(args.get("provider_slug") or "").strip().lower()
        if not slug:
            # Skip the database round-trip and the confusing
            # "Integration '' not found." message for an empty slug.
            return {"success": False, "error": "provider_slug is required."}

        try:
            row = await ctx.db.fetchrow(
                "SELECT id, name, auth_type FROM integration_providers WHERE slug = $1",
                slug,
            )
        except Exception as exc:
            log.error("chat_connect_integration_db_error", {
                "component": "api.chat.tools",
                "operation": "connect_integration",
                "entity_id": ctx.user_id or "anon",
                "correlation_id": ctx.cid,
                "metadata": {"slug": slug, "error": str(exc)},
            })
            return {"success": False, "error": "Database error looking up integration."}

        if not row:
            log.warn("chat_connect_integration_not_found", {
                "component": "api.chat.tools",
                "operation": "connect_integration",
                "entity_id": ctx.user_id or "anon",
                "correlation_id": ctx.cid,
                "metadata": {"slug": slug},
            })
            return {"success": False, "error": f"Integration '{slug}' not found."}

        log.info("chat_connect_integration", {
            "component": "api.chat.tools",
            "operation": "connect_integration",
            "entity_id": ctx.user_id or "anon",
            "correlation_id": ctx.cid,
            "metadata": {"slug": slug, "provider": row["name"]},
        })
        return {
            "success": True,
            "message": f"Opening setup page for {row['name']}.",
            # The slug placed in the URL matched an existing provider row above.
            "action_url": f"/integrations?connect={slug}",
        }
@registry.register
class ListProviderToolsTool(ChatTool):
    """Chat tool that returns the callable tool names for one provider.

    The slug ``nebula`` is answered from the in-process tool registry; any
    other slug is looked up in ``integration_providers`` and answered from
    its ``capabilities`` column when the provider is marked implemented.
    """

    name = "list_provider_tools"
    definition = {
        "type": "function",
        "function": {
            "name": "list_provider_tools",
            "description": (
                "Return the list of callable tools for a specific provider. "
                "Use this when the PROVIDER REGISTRY shows tool counts but not names "
                "(compact mode is active because total tools exceed the inline limit). "
                "Pass slug='nebula' to get NebulaOS native tools. "
                "Pass any other connected provider's slug for its tool list."
            ),
            "parameters": {
                "type": "object",
                "properties": {
                    "slug": {
                        "type": "string",
                        "description": "Provider slug from PROVIDER REGISTRY (e.g. 'nebula', 'slack').",
                    },
                },
                "required": ["slug"],
            },
        },
    }

    @staticmethod
    def _log_listing(ctx: ToolContext, metadata: Dict[str, Any]) -> None:
        """Emit the info-level record written after a successful listing."""
        log.info("chat_list_provider_tools", {
            "component": "api.chat.tools.integrations",
            "operation": "list_provider_tools",
            "entity_id": ctx.user_id or "anon",
            "correlation_id": ctx.cid,
            "metadata": metadata,
        })

    async def execute(self, args: Dict[str, Any], ctx: ToolContext) -> Dict[str, Any]:
        """Resolve ``slug`` to a provider and return its tool names.

        Args:
            args: Tool arguments; ``slug`` is trimmed and lower-cased.
            ctx: Tool context supplying ``db``, ``user_id`` and ``cid``.

        Returns:
            On success, a dict with ``success``, ``slug``, ``name``,
            ``tools`` and ``count``. Otherwise ``success=False`` with a
            ``message`` describing the problem (missing slug, lookup
            failure, unknown provider, or provider not yet implemented).
        """
        wanted = (args.get("slug") or "").strip().lower()
        if not wanted:
            return {"success": False, "message": "slug is required."}

        # Native tools come straight from the registry, not the database.
        if wanted == "nebula":
            native_names = [
                spec["function"]["name"] for spec in registry.get_definitions()
            ]
            total = len(native_names)
            self._log_listing(ctx, {"slug": "nebula", "count": total})
            return {
                "success": True,
                "slug": "nebula",
                "name": "NebulaOS",
                "tools": native_names,
                "count": total,
            }

        try:
            provider = await ctx.db.fetchrow(
                "SELECT name, capabilities, is_implemented FROM integration_providers WHERE slug = $1",
                wanted,
            )
        except Exception as exc:
            log.error("chat_list_provider_tools_error", {
                "component": "api.chat.tools.integrations",
                "operation": "list_provider_tools",
                "entity_id": ctx.user_id or "anon",
                "correlation_id": ctx.cid,
                "metadata": {"slug": wanted, "error": str(exc)},
            })
            return {"success": False, "message": f"Failed to look up provider '{wanted}'."}

        if not provider:
            unknown_msg = (
                f"Unknown provider slug: '{wanted}'. "
                "Use list_integrations to browse available providers."
            )
            return {"success": False, "message": unknown_msg}

        if not provider["is_implemented"]:
            pending_msg = (
                f"Provider '{provider['name']}' is in the catalog but not yet implemented. "
                f"Use connect_integration('{wanted}') to set it up when it becomes available."
            )
            return {"success": False, "message": pending_msg}

        # capabilities may be NULL in the database; expose a plain list.
        capability_names = list(provider["capabilities"] or [])
        total = len(capability_names)
        self._log_listing(
            ctx, {"slug": wanted, "name": provider["name"], "count": total}
        )
        return {
            "success": True,
            "slug": wanted,
            "name": provider["name"],
            "tools": capability_names,
            "count": total,
        }

View File

@@ -7,10 +7,20 @@ Tests cover:
- Validation rejections: bad slug, bad semver, disallowed license, disallowed scope
"""
import json
import pytest
from unittest.mock import AsyncMock, MagicMock, patch
from src.api.routers.chat_tools.plugin_tools import (
_validate_plugin_code,
_SLUG_RE,
_SEMVER_RE,
_PLUGIN_LICENSE_ALLOWLIST,
_PLUGIN_SCOPE_ALLOWLIST,
CreatePluginTool,
PublishPluginTool,
)
from src.api.routers.chat_tools import ToolContext
class _AsyncContextManagerMock:
"""Minimal async context manager that satisfies `async with` usage."""
@@ -37,17 +47,6 @@ def _make_db_mock(fetchrow_value=None, fetchval_value=None) -> AsyncMock:
return mock
# ── Import helpers from chat.py directly ──────────────────────────────────────
from src.api.routers.chat import (
_validate_plugin_code,
_SLUG_RE,
_SEMVER_RE,
_PLUGIN_LICENSE_ALLOWLIST,
_PLUGIN_SCOPE_ALLOWLIST,
)
# ── _validate_plugin_code ─────────────────────────────────────────────────────
class TestValidatePluginCode:
@@ -128,87 +127,60 @@ VALID_ARGS = {
}
def _make_ctx(db=None, bus=None):
return ToolContext(db=db or AsyncMock(), user_id=None, cid="test_corr", bus=bus)
@pytest.mark.asyncio
class TestHandleCreatePlugin:
async def test_draft_returns_draft_flag(self):
from src.api.routers.chat import _handle_create_plugin
mock_conn = AsyncMock()
mock_bus = MagicMock()
raw = await _handle_create_plugin(
VALID_ARGS, mock_conn, None, "test_corr", bus=mock_bus
)
result = json.loads(raw)
ctx = _make_ctx(bus=MagicMock())
result = await CreatePluginTool().execute(VALID_ARGS, ctx)
assert result["draft"] is True
assert result["name"] == "my-plugin"
assert result["version"] == "1.0.0"
assert result["code"] == VALID_PLUGIN_CODE.strip()
assert result["checksum"] is not None
assert result["plugin_id"] is None
assert result["success"] is None
assert result.get("success") is None
async def test_draft_invalid_slug_returns_error(self):
from src.api.routers.chat import _handle_create_plugin
bad_args = {**VALID_ARGS, "name": "my plugin!"}
raw = await _handle_create_plugin(
bad_args, AsyncMock(), None, "c1"
)
result = json.loads(raw)
result = await CreatePluginTool().execute(bad_args, _make_ctx())
assert result.get("success") is False
assert "name" in (result.get("error") or "").lower()
async def test_draft_invalid_semver_returns_error(self):
from src.api.routers.chat import _handle_create_plugin
bad_args = {**VALID_ARGS, "version": "1.0"}
raw = await _handle_create_plugin(
bad_args, AsyncMock(), None, "c2"
)
result = json.loads(raw)
result = await CreatePluginTool().execute(bad_args, _make_ctx())
assert result.get("success") is False
assert "version" in (result.get("error") or "").lower()
async def test_draft_disallowed_license_returns_error(self):
from src.api.routers.chat import _handle_create_plugin
bad_args = {**VALID_ARGS, "license": "WTFPL"}
raw = await _handle_create_plugin(
bad_args, AsyncMock(), None, "c3"
)
result = json.loads(raw)
result = await CreatePluginTool().execute(bad_args, _make_ctx())
assert result.get("success") is False
assert "license" in (result.get("error") or "").lower()
async def test_draft_disallowed_scope_returns_error(self):
from src.api.routers.chat import _handle_create_plugin
bad_args = {**VALID_ARGS, "scopes": ["read:everything"]}
raw = await _handle_create_plugin(
bad_args, AsyncMock(), None, "c4"
)
result = json.loads(raw)
result = await CreatePluginTool().execute(bad_args, _make_ctx())
assert result.get("success") is False
assert "scope" in (result.get("error") or "").lower()
async def test_draft_bad_code_returns_error(self):
from src.api.routers.chat import _handle_create_plugin
bad_args = {**VALID_ARGS, "code": "x = 1"}
raw = await _handle_create_plugin(
bad_args, AsyncMock(), None, "c5"
)
result = json.loads(raw)
result = await CreatePluginTool().execute(bad_args, _make_ctx())
assert result.get("success") is False
async def test_confirm_true_calls_db_and_emits_event(self):
from src.api.routers.chat import _handle_create_plugin
confirm_args = {**VALID_ARGS, "confirm": True}
mock_conn = _make_db_mock(fetchrow_value=None)
mock_bus = AsyncMock()
ctx = _make_ctx(db=_make_db_mock(fetchrow_value=None), bus=mock_bus)
with patch("src.api.routers.chat.os.makedirs"), \
with patch("src.api.routers.chat_tools.plugin_tools.os.makedirs"), \
patch("builtins.open", MagicMock()):
raw = await _handle_create_plugin(
confirm_args, mock_conn, None, "c6", bus=mock_bus
)
result = json.loads(raw)
result = await CreatePluginTool().execute(confirm_args, ctx)
assert result.get("success") is True
assert result.get("draft") is False
assert result.get("plugin_id") is not None
@@ -220,7 +192,6 @@ class TestHandleCreatePlugin:
@pytest.mark.asyncio
class TestHandlePublishPlugin:
async def test_preview_returns_preview_info(self):
from src.api.routers.chat import _handle_publish_plugin
args = {"plugin_id": "plugin_abc123", "confirm": False}
mock_conn = AsyncMock()
mock_conn.fetchrow.return_value = {
@@ -232,55 +203,40 @@ class TestHandlePublishPlugin:
"tags": [],
"license": "MIT",
}
raw = await _handle_publish_plugin(
args, mock_conn, None, "p1"
)
result = json.loads(raw)
result = await PublishPluginTool().execute(args, _make_ctx(db=mock_conn))
assert result.get("success") is None
assert result.get("name") == "my-plugin"
async def test_confirm_true_publishes_and_emits(self):
from src.api.routers.chat import _handle_publish_plugin
args = {"plugin_id": "plugin_abc123", "confirm": True}
mock_conn = _make_db_mock(
fetchrow_value={
"name": "my-plugin",
"description": "Test",
"version": "1.0.0",
"trust_level": "community",
"is_published": False,
"tags": ["test"],
"license": "MIT",
},
fetchval_value="mkt_test_001",
)
mock_bus = AsyncMock()
raw = await _handle_publish_plugin(
args, mock_conn, None, "p2", bus=mock_bus
ctx = _make_ctx(
db=_make_db_mock(
fetchrow_value={
"name": "my-plugin",
"description": "Test",
"version": "1.0.0",
"trust_level": "community",
"is_published": False,
"tags": ["test"],
"license": "MIT",
},
fetchval_value="mkt_test_001",
),
bus=mock_bus,
)
result = json.loads(raw)
result = await PublishPluginTool().execute(args, ctx)
assert result.get("success") is True
assert result.get("marketplace_id") is not None
mock_bus.emit.assert_called_once()
async def test_missing_plugin_id_returns_error(self):
from src.api.routers.chat import _handle_publish_plugin
raw = await _handle_publish_plugin(
{"confirm": True}, AsyncMock(), None, "p3"
)
result = json.loads(raw)
result = await PublishPluginTool().execute({"confirm": True}, _make_ctx())
assert result.get("success") is False
async def test_plugin_not_found_returns_error(self):
from src.api.routers.chat import _handle_publish_plugin
args = {"plugin_id": "plugin_nonexistent", "confirm": True}
mock_conn = AsyncMock()
mock_conn.fetchrow.return_value = None
raw = await _handle_publish_plugin(
args, mock_conn, None, "p4"
)
result = json.loads(raw)
result = await PublishPluginTool().execute(args, _make_ctx(db=mock_conn))
assert result.get("success") is False