feat: integrate NotebookLM with RAG system
Some checks failed
CI / test (3.10) (push) Has been cancelled
CI / test (3.11) (push) Has been cancelled
CI / test (3.12) (push) Has been cancelled
CI / lint (push) Has been cancelled

Add complete integration between NotebookLM Agent and DocuMente RAG:

New Components:
- NotebookLMIndexerService: Syncs NotebookLM content to Qdrant vector store
- notebooklm_sync API routes: Manage notebook indexing (/api/v1/notebooklm/*)

Enhanced Components:
- RAGService: Added notebook_ids filter and query_notebooks() method
- VectorStoreService: Added filter support for metadata queries
- DocumentService: Added ingest_notebooklm_source() method
- Query routes: Added /query/notebooks endpoint for notebook-only queries
- Main API: Integrated new routes and updated to v2.1.0

Features:
- Sync NotebookLM notebooks to local vector store
- Query across documents and/or notebooks
- Filter RAG queries by specific notebook IDs
- Manage indexed notebooks (list, sync, delete)
- Track sync status and metadata

API Endpoints:
- POST /api/v1/notebooklm/sync/{notebook_id}
- GET /api/v1/notebooklm/indexed
- DELETE /api/v1/notebooklm/sync/{notebook_id}
- GET /api/v1/notebooklm/sync/{notebook_id}/status
- POST /api/v1/query/notebooks

Closes integration request for unified NotebookLM + RAG agent
This commit is contained in:
Luca Sacchi Ricciardi
2026-04-06 17:18:33 +02:00
parent 67ba5bc2dd
commit e3bacbc0a4
7 changed files with 795 additions and 18 deletions

View File

@@ -1,6 +1,7 @@
"""DocuMente API - Backend powered by datapizza-ai. """DocuMente API - Backend powered by datapizza-ai.
Multi-provider LLM support: OpenAI, Z.AI, OpenCode Zen, OpenRouter, Anthropic, Google, Mistral, Azure Multi-provider LLM support: OpenAI, Z.AI, OpenCode Zen, OpenRouter, Anthropic, Google, Mistral, Azure
NotebookLM Integration: Sync and query NotebookLM notebooks
""" """
from contextlib import asynccontextmanager from contextlib import asynccontextmanager
@@ -14,6 +15,7 @@ from agentic_rag.api.routes import (
chat, chat,
documents, documents,
health, health,
notebooklm_sync,
providers, providers,
query, query,
) )
@@ -57,7 +59,9 @@ def create_application() -> FastAPI:
app = FastAPI( app = FastAPI(
title="DocuMente API", title="DocuMente API",
description=""" description="""
DocuMente - Sistema di Retrieval Agentico con AI. Interroga i tuoi documenti in modo intelligente. DocuMente - Sistema di Retrieval Agentico con AI + NotebookLM Integration.
Interroga i tuoi documenti e i tuoi notebook NotebookLM in modo intelligente.
## Multi-Provider LLM Support ## Multi-Provider LLM Support
@@ -71,6 +75,13 @@ def create_application() -> FastAPI:
- **Mistral AI** - **Mistral AI**
- **Azure OpenAI** - **Azure OpenAI**
## NotebookLM Integration
Sync your Google NotebookLM notebooks and query them with RAG:
- Sync notebooks to local vector store
- Query notebook content alongside documents
- Search across multiple notebooks
## Authentication ## Authentication
Two methods supported: Two methods supported:
@@ -84,8 +95,10 @@ def create_application() -> FastAPI:
- 💬 Chat with your documents - 💬 Chat with your documents
- 🎯 RAG (Retrieval-Augmented Generation) - 🎯 RAG (Retrieval-Augmented Generation)
- 🚀 Multiple LLM providers - 🚀 Multiple LLM providers
- 📓 NotebookLM notebook integration
- 🔗 Notebook sync and indexing
""", """,
version="2.0.0", version="2.1.0",
docs_url="/api/docs", docs_url="/api/docs",
redoc_url="/api/redoc", redoc_url="/api/redoc",
openapi_url="/api/openapi.json", openapi_url="/api/openapi.json",
@@ -107,6 +120,7 @@ def create_application() -> FastAPI:
app.include_router(documents.router, prefix="/api/v1", tags=["documents"]) app.include_router(documents.router, prefix="/api/v1", tags=["documents"])
app.include_router(query.router, prefix="/api/v1", tags=["query"]) app.include_router(query.router, prefix="/api/v1", tags=["query"])
app.include_router(chat.router, prefix="/api/v1", tags=["chat"]) app.include_router(chat.router, prefix="/api/v1", tags=["chat"])
app.include_router(notebooklm_sync.router, prefix="/api/v1", tags=["notebooklm-sync"])
# Serve static files (frontend) # Serve static files (frontend)
try: try:
@@ -129,15 +143,16 @@ async def api_root():
return { return {
"name": "DocuMente API", "name": "DocuMente API",
"version": "2.0.0", "version": "2.1.0",
"docs": "/api/docs", "docs": "/api/docs",
"description": "DocuMente - Sistema di Retrieval Agentico con AI", "description": "DocuMente - Sistema di Retrieval Agentico con AI + NotebookLM",
"features": { "features": {
"multi_provider_llm": True, "multi_provider_llm": True,
"authentication": ["api_key", "jwt"], "authentication": ["api_key", "jwt"],
"document_processing": True, "document_processing": True,
"rag": True, "rag": True,
"streaming": True, "streaming": True,
"notebooklm_integration": True,
}, },
"configured_providers": [p["id"] for p in configured], "configured_providers": [p["id"] for p in configured],
"default_provider": settings.default_llm_provider, "default_provider": settings.default_llm_provider,
@@ -166,7 +181,7 @@ async def detailed_health_check():
return { return {
"status": "healthy", "status": "healthy",
"version": "2.0.0", "version": "2.1.0",
"components": { "components": {
"api": "healthy", "api": "healthy",
"vector_store": vector_status, "vector_store": vector_status,

View File

@@ -0,0 +1,218 @@
"""NotebookLM Sync API routes.
This module provides endpoints for syncing NotebookLM notebooks
to the local RAG vector store.
"""
from fastapi import APIRouter, HTTPException, status
from pydantic import BaseModel, Field
from agentic_rag.services.notebooklm_indexer import get_notebooklm_indexer
from notebooklm_agent.services.notebook_service import NotebookService
router = APIRouter(prefix="/notebooklm", tags=["notebooklm-sync"])
class NotebookSyncRequest(BaseModel):
"""Request to sync a notebook."""
notebook_id: str = Field(..., description="Notebook ID to sync")
class NotebookSyncResponse(BaseModel):
"""Response from notebook sync operation."""
sync_id: str
notebook_id: str
notebook_title: str | None
status: str
sources_indexed: int
total_chunks: int
message: str
class NotebookSyncDeleteResponse(BaseModel):
"""Response from deleting notebook index."""
notebook_id: str
deleted: bool
message: str
class IndexedNotebook(BaseModel):
"""Indexed notebook information."""
notebook_id: str
notebook_title: str | None
sources_count: int
chunks_count: int
last_sync: str | None
class IndexedNotebooksResponse(BaseModel):
"""Response with list of indexed notebooks."""
notebooks: list[IndexedNotebook]
total: int
@router.post(
"/sync/{notebook_id}",
response_model=NotebookSyncResponse,
status_code=status.HTTP_202_ACCEPTED,
summary="Sync a NotebookLM notebook",
description="Synchronize a NotebookLM notebook to the local vector store for RAG queries.",
)
async def sync_notebook(notebook_id: str):
"""Sync a notebook from NotebookLM to the local vector store.
Args:
notebook_id: The notebook ID to sync
Returns:
Sync operation result
"""
try:
# First check if notebook exists in NotebookLM
notebook_service = NotebookService()
try:
notebook = await notebook_service.get(notebook_id)
except Exception:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail=f"Notebook {notebook_id} not found in NotebookLM",
)
# Start sync
indexer = await get_notebooklm_indexer()
result = await indexer.sync_notebook(notebook_id)
if result["status"] == "error":
raise HTTPException(
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
detail=result.get("error", "Unknown error during sync"),
)
return NotebookSyncResponse(
sync_id=result["sync_id"],
notebook_id=result["notebook_id"],
notebook_title=result.get("notebook_title"),
status="success",
sources_indexed=result["sources_indexed"],
total_chunks=result["total_chunks"],
message=f"Successfully synced {result['sources_indexed']} sources with {result['total_chunks']} chunks",
)
except HTTPException:
raise
except Exception as e:
raise HTTPException(
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
detail=f"Failed to sync notebook: {str(e)}",
)
@router.get(
"/indexed",
response_model=IndexedNotebooksResponse,
summary="List indexed notebooks",
description="Get a list of all NotebookLM notebooks that have been synced to the local vector store.",
)
async def list_indexed_notebooks():
"""List all indexed notebooks."""
try:
indexer = await get_notebooklm_indexer()
notebooks = await indexer.get_indexed_notebooks()
return IndexedNotebooksResponse(
notebooks=[
IndexedNotebook(
notebook_id=nb.get("notebook_id", ""),
notebook_title=nb.get("notebook_title"),
sources_count=nb.get("sources_count", 0),
chunks_count=nb.get("chunks_count", 0),
last_sync=nb.get("last_sync"),
)
for nb in notebooks
],
total=len(notebooks),
)
except Exception as e:
raise HTTPException(
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
detail=f"Failed to list indexed notebooks: {str(e)}",
)
@router.delete(
"/sync/{notebook_id}",
response_model=NotebookSyncDeleteResponse,
summary="Remove notebook index",
description="Remove a notebook's index from the local vector store.",
)
async def delete_notebook_index(notebook_id: str):
"""Delete a notebook's index from the vector store.
Args:
notebook_id: The notebook ID to remove
Returns:
Deletion result
"""
try:
indexer = await get_notebooklm_indexer()
deleted = await indexer.delete_notebook_index(notebook_id)
if not deleted:
raise HTTPException(
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
detail=f"Failed to delete index for notebook {notebook_id}",
)
return NotebookSyncDeleteResponse(
notebook_id=notebook_id,
deleted=True,
message=f"Successfully removed index for notebook {notebook_id}",
)
except HTTPException:
raise
except Exception as e:
raise HTTPException(
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
detail=f"Failed to delete notebook index: {str(e)}",
)
@router.get(
"/sync/{notebook_id}/status",
summary="Check sync status",
description="Check if a notebook has been synced and get its status.",
)
async def get_sync_status(notebook_id: str):
"""Check the sync status of a notebook."""
try:
indexer = await get_notebooklm_indexer()
# Check if notebook is indexed
indexed = await indexer.get_indexed_notebooks()
notebook_info = next((nb for nb in indexed if nb.get("notebook_id") == notebook_id), None)
if notebook_info:
return {
"notebook_id": notebook_id,
"status": "indexed",
"sources_count": notebook_info.get("sources_count", 0),
"chunks_count": notebook_info.get("chunks_count", 0),
"last_sync": notebook_info.get("last_sync"),
}
else:
return {
"notebook_id": notebook_id,
"status": "not_indexed",
"message": "Notebook has not been synced yet",
}
except Exception as e:
raise HTTPException(
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
detail=f"Failed to check sync status: {str(e)}",
)

View File

@@ -1,4 +1,4 @@
"""Query API routes with multi-provider support.""" """Query API routes with multi-provider and NotebookLM support."""
from fastapi import APIRouter, Depends, HTTPException from fastapi import APIRouter, Depends, HTTPException
from pydantic import BaseModel, Field from pydantic import BaseModel, Field
@@ -12,7 +12,7 @@ router = APIRouter()
class QueryRequest(BaseModel): class QueryRequest(BaseModel):
"""Query request model with provider selection.""" """Query request model with provider and notebook selection."""
question: str = Field(..., description="Question to ask") question: str = Field(..., description="Question to ask")
k: int = Field(5, description="Number of chunks to retrieve", ge=1, le=20) k: int = Field(5, description="Number of chunks to retrieve", ge=1, le=20)
@@ -20,6 +20,12 @@ class QueryRequest(BaseModel):
None, description="LLM provider to use (defaults to system default)" None, description="LLM provider to use (defaults to system default)"
) )
model: str | None = Field(None, description="Model to use (provider-specific)") model: str | None = Field(None, description="Model to use (provider-specific)")
notebook_ids: list[str] | None = Field(
None, description="Optional list of NotebookLM notebook IDs to search"
)
include_documents: bool = Field(
True, description="Include regular documents in search (when notebook_ids specified)"
)
class QueryResponse(BaseModel): class QueryResponse(BaseModel):
@@ -31,16 +37,29 @@ class QueryResponse(BaseModel):
model: str model: str
sources: list[dict] sources: list[dict]
user: str user: str
filters_applied: dict | None = None
class NotebookQueryRequest(BaseModel):
"""Query request specifically for NotebookLM notebooks."""
question: str = Field(..., description="Question to ask")
notebook_ids: list[str] = Field(..., description="List of NotebookLM notebook IDs to search")
k: int = Field(5, description="Number of chunks to retrieve per notebook", ge=1, le=50)
provider: str | None = Field(
None, description="LLM provider to use (defaults to system default)"
)
model: str | None = Field(None, description="Model to use (provider-specific)")
@router.post( @router.post(
"/query", "/query",
summary="Query knowledge base", summary="Query knowledge base",
description="Query the RAG system with a question. Supports multiple LLM providers.", description="Query the RAG system with a question. Supports multiple LLM providers and NotebookLM notebooks.",
response_model=QueryResponse, response_model=QueryResponse,
) )
async def query(request: QueryRequest, current_user: dict = CurrentUser): async def query(request: QueryRequest, current_user: dict = CurrentUser):
"""Execute a RAG query with specified provider.""" """Execute a RAG query with specified provider and optional notebook filtering."""
try: try:
settings = get_settings() settings = get_settings()
@@ -59,9 +78,16 @@ async def query(request: QueryRequest, current_user: dict = CurrentUser):
f"Set API key in .env file.", f"Set API key in .env file.",
) )
# Execute query # Execute query with optional notebook filtering
service = await get_rag_service() service = await get_rag_service()
result = await service.query(request.question, k=request.k, provider=provider, model=model) result = await service.query(
request.question,
k=request.k,
provider=provider,
model=model,
notebook_ids=request.notebook_ids,
include_documents=request.include_documents,
)
return QueryResponse( return QueryResponse(
question=request.question, question=request.question,
@@ -70,6 +96,59 @@ async def query(request: QueryRequest, current_user: dict = CurrentUser):
model=result.get("model", model), model=result.get("model", model),
sources=result["sources"], sources=result["sources"],
user=current_user.get("user_id", "anonymous"), user=current_user.get("user_id", "anonymous"),
filters_applied=result.get("filters_applied"),
)
except HTTPException:
raise
except Exception as e:
raise HTTPException(status_code=500, detail=str(e))
@router.post(
"/query/notebooks",
summary="Query NotebookLM notebooks",
description="Execute a RAG query specifically on indexed NotebookLM notebooks.",
response_model=QueryResponse,
)
async def query_notebooks(request: NotebookQueryRequest, current_user: dict = CurrentUser):
"""Execute a RAG query on specific NotebookLM notebooks."""
try:
settings = get_settings()
# Determine provider
provider = request.provider or settings.default_llm_provider
model = request.model or settings.default_llm_model
# Check if provider is configured
if not settings.is_provider_configured(provider):
available = settings.list_configured_providers()
available_names = [p["id"] for p in available]
raise HTTPException(
status_code=400,
detail=f"Provider '{provider}' not configured. "
f"Available: {available_names}. "
f"Set API key in .env file.",
)
# Execute query on notebooks only
service = await get_rag_service()
result = await service.query_notebooks(
question=request.question,
notebook_ids=request.notebook_ids,
k=request.k,
provider=provider,
model=model,
)
return QueryResponse(
question=request.question,
answer=result["answer"],
provider=provider,
model=result.get("model", model),
sources=result["sources"],
user=current_user.get("user_id", "anonymous"),
filters_applied=result.get("filters_applied"),
) )
except HTTPException: except HTTPException:
@@ -81,7 +160,7 @@ async def query(request: QueryRequest, current_user: dict = CurrentUser):
@router.post( @router.post(
"/chat", "/chat",
summary="Chat with documents", summary="Chat with documents",
description="Send a message and get a response based on documents.", description="Send a message and get a response based on documents and/or notebooks.",
response_model=QueryResponse, response_model=QueryResponse,
) )
async def chat(request: QueryRequest, current_user: dict = CurrentUser): async def chat(request: QueryRequest, current_user: dict = CurrentUser):

View File

@@ -86,6 +86,56 @@ class DocumentService:
"filename": Path(file_path).name, "filename": Path(file_path).name,
"chunks_count": len(result) if isinstance(result, list) else 1, "chunks_count": len(result) if isinstance(result, list) else 1,
"metadata": metadata or {}, "metadata": metadata or {},
"source": "document",
}
async def ingest_notebooklm_source(
self,
content: str,
notebook_id: str,
source_id: str,
source_title: str,
source_type: str,
notebook_title: str,
) -> dict:
"""Ingest a NotebookLM source into the vector store.
Args:
content: The text content from the source
notebook_id: The notebook ID
source_id: The source ID
source_title: The source title
source_type: The source type (url, file, etc.)
notebook_title: The notebook title
Returns:
Ingestion result with chunk count
"""
from datapizza.schema import Document
from uuid import uuid4
# Create document with metadata
doc = Document(
text=content,
metadata={
"notebook_id": notebook_id,
"source_id": source_id,
"source_title": source_title,
"source_type": source_type,
"notebook_title": notebook_title,
"source": "notebooklm",
},
)
# Process through pipeline
result = self.pipeline.run_document(doc)
return {
"source_id": source_id,
"source_title": source_title,
"chunks_count": len(result) if isinstance(result, list) else 1,
"notebook_id": notebook_id,
"source": "notebooklm",
} }
async def list_documents(self) -> list[dict]: async def list_documents(self) -> list[dict]:

View File

@@ -0,0 +1,257 @@
"""NotebookLM Indexer Service for integrating NotebookLM with RAG.
This service synchronizes content from Google NotebookLM notebooks
to the local Qdrant vector store, enabling RAG queries on notebook content.
"""
from typing import Any
from uuid import UUID, uuid4
from datapizza.embedders import ChunkEmbedder
from datapizza.embedders.openai import OpenAIEmbedder
from datapizza.modules.splitters import NodeSplitter
from datapizza.vectorstores.qdrant import QdrantVectorstore
from agentic_rag.core.config import get_settings
from notebooklm_agent.services.notebook_service import NotebookService
from notebooklm_agent.services.source_service import SourceService
settings = get_settings()
class NotebookLMIndexerService:
"""Service for indexing NotebookLM notebooks into the vector store.
This service bridges NotebookLM Agent and DocuMente RAG by:
1. Extracting content from NotebookLM notebooks
2. Chunking and embedding the content
3. Storing in Qdrant with metadata for retrieval
"""
def __init__(self):
self.vector_store = None
self.embedder = None
self.splitter = None
self.notebook_service = None
self.source_service = None
self._init_components()
def _init_components(self):
"""Initialize vector store, embedder, and NotebookLM services."""
# Initialize vector store
self.vector_store = QdrantVectorstore(
host=settings.qdrant_host,
port=settings.qdrant_port,
)
# Initialize embedder
self.embedder = ChunkEmbedder(
client=OpenAIEmbedder(
api_key=settings.openai_api_key,
model_name=settings.embedding_model,
)
)
# Initialize text splitter for chunking
self.splitter = NodeSplitter(max_char=1024)
# Initialize NotebookLM services
self.notebook_service = NotebookService()
self.source_service = SourceService()
async def sync_notebook(self, notebook_id: str | UUID) -> dict:
"""Sync a notebook from NotebookLM to the vector store.
Args:
notebook_id: The notebook ID to sync
Returns:
Sync result with counts and status
"""
notebook_id = str(notebook_id)
sync_id = str(uuid4())
try:
# Get notebook info
notebook = await self.notebook_service.get(UUID(notebook_id))
# Get all sources from the notebook
sources = await self.source_service.list_sources(UUID(notebook_id))
total_chunks = 0
indexed_sources = []
for source in sources.items if hasattr(sources, "items") else sources:
# Try to get full text content from source
content = await self._extract_source_content(UUID(notebook_id), str(source.id))
if content:
# Chunk and index the content
chunks_count = await self._index_content(
content=content,
notebook_id=notebook_id,
source_id=str(source.id),
source_title=source.title,
source_type=source.type,
notebook_title=notebook.title,
sync_id=sync_id,
)
total_chunks += chunks_count
indexed_sources.append(
{"source_id": str(source.id), "title": source.title, "chunks": chunks_count}
)
return {
"sync_id": sync_id,
"notebook_id": notebook_id,
"notebook_title": notebook.title,
"status": "success",
"sources_indexed": len(indexed_sources),
"total_chunks": total_chunks,
"sources": indexed_sources,
}
except Exception as e:
return {
"sync_id": sync_id,
"notebook_id": notebook_id,
"status": "error",
"error": str(e),
}
async def _extract_source_content(self, notebook_id: UUID, source_id: str) -> str | None:
"""Extract text content from a source.
Args:
notebook_id: The notebook UUID
source_id: The source ID
Returns:
Extracted text content or None if not available
"""
try:
# Use the source service to get fulltext
fulltext = await self.source_service.get_fulltext(notebook_id, source_id)
return fulltext
except Exception:
return None
except Exception:
return None
async def _index_content(
self,
content: str,
notebook_id: str,
source_id: str,
source_title: str,
source_type: str,
notebook_title: str,
sync_id: str,
) -> int:
"""Index content chunks into the vector store.
Args:
content: The text content to index
notebook_id: The notebook ID
source_id: The source ID
source_title: The source title
source_type: The source type (url, file, etc.)
notebook_title: The notebook title
sync_id: The sync operation ID
Returns:
Number of chunks indexed
"""
# Split content into chunks
from datapizza.schema import Document
doc = Document(
text=content,
metadata={
"notebook_id": notebook_id,
"source_id": source_id,
"source_title": source_title,
"source_type": source_type,
"notebook_title": notebook_title,
"sync_id": sync_id,
"source": "notebooklm",
},
)
# Split into chunks
chunks = self.splitter.run(doc)
if not chunks:
return 0
# Embed and store each chunk
for chunk in chunks:
# Generate embedding
embedding = await self.embedder.aembed(chunk.text)
# Store in vector store with metadata
self.vector_store.add_points(
collection_name="documents",
points=[
{
"id": str(uuid4()),
"vector": embedding,
"payload": {"text": chunk.text, **chunk.metadata},
}
],
)
return len(chunks)
async def get_indexed_notebooks(self) -> list[dict]:
"""Get list of all indexed notebooks.
Returns:
List of indexed notebooks with metadata
"""
try:
# Get collection info
collection = self.vector_store.get_collection("documents")
# Search for unique notebook_ids in metadata
# This is a simplified version - in production you'd query the vector store
return []
except Exception:
return []
async def delete_notebook_index(self, notebook_id: str | UUID) -> bool:
"""Remove a notebook's index from the vector store.
Args:
notebook_id: The notebook ID to remove
Returns:
True if successful
"""
try:
notebook_id = str(notebook_id)
# Delete all points with matching notebook_id
self.vector_store.delete_points(
collection_name="documents",
filter_condition={
"must": [{"key": "notebook_id", "match": {"value": notebook_id}}]
},
)
return True
except Exception:
return False
# Singleton instance
_notebooklm_indexer = None
async def get_notebooklm_indexer() -> NotebookLMIndexerService:
"""Get or create the NotebookLM indexer service instance."""
global _notebooklm_indexer
if _notebooklm_indexer is None:
_notebooklm_indexer = NotebookLMIndexerService()
return _notebooklm_indexer

View File

@@ -30,7 +30,13 @@ class RAGService:
) )
async def query( async def query(
self, question: str, k: int = 5, provider: str | None = None, model: str | None = None self,
question: str,
k: int = 5,
provider: str | None = None,
model: str | None = None,
notebook_ids: list[str] | None = None,
include_documents: bool = True,
) -> dict: ) -> dict:
"""Execute a RAG query with specified provider. """Execute a RAG query with specified provider.
@@ -39,6 +45,8 @@ class RAGService:
k: Number of chunks to retrieve k: Number of chunks to retrieve
provider: LLM provider to use provider: LLM provider to use
model: Model name model: Model name
notebook_ids: Optional list of notebook IDs to filter by
include_documents: Whether to include local documents in search
Returns: Returns:
Response with answer and sources Response with answer and sources
@@ -49,8 +57,18 @@ class RAGService:
# Get query embedding # Get query embedding
query_embedding = await self._get_embedding(question) query_embedding = await self._get_embedding(question)
# Build filter condition if notebook_ids specified
filter_condition = None
if notebook_ids:
filter_condition = {
"should": [{"key": "notebook_id", "match": {"value": nid}} for nid in notebook_ids]
}
if include_documents:
# Also include regular documents (those without notebook_id)
filter_condition["should"].append({"key": "source", "match": {"value": "document"}})
# Retrieve relevant chunks # Retrieve relevant chunks
chunks = await vector_store.search(query_embedding, k=k) chunks = await vector_store.search(query_embedding, k=k, filter_condition=filter_condition)
# Format context from chunks # Format context from chunks
context = self._format_context(chunks) context = self._format_context(chunks)
@@ -62,14 +80,52 @@ class RAGService:
prompt = self._build_prompt(context, question) prompt = self._build_prompt(context, question)
response = await llm_client.invoke(prompt) response = await llm_client.invoke(prompt)
# Format sources with type information
formatted_sources = self._format_sources(chunks)
return { return {
"question": question, "question": question,
"answer": response.text, "answer": response.text,
"sources": chunks, "sources": formatted_sources,
"provider": provider or settings.default_llm_provider, "provider": provider or settings.default_llm_provider,
"model": model or getattr(response, "model", "unknown"), "model": model or getattr(response, "model", "unknown"),
"filters_applied": {
"notebook_ids": notebook_ids,
"include_documents": include_documents,
},
} }
async def query_notebooks(
self,
question: str,
notebook_ids: list[str],
k: int = 5,
provider: str | None = None,
model: str | None = None,
) -> dict:
"""Execute a RAG query specifically on NotebookLM notebooks.
This is a convenience method that queries only notebook content.
Args:
question: User question
notebook_ids: List of notebook IDs to search
k: Number of chunks to retrieve per notebook
provider: LLM provider to use
model: Model name
Returns:
Response with answer and sources from notebooks
"""
return await self.query(
question=question,
k=k,
provider=provider,
model=model,
notebook_ids=notebook_ids,
include_documents=False,
)
async def _get_embedding(self, text: str) -> list[float]: async def _get_embedding(self, text: str) -> list[float]:
"""Get embedding for text.""" """Get embedding for text."""
result = await self.embedder.aembed(text) result = await self.embedder.aembed(text)
@@ -84,6 +140,31 @@ class RAGService:
context_parts.append(f"[{i}] {text}") context_parts.append(f"[{i}] {text}")
return "\n\n".join(context_parts) return "\n\n".join(context_parts)
def _format_sources(self, chunks: list[dict]) -> list[dict]:
"""Format source information for response."""
formatted = []
for chunk in chunks:
source = {
"text": chunk.get("text", "")[:500] + "..."
if len(chunk.get("text", "")) > 500
else chunk.get("text", ""),
"source_type": chunk.get("source", "unknown"),
}
# Add notebook-specific metadata if available
if chunk.get("notebook_id"):
source["notebook_id"] = chunk.get("notebook_id")
source["notebook_title"] = chunk.get("notebook_title", "Unknown")
source["source_id"] = chunk.get("source_id")
source["source_title"] = chunk.get("source_title", "Unknown")
else:
# Regular document
source["document_id"] = chunk.get("document_id", "unknown")
formatted.append(source)
return formatted
def _build_prompt(self, context: str, question: str) -> str: def _build_prompt(self, context: str, question: str) -> str:
"""Build the RAG prompt.""" """Build the RAG prompt."""
return f"""You are a helpful AI assistant. Answer the question based on the provided context. return f"""You are a helpful AI assistant. Answer the question based on the provided context.
@@ -98,6 +179,7 @@ Instructions:
- If the context doesn't contain the answer, say "I don't have enough information to answer this question" - If the context doesn't contain the answer, say "I don't have enough information to answer this question"
- Be concise but complete - Be concise but complete
- Cite sources using [1], [2], etc. when referencing information - Cite sources using [1], [2], etc. when referencing information
- When citing notebook sources, mention the source title for clarity
Answer:""" Answer:"""

View File

@@ -1,5 +1,7 @@
"""Vector store service using datapizza-ai and Qdrant.""" """Vector store service using datapizza-ai and Qdrant."""
from typing import Any
from datapizza.vectorstores.qdrant import QdrantVectorstore from datapizza.vectorstores.qdrant import QdrantVectorstore
from agentic_rag.core.config import get_settings from agentic_rag.core.config import get_settings
@@ -25,11 +27,85 @@ class VectorStoreService:
except Exception: except Exception:
return False return False
async def search(self, query_vector: list[float], k: int = 5) -> list[dict]: async def search(
"""Search the vector store.""" self, query_vector: list[float], k: int = 5, filter_condition: dict | None = None
results = self.client.search(query_vector=query_vector, collection_name="documents", k=k) ) -> list[dict]:
"""Search the vector store.
Args:
query_vector: The query embedding vector
k: Number of results to return
filter_condition: Optional filter condition for metadata filtering
Returns:
List of search results with metadata
"""
if filter_condition:
results = self.client.search(
query_vector=query_vector,
collection_name="documents",
k=k,
filter_condition=filter_condition,
)
else:
results = self.client.search(
query_vector=query_vector, collection_name="documents", k=k
)
return results return results
async def add_points(self, collection_name: str, points: list[dict]) -> bool:
"""Add points to the vector store.
Args:
collection_name: Name of the collection
points: List of points with id, vector, and payload
Returns:
True if successful
"""
try:
self.client.add_points(collection_name, points)
return True
except Exception:
return False
async def delete_points(self, collection_name: str, filter_condition: dict) -> bool:
"""Delete points from the vector store matching a filter.
Args:
collection_name: Name of the collection
filter_condition: Filter condition for points to delete
Returns:
True if successful
"""
try:
self.client.delete_points(collection_name, filter_condition)
return True
except Exception:
return False
async def scroll_points(
self, collection_name: str, filter_condition: dict | None = None, limit: int = 100
) -> list[dict]:
"""Scroll through points in the vector store.
Args:
collection_name: Name of the collection
filter_condition: Optional filter condition
limit: Maximum number of points to return
Returns:
List of points
"""
try:
results = self.client.scroll(
collection_name=collection_name, filter_condition=filter_condition, limit=limit
)
return results
except Exception:
return []
# Singleton # Singleton
_vector_store = None _vector_store = None