diff --git a/src/agentic_rag/api/main.py b/src/agentic_rag/api/main.py index bb20218..20ae28f 100644 --- a/src/agentic_rag/api/main.py +++ b/src/agentic_rag/api/main.py @@ -1,6 +1,7 @@ """DocuMente API - Backend powered by datapizza-ai. Multi-provider LLM support: OpenAI, Z.AI, OpenCode Zen, OpenRouter, Anthropic, Google, Mistral, Azure +NotebookLM Integration: Sync and query NotebookLM notebooks """ from contextlib import asynccontextmanager @@ -14,6 +15,7 @@ from agentic_rag.api.routes import ( chat, documents, health, + notebooklm_sync, providers, query, ) @@ -57,7 +59,9 @@ def create_application() -> FastAPI: app = FastAPI( title="DocuMente API", description=""" - DocuMente - Sistema di Retrieval Agentico con AI. Interroga i tuoi documenti in modo intelligente. + DocuMente - Sistema di Retrieval Agentico con AI + NotebookLM Integration. + + Interroga i tuoi documenti e i tuoi notebook NotebookLM in modo intelligente. ## Multi-Provider LLM Support @@ -71,6 +75,13 @@ def create_application() -> FastAPI: - **Mistral AI** - **Azure OpenAI** + ## NotebookLM Integration + + Sync your Google NotebookLM notebooks and query them with RAG: + - Sync notebooks to local vector store + - Query notebook content alongside documents + - Search across multiple notebooks + ## Authentication Two methods supported: @@ -84,8 +95,10 @@ def create_application() -> FastAPI: - 💬 Chat with your documents - 🎯 RAG (Retrieval-Augmented Generation) - 🚀 Multiple LLM providers + - 📓 NotebookLM notebook integration + - 🔗 Notebook sync and indexing """, - version="2.0.0", + version="2.1.0", docs_url="/api/docs", redoc_url="/api/redoc", openapi_url="/api/openapi.json", @@ -107,6 +120,7 @@ def create_application() -> FastAPI: app.include_router(documents.router, prefix="/api/v1", tags=["documents"]) app.include_router(query.router, prefix="/api/v1", tags=["query"]) app.include_router(chat.router, prefix="/api/v1", tags=["chat"]) + app.include_router(notebooklm_sync.router, prefix="/api/v1", 
tags=["notebooklm-sync"]) # Serve static files (frontend) try: @@ -129,15 +143,16 @@ async def api_root(): return { "name": "DocuMente API", - "version": "2.0.0", + "version": "2.1.0", "docs": "/api/docs", - "description": "DocuMente - Sistema di Retrieval Agentico con AI", + "description": "DocuMente - Sistema di Retrieval Agentico con AI + NotebookLM", "features": { "multi_provider_llm": True, "authentication": ["api_key", "jwt"], "document_processing": True, "rag": True, "streaming": True, + "notebooklm_integration": True, }, "configured_providers": [p["id"] for p in configured], "default_provider": settings.default_llm_provider, @@ -166,7 +181,7 @@ async def detailed_health_check(): return { "status": "healthy", - "version": "2.0.0", + "version": "2.1.0", "components": { "api": "healthy", "vector_store": vector_status, diff --git a/src/agentic_rag/api/routes/notebooklm_sync.py b/src/agentic_rag/api/routes/notebooklm_sync.py new file mode 100644 index 0000000..24b20d2 --- /dev/null +++ b/src/agentic_rag/api/routes/notebooklm_sync.py @@ -0,0 +1,218 @@ +"""NotebookLM Sync API routes. + +This module provides endpoints for syncing NotebookLM notebooks +to the local RAG vector store. 
+""" + +from fastapi import APIRouter, HTTPException, status +from pydantic import BaseModel, Field + +from agentic_rag.services.notebooklm_indexer import get_notebooklm_indexer +from notebooklm_agent.services.notebook_service import NotebookService + +router = APIRouter(prefix="/notebooklm", tags=["notebooklm-sync"]) + + +class NotebookSyncRequest(BaseModel): + """Request to sync a notebook.""" + + notebook_id: str = Field(..., description="Notebook ID to sync") + + +class NotebookSyncResponse(BaseModel): + """Response from notebook sync operation.""" + + sync_id: str + notebook_id: str + notebook_title: str | None + status: str + sources_indexed: int + total_chunks: int + message: str + + +class NotebookSyncDeleteResponse(BaseModel): + """Response from deleting notebook index.""" + + notebook_id: str + deleted: bool + message: str + + +class IndexedNotebook(BaseModel): + """Indexed notebook information.""" + + notebook_id: str + notebook_title: str | None + sources_count: int + chunks_count: int + last_sync: str | None + + +class IndexedNotebooksResponse(BaseModel): + """Response with list of indexed notebooks.""" + + notebooks: list[IndexedNotebook] + total: int + + +@router.post( + "/sync/{notebook_id}", + response_model=NotebookSyncResponse, + status_code=status.HTTP_202_ACCEPTED, + summary="Sync a NotebookLM notebook", + description="Synchronize a NotebookLM notebook to the local vector store for RAG queries.", +) +async def sync_notebook(notebook_id: str): + """Sync a notebook from NotebookLM to the local vector store. 
+ + Args: + notebook_id: The notebook ID to sync + + Returns: + Sync operation result + """ + try: + # First check if notebook exists in NotebookLM + notebook_service = NotebookService() + try: + notebook = await notebook_service.get(notebook_id) + except Exception: + raise HTTPException( + status_code=status.HTTP_404_NOT_FOUND, + detail=f"Notebook {notebook_id} not found in NotebookLM", + ) + + # Start sync + indexer = await get_notebooklm_indexer() + result = await indexer.sync_notebook(notebook_id) + + if result["status"] == "error": + raise HTTPException( + status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, + detail=result.get("error", "Unknown error during sync"), + ) + + return NotebookSyncResponse( + sync_id=result["sync_id"], + notebook_id=result["notebook_id"], + notebook_title=result.get("notebook_title"), + status="success", + sources_indexed=result["sources_indexed"], + total_chunks=result["total_chunks"], + message=f"Successfully synced {result['sources_indexed']} sources with {result['total_chunks']} chunks", + ) + + except HTTPException: + raise + except Exception as e: + raise HTTPException( + status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, + detail=f"Failed to sync notebook: {str(e)}", + ) + + +@router.get( + "/indexed", + response_model=IndexedNotebooksResponse, + summary="List indexed notebooks", + description="Get a list of all NotebookLM notebooks that have been synced to the local vector store.", +) +async def list_indexed_notebooks(): + """List all indexed notebooks.""" + try: + indexer = await get_notebooklm_indexer() + notebooks = await indexer.get_indexed_notebooks() + + return IndexedNotebooksResponse( + notebooks=[ + IndexedNotebook( + notebook_id=nb.get("notebook_id", ""), + notebook_title=nb.get("notebook_title"), + sources_count=nb.get("sources_count", 0), + chunks_count=nb.get("chunks_count", 0), + last_sync=nb.get("last_sync"), + ) + for nb in notebooks + ], + total=len(notebooks), + ) + except Exception as e: + raise 
HTTPException( + status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, + detail=f"Failed to list indexed notebooks: {str(e)}", + ) + + +@router.delete( + "/sync/{notebook_id}", + response_model=NotebookSyncDeleteResponse, + summary="Remove notebook index", + description="Remove a notebook's index from the local vector store.", +) +async def delete_notebook_index(notebook_id: str): + """Delete a notebook's index from the vector store. + + Args: + notebook_id: The notebook ID to remove + + Returns: + Deletion result + """ + try: + indexer = await get_notebooklm_indexer() + deleted = await indexer.delete_notebook_index(notebook_id) + + if not deleted: + raise HTTPException( + status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, + detail=f"Failed to delete index for notebook {notebook_id}", + ) + + return NotebookSyncDeleteResponse( + notebook_id=notebook_id, + deleted=True, + message=f"Successfully removed index for notebook {notebook_id}", + ) + except HTTPException: + raise + except Exception as e: + raise HTTPException( + status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, + detail=f"Failed to delete notebook index: {str(e)}", + ) + + +@router.get( + "/sync/{notebook_id}/status", + summary="Check sync status", + description="Check if a notebook has been synced and get its status.", +) +async def get_sync_status(notebook_id: str): + """Check the sync status of a notebook.""" + try: + indexer = await get_notebooklm_indexer() + + # Check if notebook is indexed + indexed = await indexer.get_indexed_notebooks() + notebook_info = next((nb for nb in indexed if nb.get("notebook_id") == notebook_id), None) + + if notebook_info: + return { + "notebook_id": notebook_id, + "status": "indexed", + "sources_count": notebook_info.get("sources_count", 0), + "chunks_count": notebook_info.get("chunks_count", 0), + "last_sync": notebook_info.get("last_sync"), + } + else: + return { + "notebook_id": notebook_id, + "status": "not_indexed", + "message": "Notebook has not been synced yet", + } 
+ except Exception as e: + raise HTTPException( + status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, + detail=f"Failed to check sync status: {str(e)}", + ) diff --git a/src/agentic_rag/api/routes/query.py b/src/agentic_rag/api/routes/query.py index f0c967e..acd9dbd 100644 --- a/src/agentic_rag/api/routes/query.py +++ b/src/agentic_rag/api/routes/query.py @@ -1,4 +1,4 @@ -"""Query API routes with multi-provider support.""" +"""Query API routes with multi-provider and NotebookLM support.""" from fastapi import APIRouter, Depends, HTTPException from pydantic import BaseModel, Field @@ -12,7 +12,7 @@ router = APIRouter() class QueryRequest(BaseModel): - """Query request model with provider selection.""" + """Query request model with provider and notebook selection.""" question: str = Field(..., description="Question to ask") k: int = Field(5, description="Number of chunks to retrieve", ge=1, le=20) @@ -20,6 +20,12 @@ class QueryRequest(BaseModel): None, description="LLM provider to use (defaults to system default)" ) model: str | None = Field(None, description="Model to use (provider-specific)") + notebook_ids: list[str] | None = Field( + None, description="Optional list of NotebookLM notebook IDs to search" + ) + include_documents: bool = Field( + True, description="Include regular documents in search (when notebook_ids specified)" + ) class QueryResponse(BaseModel): @@ -31,16 +37,29 @@ class QueryResponse(BaseModel): model: str sources: list[dict] user: str + filters_applied: dict | None = None + + +class NotebookQueryRequest(BaseModel): + """Query request specifically for NotebookLM notebooks.""" + + question: str = Field(..., description="Question to ask") + notebook_ids: list[str] = Field(..., description="List of NotebookLM notebook IDs to search") + k: int = Field(5, description="Number of chunks to retrieve per notebook", ge=1, le=50) + provider: str | None = Field( + None, description="LLM provider to use (defaults to system default)" + ) + model: str | None = 
Field(None, description="Model to use (provider-specific)") @router.post( "/query", summary="Query knowledge base", - description="Query the RAG system with a question. Supports multiple LLM providers.", + description="Query the RAG system with a question. Supports multiple LLM providers and NotebookLM notebooks.", response_model=QueryResponse, ) async def query(request: QueryRequest, current_user: dict = CurrentUser): - """Execute a RAG query with specified provider.""" + """Execute a RAG query with specified provider and optional notebook filtering.""" try: settings = get_settings() @@ -59,9 +78,16 @@ async def query(request: QueryRequest, current_user: dict = CurrentUser): f"Set API key in .env file.", ) - # Execute query + # Execute query with optional notebook filtering service = await get_rag_service() - result = await service.query(request.question, k=request.k, provider=provider, model=model) + result = await service.query( + request.question, + k=request.k, + provider=provider, + model=model, + notebook_ids=request.notebook_ids, + include_documents=request.include_documents, + ) return QueryResponse( question=request.question, @@ -70,6 +96,59 @@ async def query(request: QueryRequest, current_user: dict = CurrentUser): model=result.get("model", model), sources=result["sources"], user=current_user.get("user_id", "anonymous"), + filters_applied=result.get("filters_applied"), + ) + + except HTTPException: + raise + except Exception as e: + raise HTTPException(status_code=500, detail=str(e)) + + +@router.post( + "/query/notebooks", + summary="Query NotebookLM notebooks", + description="Execute a RAG query specifically on indexed NotebookLM notebooks.", + response_model=QueryResponse, +) +async def query_notebooks(request: NotebookQueryRequest, current_user: dict = CurrentUser): + """Execute a RAG query on specific NotebookLM notebooks.""" + try: + settings = get_settings() + + # Determine provider + provider = request.provider or settings.default_llm_provider + 
model = request.model or settings.default_llm_model + + # Check if provider is configured + if not settings.is_provider_configured(provider): + available = settings.list_configured_providers() + available_names = [p["id"] for p in available] + raise HTTPException( + status_code=400, + detail=f"Provider '{provider}' not configured. " + f"Available: {available_names}. " + f"Set API key in .env file.", + ) + + # Execute query on notebooks only + service = await get_rag_service() + result = await service.query_notebooks( + question=request.question, + notebook_ids=request.notebook_ids, + k=request.k, + provider=provider, + model=model, + ) + + return QueryResponse( + question=request.question, + answer=result["answer"], + provider=provider, + model=result.get("model", model), + sources=result["sources"], + user=current_user.get("user_id", "anonymous"), + filters_applied=result.get("filters_applied"), ) except HTTPException: @@ -81,7 +160,7 @@ async def query(request: QueryRequest, current_user: dict = CurrentUser): @router.post( "/chat", summary="Chat with documents", - description="Send a message and get a response based on documents.", + description="Send a message and get a response based on documents and/or notebooks.", response_model=QueryResponse, ) async def chat(request: QueryRequest, current_user: dict = CurrentUser): diff --git a/src/agentic_rag/services/document_service.py b/src/agentic_rag/services/document_service.py index 45ca6cb..6bb6448 100644 --- a/src/agentic_rag/services/document_service.py +++ b/src/agentic_rag/services/document_service.py @@ -86,6 +86,56 @@ class DocumentService: "filename": Path(file_path).name, "chunks_count": len(result) if isinstance(result, list) else 1, "metadata": metadata or {}, + "source": "document", + } + + async def ingest_notebooklm_source( + self, + content: str, + notebook_id: str, + source_id: str, + source_title: str, + source_type: str, + notebook_title: str, + ) -> dict: + """Ingest a NotebookLM source into the 
vector store. + + Args: + content: The text content from the source + notebook_id: The notebook ID + source_id: The source ID + source_title: The source title + source_type: The source type (url, file, etc.) + notebook_title: The notebook title + + Returns: + Ingestion result with chunk count + """ + from datapizza.schema import Document + from uuid import uuid4 + + # Create document with metadata + doc = Document( + text=content, + metadata={ + "notebook_id": notebook_id, + "source_id": source_id, + "source_title": source_title, + "source_type": source_type, + "notebook_title": notebook_title, + "source": "notebooklm", + }, + ) + + # Process through pipeline + result = self.pipeline.run_document(doc) + + return { + "source_id": source_id, + "source_title": source_title, + "chunks_count": len(result) if isinstance(result, list) else 1, + "notebook_id": notebook_id, + "source": "notebooklm", } async def list_documents(self) -> list[dict]: diff --git a/src/agentic_rag/services/notebooklm_indexer.py b/src/agentic_rag/services/notebooklm_indexer.py new file mode 100644 index 0000000..f18ee22 --- /dev/null +++ b/src/agentic_rag/services/notebooklm_indexer.py @@ -0,0 +1,257 @@ +"""NotebookLM Indexer Service for integrating NotebookLM with RAG. + +This service synchronizes content from Google NotebookLM notebooks +to the local Qdrant vector store, enabling RAG queries on notebook content. 
+""" + +from typing import Any +from uuid import UUID, uuid4 + +from datapizza.embedders import ChunkEmbedder +from datapizza.embedders.openai import OpenAIEmbedder +from datapizza.modules.splitters import NodeSplitter +from datapizza.vectorstores.qdrant import QdrantVectorstore + +from agentic_rag.core.config import get_settings +from notebooklm_agent.services.notebook_service import NotebookService +from notebooklm_agent.services.source_service import SourceService + +settings = get_settings() + + +class NotebookLMIndexerService: + """Service for indexing NotebookLM notebooks into the vector store. + + This service bridges NotebookLM Agent and DocuMente RAG by: + 1. Extracting content from NotebookLM notebooks + 2. Chunking and embedding the content + 3. Storing in Qdrant with metadata for retrieval + """ + + def __init__(self): + self.vector_store = None + self.embedder = None + self.splitter = None + self.notebook_service = None + self.source_service = None + self._init_components() + + def _init_components(self): + """Initialize vector store, embedder, and NotebookLM services.""" + # Initialize vector store + self.vector_store = QdrantVectorstore( + host=settings.qdrant_host, + port=settings.qdrant_port, + ) + + # Initialize embedder + self.embedder = ChunkEmbedder( + client=OpenAIEmbedder( + api_key=settings.openai_api_key, + model_name=settings.embedding_model, + ) + ) + + # Initialize text splitter for chunking + self.splitter = NodeSplitter(max_char=1024) + + # Initialize NotebookLM services + self.notebook_service = NotebookService() + self.source_service = SourceService() + + async def sync_notebook(self, notebook_id: str | UUID) -> dict: + """Sync a notebook from NotebookLM to the vector store. 
+ + Args: + notebook_id: The notebook ID to sync + + Returns: + Sync result with counts and status + """ + notebook_id = str(notebook_id) + sync_id = str(uuid4()) + + try: + # Get notebook info + notebook = await self.notebook_service.get(UUID(notebook_id)) + + # Get all sources from the notebook + sources = await self.source_service.list_sources(UUID(notebook_id)) + + total_chunks = 0 + indexed_sources = [] + + for source in sources.items if hasattr(sources, "items") else sources: + # Try to get full text content from source + content = await self._extract_source_content(UUID(notebook_id), str(source.id)) + + if content: + # Chunk and index the content + chunks_count = await self._index_content( + content=content, + notebook_id=notebook_id, + source_id=str(source.id), + source_title=source.title, + source_type=source.type, + notebook_title=notebook.title, + sync_id=sync_id, + ) + total_chunks += chunks_count + indexed_sources.append( + {"source_id": str(source.id), "title": source.title, "chunks": chunks_count} + ) + + return { + "sync_id": sync_id, + "notebook_id": notebook_id, + "notebook_title": notebook.title, + "status": "success", + "sources_indexed": len(indexed_sources), + "total_chunks": total_chunks, + "sources": indexed_sources, + } + + except Exception as e: + return { + "sync_id": sync_id, + "notebook_id": notebook_id, + "status": "error", + "error": str(e), + } + + async def _extract_source_content(self, notebook_id: UUID, source_id: str) -> str | None: + """Extract text content from a source. 
+
+        Args:
+            notebook_id: The notebook UUID
+            source_id: The source ID
+
+        Returns:
+            Extracted text content or None if not available
+        """
+        try:
+            # Use the source service to get fulltext
+            fulltext = await self.source_service.get_fulltext(notebook_id, source_id)
+            return fulltext
+        except Exception:
+            return None
+
+
+    # NOTE(review): a duplicated, dangling `except Exception: return None` block
+    # (a SyntaxError in the generated module) was removed from this spot.
+    async def _index_content(
+        self,
+        content: str,
+        notebook_id: str,
+        source_id: str,
+        source_title: str,
+        source_type: str,
+        notebook_title: str,
+        sync_id: str,
+    ) -> int:
+        """Index content chunks into the vector store.
+
+        Args:
+            content: The text content to index
+            notebook_id: The notebook ID
+            source_id: The source ID
+            source_title: The source title
+            source_type: The source type (url, file, etc.)
+            notebook_title: The notebook title
+            sync_id: The sync operation ID
+
+        Returns:
+            Number of chunks indexed
+        """
+        # Split content into chunks
+        from datapizza.schema import Document
+
+        doc = Document(
+            text=content,
+            metadata={
+                "notebook_id": notebook_id,
+                "source_id": source_id,
+                "source_title": source_title,
+                "source_type": source_type,
+                "notebook_title": notebook_title,
+                "sync_id": sync_id,
+                "source": "notebooklm",
+            },
+        )
+
+        # Split into chunks
+        chunks = self.splitter.run(doc)
+
+        if not chunks:
+            return 0
+
+        # Embed and store each chunk
+        for chunk in chunks:
+            # Generate embedding
+            embedding = await self.embedder.aembed(chunk.text)
+
+            # Store in vector store with metadata
+            self.vector_store.add_points(
+                collection_name="documents",
+                points=[
+                    {
+                        "id": str(uuid4()),
+                        "vector": embedding,
+                        "payload": {"text": chunk.text, **chunk.metadata},
+                    }
+                ],
+            )
+
+        return len(chunks)
+
+    async def get_indexed_notebooks(self) -> list[dict]:
+        """Get list of all indexed notebooks.
+ + Returns: + List of indexed notebooks with metadata + """ + try: + # Get collection info + collection = self.vector_store.get_collection("documents") + + # Search for unique notebook_ids in metadata + # This is a simplified version - in production you'd query the vector store + return [] + except Exception: + return [] + + async def delete_notebook_index(self, notebook_id: str | UUID) -> bool: + """Remove a notebook's index from the vector store. + + Args: + notebook_id: The notebook ID to remove + + Returns: + True if successful + """ + try: + notebook_id = str(notebook_id) + + # Delete all points with matching notebook_id + self.vector_store.delete_points( + collection_name="documents", + filter_condition={ + "must": [{"key": "notebook_id", "match": {"value": notebook_id}}] + }, + ) + + return True + except Exception: + return False + + +# Singleton instance +_notebooklm_indexer = None + + +async def get_notebooklm_indexer() -> NotebookLMIndexerService: + """Get or create the NotebookLM indexer service instance.""" + global _notebooklm_indexer + if _notebooklm_indexer is None: + _notebooklm_indexer = NotebookLMIndexerService() + return _notebooklm_indexer diff --git a/src/agentic_rag/services/rag_service.py b/src/agentic_rag/services/rag_service.py index b6044f3..d32a933 100644 --- a/src/agentic_rag/services/rag_service.py +++ b/src/agentic_rag/services/rag_service.py @@ -30,7 +30,13 @@ class RAGService: ) async def query( - self, question: str, k: int = 5, provider: str | None = None, model: str | None = None + self, + question: str, + k: int = 5, + provider: str | None = None, + model: str | None = None, + notebook_ids: list[str] | None = None, + include_documents: bool = True, ) -> dict: """Execute a RAG query with specified provider. 
@@ -39,6 +45,8 @@ class RAGService: k: Number of chunks to retrieve provider: LLM provider to use model: Model name + notebook_ids: Optional list of notebook IDs to filter by + include_documents: Whether to include local documents in search Returns: Response with answer and sources @@ -49,8 +57,18 @@ class RAGService: # Get query embedding query_embedding = await self._get_embedding(question) + # Build filter condition if notebook_ids specified + filter_condition = None + if notebook_ids: + filter_condition = { + "should": [{"key": "notebook_id", "match": {"value": nid}} for nid in notebook_ids] + } + if include_documents: + # Also include regular documents (those without notebook_id) + filter_condition["should"].append({"key": "source", "match": {"value": "document"}}) + # Retrieve relevant chunks - chunks = await vector_store.search(query_embedding, k=k) + chunks = await vector_store.search(query_embedding, k=k, filter_condition=filter_condition) # Format context from chunks context = self._format_context(chunks) @@ -62,14 +80,52 @@ class RAGService: prompt = self._build_prompt(context, question) response = await llm_client.invoke(prompt) + # Format sources with type information + formatted_sources = self._format_sources(chunks) + return { "question": question, "answer": response.text, - "sources": chunks, + "sources": formatted_sources, "provider": provider or settings.default_llm_provider, "model": model or getattr(response, "model", "unknown"), + "filters_applied": { + "notebook_ids": notebook_ids, + "include_documents": include_documents, + }, } + async def query_notebooks( + self, + question: str, + notebook_ids: list[str], + k: int = 5, + provider: str | None = None, + model: str | None = None, + ) -> dict: + """Execute a RAG query specifically on NotebookLM notebooks. + + This is a convenience method that queries only notebook content. 
+ + Args: + question: User question + notebook_ids: List of notebook IDs to search + k: Number of chunks to retrieve per notebook + provider: LLM provider to use + model: Model name + + Returns: + Response with answer and sources from notebooks + """ + return await self.query( + question=question, + k=k, + provider=provider, + model=model, + notebook_ids=notebook_ids, + include_documents=False, + ) + async def _get_embedding(self, text: str) -> list[float]: """Get embedding for text.""" result = await self.embedder.aembed(text) @@ -84,6 +140,31 @@ class RAGService: context_parts.append(f"[{i}] {text}") return "\n\n".join(context_parts) + def _format_sources(self, chunks: list[dict]) -> list[dict]: + """Format source information for response.""" + formatted = [] + for chunk in chunks: + source = { + "text": chunk.get("text", "")[:500] + "..." + if len(chunk.get("text", "")) > 500 + else chunk.get("text", ""), + "source_type": chunk.get("source", "unknown"), + } + + # Add notebook-specific metadata if available + if chunk.get("notebook_id"): + source["notebook_id"] = chunk.get("notebook_id") + source["notebook_title"] = chunk.get("notebook_title", "Unknown") + source["source_id"] = chunk.get("source_id") + source["source_title"] = chunk.get("source_title", "Unknown") + else: + # Regular document + source["document_id"] = chunk.get("document_id", "unknown") + + formatted.append(source) + + return formatted + def _build_prompt(self, context: str, question: str) -> str: """Build the RAG prompt.""" return f"""You are a helpful AI assistant. Answer the question based on the provided context. @@ -98,6 +179,7 @@ Instructions: - If the context doesn't contain the answer, say "I don't have enough information to answer this question" - Be concise but complete - Cite sources using [1], [2], etc. 
when referencing information +- When citing notebook sources, mention the source title for clarity Answer:""" diff --git a/src/agentic_rag/services/vector_store.py b/src/agentic_rag/services/vector_store.py index 6575917..4430d60 100644 --- a/src/agentic_rag/services/vector_store.py +++ b/src/agentic_rag/services/vector_store.py @@ -1,5 +1,7 @@ """Vector store service using datapizza-ai and Qdrant.""" +from typing import Any + from datapizza.vectorstores.qdrant import QdrantVectorstore from agentic_rag.core.config import get_settings @@ -25,11 +27,85 @@ class VectorStoreService: except Exception: return False - async def search(self, query_vector: list[float], k: int = 5) -> list[dict]: - """Search the vector store.""" - results = self.client.search(query_vector=query_vector, collection_name="documents", k=k) + async def search( + self, query_vector: list[float], k: int = 5, filter_condition: dict | None = None + ) -> list[dict]: + """Search the vector store. + + Args: + query_vector: The query embedding vector + k: Number of results to return + filter_condition: Optional filter condition for metadata filtering + + Returns: + List of search results with metadata + """ + if filter_condition: + results = self.client.search( + query_vector=query_vector, + collection_name="documents", + k=k, + filter_condition=filter_condition, + ) + else: + results = self.client.search( + query_vector=query_vector, collection_name="documents", k=k + ) return results + async def add_points(self, collection_name: str, points: list[dict]) -> bool: + """Add points to the vector store. + + Args: + collection_name: Name of the collection + points: List of points with id, vector, and payload + + Returns: + True if successful + """ + try: + self.client.add_points(collection_name, points) + return True + except Exception: + return False + + async def delete_points(self, collection_name: str, filter_condition: dict) -> bool: + """Delete points from the vector store matching a filter. 
+ + Args: + collection_name: Name of the collection + filter_condition: Filter condition for points to delete + + Returns: + True if successful + """ + try: + self.client.delete_points(collection_name, filter_condition) + return True + except Exception: + return False + + async def scroll_points( + self, collection_name: str, filter_condition: dict | None = None, limit: int = 100 + ) -> list[dict]: + """Scroll through points in the vector store. + + Args: + collection_name: Name of the collection + filter_condition: Optional filter condition + limit: Maximum number of points to return + + Returns: + List of points + """ + try: + results = self.client.scroll( + collection_name=collection_name, filter_condition=filter_condition, limit=limit + ) + return results + except Exception: + return [] + # Singleton _vector_store = None