|
|
import os |
|
|
import traceback |
|
|
from pinecone import Pinecone, ServerlessSpec |
|
|
from langchain_pinecone import PineconeVectorStore |
|
|
from src.config.settings import Config |
|
|
from src.utils.embedding import get_embedding_model |
|
|
|
|
|
class PineconeManager:
    """Thin wrapper around a Pinecone index used through LangChain.

    On construction the manager connects to (or creates) the configured
    index, loads the embedding model and runs a set of connectivity
    diagnostics. The remaining methods cover upserting documents, building
    retrievers, deleting vectors and listing stored meetings.
    """

    # Vector dimension used both when creating the index and when building
    # the placeholder query vector in ``list_meetings``; the two must agree.
    EMBEDDING_DIMENSION = 1536

    def __init__(self, index_name=None):
        """Initialize the Pinecone client and connect to the index.

        Creates the index (serverless, cosine metric) if it doesn't exist,
        then runs ``run_diagnostics()``.

        Args:
            index_name: Name of the index to use. Falls back to
                ``Config.PINECONE_INDEX`` when omitted or falsy.

        Raises:
            ValueError: If ``Config.PINECONE_API_KEY`` is empty.
            Exception: Any error raised while listing or creating the
                index is printed and re-raised unchanged.
        """
        self.api_key = Config.PINECONE_API_KEY
        if not self.api_key:
            raise ValueError("PINECONE_API_KEY not found in environment variables")

        self.pc = Pinecone(api_key=self.api_key)
        self.index_name = index_name or Config.PINECONE_INDEX

        try:
            existing_indexes = [i.name for i in self.pc.list_indexes()]
            if self.index_name not in existing_indexes:
                print(f"Index '{self.index_name}' does not exist. Creating it now...")
                self.pc.create_index(
                    name=self.index_name,
                    dimension=self.EMBEDDING_DIMENSION,
                    metric='cosine',
                    spec=ServerlessSpec(
                        cloud='aws',
                        region='us-east-1'
                    )
                )
                print(f"✅ Successfully created index '{self.index_name}'")
            else:
                print(f"✅ Connected to existing index '{self.index_name}'")
        except Exception as e:
            print(f"Error managing Pinecone index: {e}")
            # Bare raise keeps the original traceback intact.
            raise

        self.index = self.pc.Index(self.index_name)
        self.embeddings = get_embedding_model()

        self.run_diagnostics()

    def run_diagnostics(self):
        """Run connectivity and functionality tests.

        Three independent checks are performed: index stats (data plane),
        a test embedding call, and construction of a LangChain
        ``PineconeVectorStore``. Each failure is printed together with its
        traceback and is deliberately NOT re-raised, so one failing check
        never prevents the others from running.
        """
        print("\n🔍 Running Pinecone & Embedding Diagnostics...")

        try:
            print(f"Testing connection to index '{self.index_name}'...")
            stats = self.index.describe_index_stats()
            print(f"✅ Data Plane Verified. Total vectors: {stats.total_vector_count}")
        except Exception as e:
            print(f"❌ Data Plane Failed: {e}")
            traceback.print_exc()

        try:
            print("🧪 Testing Embedding Model (OpenAI)...")
            emb = self.embeddings.embed_query("test connectivity")
            print(f"✅ Embedding Model Verified. Dimension: {len(emb)}")
        except Exception as e:
            print(f"❌ Embedding Model Failed: {e}")
            traceback.print_exc()

        try:
            print("🔗 Testing LangChain Integration...")
            # Only construction is verified; the store itself is discarded.
            PineconeVectorStore(
                index_name=self.index_name,
                embedding=self.embeddings,
                pinecone_api_key=self.api_key
            )
            print("✅ LangChain Integration Verified (Initialization)")
        except Exception as e:
            print(f"❌ LangChain Integration Failed: {e}")
            traceback.print_exc()

    def upsert_documents(self, documents, namespace=None):
        """Upsert LangChain Document objects into the Pinecone index.

        Args:
            documents: Documents to embed and store. An empty or ``None``
                value is reported and ignored.
            namespace: Target namespace (default: Config.PINECONE_NAMESPACE).

        Raises:
            Exception: Any error from the vector store is printed and
                re-raised unchanged.
        """
        if namespace is None:
            namespace = Config.PINECONE_NAMESPACE
        if not documents:
            print("No documents to upsert.")
            return

        try:
            PineconeVectorStore.from_documents(
                documents=documents,
                embedding=self.embeddings,
                index_name=self.index_name,
                namespace=namespace,
                pinecone_api_key=self.api_key
            )
            print(f"Successfully upserted {len(documents)} documents to namespace '{namespace}'.")
        except Exception as e:
            print(f"Error upserting documents: {e}")
            raise

    def get_retriever(self, namespace=None, search_kwargs=None):
        """Return a LangChain retriever for the specified namespace.

        Args:
            namespace: Namespace to search (default: Config.PINECONE_NAMESPACE).
            search_kwargs: Keyword arguments forwarded to ``as_retriever``
                (default: ``{"k": 5}``).

        Returns:
            The retriever produced by ``PineconeVectorStore.as_retriever``.
        """
        if namespace is None:
            namespace = Config.PINECONE_NAMESPACE
        if search_kwargs is None:
            search_kwargs = {"k": 5}

        vectorstore = PineconeVectorStore(
            index_name=self.index_name,
            embedding=self.embeddings,
            namespace=namespace,
            pinecone_api_key=self.api_key
        )

        return vectorstore.as_retriever(search_kwargs=search_kwargs)

    def delete_by_meeting_id(self, meeting_id: str, namespace: str = None):
        """Delete all vectors associated with a specific meeting_id.

        Args:
            meeting_id: The meeting ID to delete (e.g., "meeting_abc12345")
            namespace: The namespace to delete from (default: Config.PINECONE_NAMESPACE)

        Returns:
            True when the delete call completed without raising.

        Raises:
            Exception: Any error from the index is printed and re-raised
                unchanged.
        """
        if namespace is None:
            namespace = Config.PINECONE_NAMESPACE
        try:
            # Metadata-filtered delete: removes every vector whose
            # "meeting_id" metadata field equals the given ID.
            self.index.delete(
                filter={"meeting_id": {"$eq": meeting_id}},
                namespace=namespace
            )

            print(f"✅ Successfully deleted vectors for meeting_id: {meeting_id}")

            return True

        except Exception as e:
            print(f"Error deleting vectors for meeting_id {meeting_id}: {e}")
            raise

    def delete_namespace(self, namespace: str):
        """Delete ALL vectors in a specific namespace.

        WARNING: This deletes everything in the namespace!

        Args:
            namespace: The namespace to clear

        Raises:
            Exception: Any error from the index is printed and re-raised
                unchanged.
        """
        try:
            self.index.delete(delete_all=True, namespace=namespace)
            print(f"✅ Successfully deleted all vectors in namespace: {namespace}")
        except Exception as e:
            print(f"Error deleting namespace {namespace}: {e}")
            raise

    def list_meetings(self, namespace: str = None, limit: int = 100):
        """List the unique meetings found among up to ``limit`` vectors.

        The index is queried with an all-zero placeholder vector purely to
        fetch metadata, so the result covers at most ``limit`` vectors and
        is not guaranteed to include every stored meeting.

        Args:
            namespace: The namespace to query (default: Config.PINECONE_NAMESPACE)
            limit: Maximum number of vectors to scan (default: 100)

        Returns:
            List of dictionaries with meeting metadata, one per distinct
            ``meeting_id`` in first-seen order. Returns an empty list if
            the query fails (the error is printed, not raised).
        """
        if namespace is None:
            namespace = Config.PINECONE_NAMESPACE
        try:
            results = self.index.query(
                namespace=namespace,
                vector=[0.0] * self.EMBEDDING_DIMENSION,
                top_k=limit,
                include_metadata=True
            )

            meetings = {}
            for match in results.matches:
                # A vector stored without metadata must not abort the whole
                # listing; treat it as having no fields.
                metadata = match.metadata or {}
                meeting_id = metadata.get("meeting_id")

                if meeting_id and meeting_id not in meetings:
                    meetings[meeting_id] = {
                        "meeting_id": meeting_id,
                        "meeting_date": metadata.get("meeting_date"),
                        "meeting_title": metadata.get("meeting_title", metadata.get("title", "Untitled Meeting")),
                        "meeting_duration": metadata.get("duration", metadata.get("meeting_duration", "N/A")),
                        "source_file": metadata.get("source_file", "N/A"),
                    }

            return list(meetings.values())

        except Exception as e:
            # Best-effort listing: report the failure and return nothing.
            print(f"Error listing meetings: {e}")
            return []