From f554934f45bf67ad23d240086017ece6eea2c44f Mon Sep 17 00:00:00 2001 From: mohiit1502 Date: Sun, 12 Apr 2026 17:18:44 +0530 Subject: [PATCH] =?UTF-8?q?fix:=20KB=20ingest=20error=20surfacing,=20chat?= =?UTF-8?q?=20channel=20events,=20profile=20menu=20reorder,=20session=20em?= =?UTF-8?q?pty=20state,=20error=5Fmsg=E2=86=92error=5Fmessage=20API=20fix?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- src/api/routers/chat.py | 66 ++++++++- src/api/routers/corpora.py | 73 ++++++++- src/events/event_types.py | 140 ++++++++++++++++++ .../src/components/layout/ChatWorkspace.tsx | 15 +- webapp/src/components/layout/Navbar.tsx | 7 +- 5 files changed, 285 insertions(+), 16 deletions(-) diff --git a/src/api/routers/chat.py b/src/api/routers/chat.py index 157d2dfc..a7131d74 100644 --- a/src/api/routers/chat.py +++ b/src/api/routers/chat.py @@ -20,8 +20,9 @@ from pydantic import BaseModel, Field from libs.db.connection import get_db from libs.logging import get_logger -from src.api.middleware.auth import get_identity +from src.api.middleware.auth import get_identity, AuthIdentity from src.common.types import ExecutionContext +from src.events.event_types import ChatEvents log = get_logger("api.chat") router = APIRouter(prefix="/chat", tags=["chat"]) @@ -129,6 +130,30 @@ def _get_model_provider(request: Request): return getattr(request.app.state, "offered_model_provider", None) +def _get_event_bus(request: Request): + """Return PlatformEventBus from app container, or None.""" + container = getattr(request.app.state, "container", None) + if container is None: + return None + return container.get("platform_event_bus") + + +async def _emit(bus, event) -> None: + """Fire-and-forget event emission — never raises.""" + if bus is None: + return + try: + await bus.emit(event) + except Exception as exc: + log.error("chat_event_emit_error", { + "component": "api.chat", + "operation": "emit", + "entity_id": getattr(event, "event_id", "?"), + "correlation_id": getattr(event, "correlation_id", "?"), + "metadata": {"error": str(exc)}, + }) + + async def _resolve_corpus_id( db, requested: Optional[str] ) -> Optional[str]: @@ -211,8 +236,12 @@ async def list_session_messages(session_id: str) -> List[Dict[str, Any]]: ] -@router.post("", response_model=ChatResponse, dependencies=[Depends(get_identity)]) -async def chat(body: ChatRequest, request: Request) -> Dict[str, Any]: +@router.post("", response_model=ChatResponse) +async def chat( + body: ChatRequest, + request: Request, + identity: AuthIdentity = Depends(get_identity), +) -> Dict[str, Any]: """RAG-augmented chat with NebulaOS assistant. 1. Retrieves relevant chunks from the nebula-system corpus (or provided corpus). @@ -239,6 +268,18 @@ async def chat(body: ChatRequest, request: Request) -> Dict[str, Any]: db = get_db() model_provider = _get_model_provider(request) + bus = _get_event_bus(request) + user_id = getattr(identity, "user_id", None) + + # Emit chat channel event — chat is a first-class runtime channel + await _emit(bus, ChatEvents.message_sent( + event_id=f"evt_{cid}", + chat_session_id=body.session_id, + user_id=user_id, + correlation_id=cid, + message_len=len(body.message), + mode="nebula", + )) # Persist user message early (non-fatal) await _persist_message( @@ -328,6 +369,13 @@ async def chat(body: ChatRequest, request: Request) -> Dict[str, Any]: "correlation_id": cid, "metadata": {"error": str(exc)}, }) + await _emit(bus, ChatEvents.reply_failed( + event_id=f"evt_{cid}_err", + chat_session_id=body.session_id, + error=str(exc), + user_id=user_id, + correlation_id=cid, + )) raise HTTPException( status_code=502, detail=f"LLM error: {exc}" ) from exc @@ -400,6 +448,18 @@ async def chat(body: ChatRequest, request: Request) -> Dict[str, Any]: meta={"chunks_used": chunks_used, "latency_ms": latency_ms, "model": model_used}, ) + # Emit reply event — allows runtime subscribers to react to chat replies + await _emit(bus, ChatEvents.message_replied( + event_id=f"evt_{cid}_reply", + chat_session_id=body.session_id, + user_id=user_id, + correlation_id=cid, + chunks_used=chunks_used, + latency_ms=latency_ms, + model_used=model_used, + mode="nebula", + )) + return { "reply": reply_text, "corpus_id": corpus_id, diff --git a/src/api/routers/corpora.py b/src/api/routers/corpora.py index c022d26c..ecbffe15 100644 --- a/src/api/routers/corpora.py +++ b/src/api/routers/corpora.py @@ -139,13 +139,17 @@ class QueryCorpusRequest(BaseModel): def _get_ingest_service(request: Request): - """Build an IngestService from the app container (best-effort).""" + """Build an IngestService from the app container (best-effort). + + embeddings_provider may be None when the rag_provider embeds internally + (e.g. AxiomRAGProvider) — IngestService handles that case. + """ from src.rag.ingest_service import IngestService container = getattr(request.app.state, "container", None) rag_provider = container.get("rag_provider") if container else None - embeddings_provider = container.get("embeddings_provider") if container else None - if rag_provider is None or embeddings_provider is None: + if rag_provider is None: return None + embeddings_provider = container.get("embeddings_provider") if container else None db = get_db() return IngestService( db=db, @@ -588,21 +592,41 @@ async def add_source( source_id, ) else: - log.info("source_ingest_skipped_no_provider", { + log.error("source_ingest_no_provider", { "component": "api.corpora", "operation": "add_source", "entity_id": source_id, "correlation_id": cid, "metadata": { - "reason": "rag_provider or embeddings_provider not registered", + "reason": "rag_provider not in app container", }, }) + await db.execute( + """ + UPDATE rag_sources + SET ingest_status = 'error', + error_msg = 'No RAG provider configured. ' + 'Check EMBEDDINGS_PROVIDER env var.' + WHERE source_id = $1 + """, + source_id, + ) + await db.execute( + """ + UPDATE rag_corpora + SET status = 'error', updated_at = NOW() + WHERE corpus_id = $1 + """, + corpus_id, + ) row = await db.fetchrow( "SELECT * FROM rag_sources WHERE source_id = $1", source_id ) + src_dict = dict(row) + src_dict["error_message"] = src_dict.pop("error_msg", None) response: Dict[str, Any] = { - "source": dict(row), + "source": src_dict, "ingest_triggered": body.source_type != "external_vector_store", } if url_classification: @@ -662,7 +686,13 @@ async def list_sources( """, corpus_id, ) - return {"sources": [dict(r) for r in rows], "total": len(rows)} + + def _fmt_source(r) -> dict: + d = dict(r) + d["error_message"] = d.pop("error_msg", None) + return d + + return {"sources": [_fmt_source(r) for r in rows], "total": len(rows)} @router.delete("/{corpus_id}/sources/{source_id}", status_code=204) @@ -885,9 +915,36 @@ async def reindex_corpus( src["source_id"], ) triggered += 1 + else: + # No ingest service available — mark corpus error so UI surfaces it + await db.execute( + """ + UPDATE rag_corpora + SET status = 'error', updated_at = NOW() + WHERE corpus_id = $1 + """, + corpus_id, + ) + await db.execute( + """ + UPDATE rag_sources + SET ingest_status = 'error', + error_msg = 'No RAG/embeddings provider configured. ' + 'Check EMBEDDINGS_PROVIDER env var.' + WHERE corpus_id = $1 + """, + corpus_id, + ) + log.error("corpus_reindex_no_provider", { + "component": "api.corpora", + "operation": "reindex_corpus", + "entity_id": corpus_id, + "correlation_id": cid, + "metadata": {"error": "no ingest service — rag_provider not in container"}, + }) return { "corpus_id": corpus_id, "sources_queued": triggered, - "status": "building", + "status": "building" if triggered > 0 else "error", } diff --git a/src/events/event_types.py b/src/events/event_types.py index faef739f..5648d540 100644 --- a/src/events/event_types.py +++ b/src/events/event_types.py @@ -966,3 +966,143 @@ class SandboxEvents: **extra_metadata, }, ) + + +# ── Chat channel events ─────────────────────────────────────────────────────── + + +class ChatEvents: + """Factory for CHAT-category NebulaEvents. + + Chat is treated as a first-class channel in the runtime — every + message and session transition emits a platform event so that: + - the event bus can subscribe and dispatch agents + - correlation chains connect chat → task → agent + - the UI can stream live chat activity + + Source convention: "chat.ui" for browser-originated messages. + """ + + @staticmethod + def session_opened( + event_id: str, + chat_session_id: str, + user_id: Optional[str] = None, + correlation_id: Optional[str] = None, + **extra_metadata, + ) -> "NebulaEvent": + return NebulaEvent( + event_id=event_id, + category=EventCategory.CHAT, + event_type="session_opened", + source="chat.ui", + chat_session_id=chat_session_id, + user_id=user_id, + correlation_id=correlation_id, + outcome=EventOutcome.SUCCESS, + tags=["chat", "session", "open"], + metadata=extra_metadata, + ) + + @staticmethod + def message_sent( + event_id: str, + chat_session_id: Optional[str], + user_id: Optional[str] = None, + correlation_id: Optional[str] = None, + message_len: int = 0, + mode: str = "nebula", + **extra_metadata, + ) -> "NebulaEvent": + return NebulaEvent( + event_id=event_id, + category=EventCategory.CHAT, + event_type="message_sent", + source="chat.ui", + chat_session_id=chat_session_id, + user_id=user_id, + correlation_id=correlation_id, + outcome=EventOutcome.PENDING, + tags=["chat", "message", mode], + metadata={ + "message_len": message_len, + "mode": mode, + **extra_metadata, + }, + ) + + @staticmethod + def message_replied( + event_id: str, + chat_session_id: Optional[str], + user_id: Optional[str] = None, + correlation_id: Optional[str] = None, + chunks_used: int = 0, + latency_ms: int = 0, + model_used: str = "", + mode: str = "nebula", + **extra_metadata, + ) -> "NebulaEvent": + return NebulaEvent( + event_id=event_id, + category=EventCategory.CHAT, + event_type="message_replied", + source="chat.ui", + chat_session_id=chat_session_id, + user_id=user_id, + correlation_id=correlation_id, + outcome=EventOutcome.SUCCESS, + duration_ms=latency_ms, + tags=["chat", "reply", mode], + metadata={ + "chunks_used": chunks_used, + "latency_ms": latency_ms, + "model_used": model_used, + "mode": mode, + **extra_metadata, + }, + ) + + @staticmethod + def reply_failed( + event_id: str, + chat_session_id: Optional[str], + error: str, + user_id: Optional[str] = None, + correlation_id: Optional[str] = None, + **extra_metadata, + ) -> "NebulaEvent": + return NebulaEvent( + event_id=event_id, + category=EventCategory.CHAT, + event_type="reply_failed", + source="chat.ui", + chat_session_id=chat_session_id, + user_id=user_id, + correlation_id=correlation_id, + outcome=EventOutcome.FAILURE, + severity=EventSeverity.ERROR, + tags=["chat", "error"], + metadata={"error": error, **extra_metadata}, + ) + + @staticmethod + def session_closed( + event_id: str, + chat_session_id: str, + user_id: Optional[str] = None, + correlation_id: Optional[str] = None, + **extra_metadata, + ) -> "NebulaEvent": + return NebulaEvent( + event_id=event_id, + category=EventCategory.CHAT, + event_type="session_closed", + source="chat.ui", + chat_session_id=chat_session_id, + user_id=user_id, + correlation_id=correlation_id, + outcome=EventOutcome.SUCCESS, + tags=["chat", "session", "close"], + metadata=extra_metadata, + ) diff --git a/webapp/src/components/layout/ChatWorkspace.tsx b/webapp/src/components/layout/ChatWorkspace.tsx index 14002542..280c610c 100644 --- a/webapp/src/components/layout/ChatWorkspace.tsx +++ b/webapp/src/components/layout/ChatWorkspace.tsx @@ -4,7 +4,7 @@ import { useNavigate } from 'react-router-dom' import { Send, Bot, Loader2, ExternalLink, AlertTriangle, RotateCcw, Plus, Sparkles, Zap, BookOpen, Hash, HelpCircle, - Plug, Cpu, Database, GitBranch, + Plug, Cpu, Database, GitBranch, MessageSquare, ChevronRight, CheckCircle2, XCircle, Clock, ListTree, } from 'lucide-react' import { @@ -821,7 +821,18 @@ export function ChatWorkspace() { Loading session… )} - {!loadingSession && messages.length === 0 && ( + {!loadingSession && messages.length === 0 && sessionId && ( +
+ +
+ No messages stored +
+
+ This session predates message persistence. Continue the conversation below. +
+
+ )} + {!loadingSession && messages.length === 0 && !sessionId && (
diff --git a/webapp/src/components/layout/Navbar.tsx b/webapp/src/components/layout/Navbar.tsx index fb54de11..bdfffbda 100644 --- a/webapp/src/components/layout/Navbar.tsx +++ b/webapp/src/components/layout/Navbar.tsx @@ -426,9 +426,6 @@ export function Navbar() { {activePopover === 'profile' && (
{profile.name}
-
- -
setActivePopover(null)} className="ne-Navbar__profile-dropdown-item"> Docs @@ -438,6 +435,10 @@ export function Navbar() { setActivePopover(null)} className="ne-Navbar__profile-dropdown-item"> Profile +
+
+ +
setActivePopover(null)}