fix: KB ingest error surfacing, chat channel events, profile menu reorder, session empty state, error_msg→error_message API fix
Some checks failed
Stuffle/nebula-os/pipeline/head There was a failure building this commit
Stuffle/nebula-os/pipeline/pr-main Build started...

This commit is contained in:
2026-04-12 17:18:44 +05:30
parent 0a0eeca865
commit f554934f45
5 changed files with 285 additions and 16 deletions

View File

@@ -20,8 +20,9 @@ from pydantic import BaseModel, Field
from libs.db.connection import get_db
from libs.logging import get_logger
from src.api.middleware.auth import get_identity
from src.api.middleware.auth import get_identity, AuthIdentity
from src.common.types import ExecutionContext
from src.events.event_types import ChatEvents
log = get_logger("api.chat")
router = APIRouter(prefix="/chat", tags=["chat"])
@@ -129,6 +130,30 @@ def _get_model_provider(request: Request):
return getattr(request.app.state, "offered_model_provider", None)
def _get_event_bus(request: Request):
"""Return PlatformEventBus from app container, or None."""
container = getattr(request.app.state, "container", None)
if container is None:
return None
return container.get("platform_event_bus")
async def _emit(bus, event) -> None:
"""Fire-and-forget event emission — never raises."""
if bus is None:
return
try:
await bus.emit(event)
except Exception as exc:
log.error("chat_event_emit_error", {
"component": "api.chat",
"operation": "emit",
"entity_id": getattr(event, "event_id", "?"),
"correlation_id": getattr(event, "correlation_id", "?"),
"metadata": {"error": str(exc)},
})
async def _resolve_corpus_id(
db, requested: Optional[str]
) -> Optional[str]:
@@ -211,8 +236,12 @@ async def list_session_messages(session_id: str) -> List[Dict[str, Any]]:
]
@router.post("", response_model=ChatResponse, dependencies=[Depends(get_identity)])
async def chat(body: ChatRequest, request: Request) -> Dict[str, Any]:
@router.post("", response_model=ChatResponse)
async def chat(
body: ChatRequest,
request: Request,
identity: AuthIdentity = Depends(get_identity),
) -> Dict[str, Any]:
"""RAG-augmented chat with NebulaOS assistant.
1. Retrieves relevant chunks from the nebula-system corpus (or provided corpus).
@@ -239,6 +268,18 @@ async def chat(body: ChatRequest, request: Request) -> Dict[str, Any]:
db = get_db()
model_provider = _get_model_provider(request)
bus = _get_event_bus(request)
user_id = getattr(identity, "user_id", None)
# Emit chat channel event — chat is a first-class runtime channel
await _emit(bus, ChatEvents.message_sent(
event_id=f"evt_{cid}",
chat_session_id=body.session_id,
user_id=user_id,
correlation_id=cid,
message_len=len(body.message),
mode="nebula",
))
# Persist user message early (non-fatal)
await _persist_message(
@@ -328,6 +369,13 @@ async def chat(body: ChatRequest, request: Request) -> Dict[str, Any]:
"correlation_id": cid,
"metadata": {"error": str(exc)},
})
await _emit(bus, ChatEvents.reply_failed(
event_id=f"evt_{cid}_err",
chat_session_id=body.session_id,
error=str(exc),
user_id=user_id,
correlation_id=cid,
))
raise HTTPException(
status_code=502, detail=f"LLM error: {exc}"
) from exc
@@ -400,6 +448,18 @@ async def chat(body: ChatRequest, request: Request) -> Dict[str, Any]:
meta={"chunks_used": chunks_used, "latency_ms": latency_ms, "model": model_used},
)
# Emit reply event — allows runtime subscribers to react to chat replies
await _emit(bus, ChatEvents.message_replied(
event_id=f"evt_{cid}_reply",
chat_session_id=body.session_id,
user_id=user_id,
correlation_id=cid,
chunks_used=chunks_used,
latency_ms=latency_ms,
model_used=model_used,
mode="nebula",
))
return {
"reply": reply_text,
"corpus_id": corpus_id,

View File

@@ -139,13 +139,17 @@ class QueryCorpusRequest(BaseModel):
def _get_ingest_service(request: Request):
"""Build an IngestService from the app container (best-effort)."""
"""Build an IngestService from the app container (best-effort).
embeddings_provider may be None when the rag_provider embeds internally
(e.g. AxiomRAGProvider) — IngestService handles that case.
"""
from src.rag.ingest_service import IngestService
container = getattr(request.app.state, "container", None)
rag_provider = container.get("rag_provider") if container else None
embeddings_provider = container.get("embeddings_provider") if container else None
if rag_provider is None or embeddings_provider is None:
if rag_provider is None:
return None
embeddings_provider = container.get("embeddings_provider") if container else None
db = get_db()
return IngestService(
db=db,
@@ -588,21 +592,41 @@ async def add_source(
source_id,
)
else:
log.info("source_ingest_skipped_no_provider", {
log.error("source_ingest_no_provider", {
"component": "api.corpora",
"operation": "add_source",
"entity_id": source_id,
"correlation_id": cid,
"metadata": {
"reason": "rag_provider or embeddings_provider not registered",
"reason": "rag_provider not in app container",
},
})
await db.execute(
"""
UPDATE rag_sources
SET ingest_status = 'error',
error_msg = 'No RAG provider configured. '
'Check EMBEDDINGS_PROVIDER env var.'
WHERE source_id = $1
""",
source_id,
)
await db.execute(
"""
UPDATE rag_corpora
SET status = 'error', updated_at = NOW()
WHERE corpus_id = $1
""",
corpus_id,
)
row = await db.fetchrow(
"SELECT * FROM rag_sources WHERE source_id = $1", source_id
)
src_dict = dict(row)
src_dict["error_message"] = src_dict.pop("error_msg", None)
response: Dict[str, Any] = {
"source": dict(row),
"source": src_dict,
"ingest_triggered": body.source_type != "external_vector_store",
}
if url_classification:
@@ -662,7 +686,13 @@ async def list_sources(
""",
corpus_id,
)
return {"sources": [dict(r) for r in rows], "total": len(rows)}
def _fmt_source(r) -> dict:
d = dict(r)
d["error_message"] = d.pop("error_msg", None)
return d
return {"sources": [_fmt_source(r) for r in rows], "total": len(rows)}
@router.delete("/{corpus_id}/sources/{source_id}", status_code=204)
@@ -885,9 +915,36 @@ async def reindex_corpus(
src["source_id"],
)
triggered += 1
else:
# No ingest service available — mark corpus error so UI surfaces it
await db.execute(
"""
UPDATE rag_corpora
SET status = 'error', updated_at = NOW()
WHERE corpus_id = $1
""",
corpus_id,
)
await db.execute(
"""
UPDATE rag_sources
SET ingest_status = 'error',
error_msg = 'No RAG/embeddings provider configured. '
'Check EMBEDDINGS_PROVIDER env var.'
WHERE corpus_id = $1
""",
corpus_id,
)
log.error("corpus_reindex_no_provider", {
"component": "api.corpora",
"operation": "reindex_corpus",
"entity_id": corpus_id,
"correlation_id": cid,
"metadata": {"error": "no ingest service — rag_provider not in container"},
})
return {
"corpus_id": corpus_id,
"sources_queued": triggered,
"status": "building",
"status": "building" if triggered > 0 else "error",
}

View File

@@ -966,3 +966,143 @@ class SandboxEvents:
**extra_metadata,
},
)
# ── Chat channel events ───────────────────────────────────────────────────────
class ChatEvents:
"""Factory for CHAT-category NebulaEvents.
Chat is treated as a first-class channel in the runtime — every
message and session transition emits a platform event so that:
- the event bus can subscribe and dispatch agents
- correlation chains connect chat → task → agent
- the UI can stream live chat activity
Source convention: "chat.ui" for browser-originated messages.
"""
@staticmethod
def session_opened(
event_id: str,
chat_session_id: str,
user_id: Optional[str] = None,
correlation_id: Optional[str] = None,
**extra_metadata,
) -> "NebulaEvent":
return NebulaEvent(
event_id=event_id,
category=EventCategory.CHAT,
event_type="session_opened",
source="chat.ui",
chat_session_id=chat_session_id,
user_id=user_id,
correlation_id=correlation_id,
outcome=EventOutcome.SUCCESS,
tags=["chat", "session", "open"],
metadata=extra_metadata,
)
@staticmethod
def message_sent(
event_id: str,
chat_session_id: Optional[str],
user_id: Optional[str] = None,
correlation_id: Optional[str] = None,
message_len: int = 0,
mode: str = "nebula",
**extra_metadata,
) -> "NebulaEvent":
return NebulaEvent(
event_id=event_id,
category=EventCategory.CHAT,
event_type="message_sent",
source="chat.ui",
chat_session_id=chat_session_id,
user_id=user_id,
correlation_id=correlation_id,
outcome=EventOutcome.PENDING,
tags=["chat", "message", mode],
metadata={
"message_len": message_len,
"mode": mode,
**extra_metadata,
},
)
@staticmethod
def message_replied(
event_id: str,
chat_session_id: Optional[str],
user_id: Optional[str] = None,
correlation_id: Optional[str] = None,
chunks_used: int = 0,
latency_ms: int = 0,
model_used: str = "",
mode: str = "nebula",
**extra_metadata,
) -> "NebulaEvent":
return NebulaEvent(
event_id=event_id,
category=EventCategory.CHAT,
event_type="message_replied",
source="chat.ui",
chat_session_id=chat_session_id,
user_id=user_id,
correlation_id=correlation_id,
outcome=EventOutcome.SUCCESS,
duration_ms=latency_ms,
tags=["chat", "reply", mode],
metadata={
"chunks_used": chunks_used,
"latency_ms": latency_ms,
"model_used": model_used,
"mode": mode,
**extra_metadata,
},
)
@staticmethod
def reply_failed(
event_id: str,
chat_session_id: Optional[str],
error: str,
user_id: Optional[str] = None,
correlation_id: Optional[str] = None,
**extra_metadata,
) -> "NebulaEvent":
return NebulaEvent(
event_id=event_id,
category=EventCategory.CHAT,
event_type="reply_failed",
source="chat.ui",
chat_session_id=chat_session_id,
user_id=user_id,
correlation_id=correlation_id,
outcome=EventOutcome.FAILURE,
severity=EventSeverity.ERROR,
tags=["chat", "error"],
metadata={"error": error, **extra_metadata},
)
@staticmethod
def session_closed(
event_id: str,
chat_session_id: str,
user_id: Optional[str] = None,
correlation_id: Optional[str] = None,
**extra_metadata,
) -> "NebulaEvent":
return NebulaEvent(
event_id=event_id,
category=EventCategory.CHAT,
event_type="session_closed",
source="chat.ui",
chat_session_id=chat_session_id,
user_id=user_id,
correlation_id=correlation_id,
outcome=EventOutcome.SUCCESS,
tags=["chat", "session", "close"],
metadata=extra_metadata,
)

View File

@@ -4,7 +4,7 @@ import { useNavigate } from 'react-router-dom'
import {
Send, Bot, Loader2, ExternalLink, AlertTriangle,
RotateCcw, Plus, Sparkles, Zap, BookOpen, Hash, HelpCircle,
Plug, Cpu, Database, GitBranch,
Plug, Cpu, Database, GitBranch, MessageSquare,
ChevronRight, CheckCircle2, XCircle, Clock, ListTree,
} from 'lucide-react'
import {
@@ -821,7 +821,18 @@ export function ChatWorkspace() {
Loading session
</div>
)}
{!loadingSession && messages.length === 0 && (
{!loadingSession && messages.length === 0 && sessionId && (
<div className="ne-ChatWorkspace__empty">
<MessageSquare size={24} style={{ color: 'var(--nbl-text-ghost)', marginBottom: 10, opacity: 0.5 }} />
<div style={{ fontSize: 13, color: 'var(--nbl-text-ghost)', fontWeight: 500, marginBottom: 4 }}>
No messages stored
</div>
<div style={{ fontSize: 11, color: 'var(--nbl-text-ghost)', opacity: 0.7, maxWidth: 280, textAlign: 'center', lineHeight: 1.6 }}>
This session predates message persistence. Continue the conversation below.
</div>
</div>
)}
{!loadingSession && messages.length === 0 && !sessionId && (
<div className="ne-ChatWorkspace__empty">
<Sparkles size={30} style={{ color: 'var(--nbl-nebula)', marginBottom: 12, opacity: 0.7 }} />
<div style={{ fontSize: 14, color: 'var(--nbl-text-primary)', fontWeight: 600, marginBottom: 6 }}>

View File

@@ -426,9 +426,6 @@ export function Navbar() {
{activePopover === 'profile' && (
<div className="ne-Navbar__profile-dropdown">
<div className="ne-Navbar__profile-dropdown-name">{profile.name}</div>
<div className="ne-Navbar__profile-dropdown-quota">
<QuotaStatusWidget />
</div>
<Link to="/docs" onClick={() => setActivePopover(null)} className="ne-Navbar__profile-dropdown-item">
<BookOpen size={12} />Docs
</Link>
@@ -438,6 +435,10 @@ export function Navbar() {
<Link to="/profile" onClick={() => setActivePopover(null)} className="ne-Navbar__profile-dropdown-item">
<User size={12} />Profile
</Link>
<div className="ne-Navbar__profile-dropdown-divider" />
<div className="ne-Navbar__profile-dropdown-quota">
<QuotaStatusWidget />
</div>
<Link
to="/quota"
onClick={() => setActivePopover(null)}