Source code for praval.memory.long_term_memory

"""
Long-term vector memory using Qdrant for Praval agents

This provides persistent, vector-based storage for:
- Semantic knowledge and concepts
- Long-term conversation history
- Learned patterns and insights
- Cross-session memory persistence
"""

from typing import List, Optional, Dict, Any
import logging
import numpy as np
from datetime import datetime

try:
    from qdrant_client import QdrantClient
    from qdrant_client.http import models
    from qdrant_client.http.models import Distance, VectorParams, PointStruct
    QDRANT_AVAILABLE = True
except ImportError:
    QDRANT_AVAILABLE = False
    QdrantClient = None
    models = None

try:
    import openai
    OPENAI_AVAILABLE = True
except ImportError:
    OPENAI_AVAILABLE = False
    openai = None

from .memory_types import MemoryEntry, MemoryType, MemoryQuery, MemorySearchResult


logger = logging.getLogger(__name__)


class LongTermMemory:
    """
    Qdrant-based long-term memory for persistent vector storage

    Features:
    - Vector similarity search
    - Persistent storage across sessions
    - Scalable to millions of memories
    - Semantic search capabilities
    - Memory importance scoring
    """
    def __init__(self,
                 qdrant_url: str = "http://localhost:6333",
                 collection_name: str = "praval_memories",
                 vector_size: int = 1536,  # OpenAI embedding size
                 distance_metric: str = "cosine"):
        """
        Initialize long-term memory

        Args:
            qdrant_url: URL to Qdrant instance
            collection_name: Name of the collection to use
            vector_size: Size of embedding vectors
            distance_metric: Distance metric for similarity search
        """
        if not QDRANT_AVAILABLE:
            raise ImportError("qdrant-client is required for long-term memory. Install with: pip install qdrant-client")

        self.qdrant_url = qdrant_url
        self.collection_name = collection_name
        self.vector_size = vector_size
        self.distance_metric = distance_metric

        # Initialize Qdrant client
        self.client = QdrantClient(url=qdrant_url)

        # Initialize collection
        self._ensure_collection_exists()

        logger.info(f"Long-term memory initialized with Qdrant at {qdrant_url}")
    def _ensure_collection_exists(self):
        """Ensure the Qdrant collection exists"""
        try:
            # Check if collection exists
            collections = self.client.get_collections().collections
            collection_names = [c.name for c in collections]

            if self.collection_name not in collection_names:
                # Create collection
                distance_map = {
                    "cosine": Distance.COSINE,
                    "euclidean": Distance.EUCLID,
                    "dot": Distance.DOT
                }

                self.client.create_collection(
                    collection_name=self.collection_name,
                    vectors_config=VectorParams(
                        size=self.vector_size,
                        distance=distance_map.get(self.distance_metric, Distance.COSINE)
                    )
                )
                logger.info(f"Created Qdrant collection: {self.collection_name}")
            else:
                logger.info(f"Using existing Qdrant collection: {self.collection_name}")

        except Exception as e:
            logger.error(f"Failed to initialize Qdrant collection: {e}")
            raise
    def store(self, memory: MemoryEntry) -> str:
        """
        Store a memory entry with vector embedding

        Args:
            memory: The memory entry to store

        Returns:
            The ID of the stored memory
        """
        try:
            # Generate embedding if not provided
            if memory.embedding is None:
                memory.embedding = self._generate_embedding(memory.content)

            # Create point for Qdrant
            point = PointStruct(
                id=memory.id,
                vector=memory.embedding,
                payload={
                    'agent_id': memory.agent_id,
                    'memory_type': memory.memory_type.value,
                    'content': memory.content,
                    'metadata': memory.metadata,
                    'created_at': memory.created_at.isoformat(),
                    'accessed_at': memory.accessed_at.isoformat(),
                    'access_count': memory.access_count,
                    'importance': memory.importance
                }
            )

            # Store in Qdrant
            self.client.upsert(
                collection_name=self.collection_name,
                points=[point]
            )

            logger.debug(f"Stored memory {memory.id} in long-term memory")
            return memory.id

        except Exception as e:
            logger.error(f"Failed to store memory {memory.id}: {e}")
            raise
    def retrieve(self, memory_id: str) -> Optional[MemoryEntry]:
        """
        Retrieve a specific memory by ID

        Args:
            memory_id: The ID of the memory to retrieve

        Returns:
            The memory entry if found, None otherwise
        """
        try:
            # Retrieve from Qdrant
            result = self.client.retrieve(
                collection_name=self.collection_name,
                ids=[memory_id],
                with_payload=True,
                with_vectors=True
            )

            if not result:
                return None

            point = result[0]
            memory = self._point_to_memory_entry(point)

            # Update access information
            memory.mark_accessed()
            self._update_access_info(memory)

            return memory

        except Exception as e:
            logger.error(f"Failed to retrieve memory {memory_id}: {e}")
            return None
    def search(self, query: MemoryQuery) -> MemorySearchResult:
        """
        Search memories using vector similarity

        Args:
            query: The search query

        Returns:
            Search results with matching memories
        """
        try:
            # Generate query embedding
            query_embedding = self._generate_embedding(query.query_text)

            # Build filter conditions
            filters = []

            if query.agent_id:
                filters.append(models.FieldCondition(
                    key="agent_id",
                    match=models.MatchValue(value=query.agent_id)
                ))

            if query.memory_types:
                memory_type_values = [mt.value for mt in query.memory_types]
                filters.append(models.FieldCondition(
                    key="memory_type",
                    match=models.MatchAny(any=memory_type_values)
                ))

            # Temporal filtering
            if query.temporal_filter:
                if 'after' in query.temporal_filter:
                    filters.append(models.FieldCondition(
                        key="created_at",
                        range=models.Range(gte=query.temporal_filter['after'].isoformat())
                    ))
                if 'before' in query.temporal_filter:
                    filters.append(models.FieldCondition(
                        key="created_at",
                        range=models.Range(lte=query.temporal_filter['before'].isoformat())
                    ))

            # Combine filters
            filter_condition = None
            if filters:
                filter_condition = models.Filter(must=filters)

            # Search in Qdrant
            search_result = self.client.search(
                collection_name=self.collection_name,
                query_vector=query_embedding,
                query_filter=filter_condition,
                limit=query.limit,
                score_threshold=query.similarity_threshold,
                with_payload=True,
                with_vectors=True
            )

            # Convert results
            entries = []
            scores = []

            for scored_point in search_result:
                memory = self._point_to_memory_entry(scored_point)
                memory.mark_accessed()
                entries.append(memory)
                scores.append(scored_point.score)

            # Update access information for retrieved memories
            for memory in entries:
                self._update_access_info(memory)

            return MemorySearchResult(
                entries=entries,
                scores=scores,
                query=query,
                total_found=len(entries)
            )

        except Exception as e:
            logger.error(f"Failed to search memories: {e}")
            return MemorySearchResult(entries=[], scores=[], query=query, total_found=0)
    def delete(self, memory_id: str) -> bool:
        """
        Delete a memory entry

        Args:
            memory_id: The ID of the memory to delete

        Returns:
            True if deleted successfully, False otherwise
        """
        try:
            self.client.delete(
                collection_name=self.collection_name,
                points_selector=models.PointIdsList(points=[memory_id])
            )

            logger.debug(f"Deleted memory {memory_id} from long-term memory")
            return True

        except Exception as e:
            logger.error(f"Failed to delete memory {memory_id}: {e}")
            return False
    def clear_agent_memories(self, agent_id: str):
        """Clear all memories for a specific agent"""
        try:
            self.client.delete(
                collection_name=self.collection_name,
                points_selector=models.FilterSelector(
                    filter=models.Filter(
                        must=[models.FieldCondition(
                            key="agent_id",
                            match=models.MatchValue(value=agent_id)
                        )]
                    )
                )
            )

            logger.info(f"Cleared all memories for agent {agent_id}")

        except Exception as e:
            logger.error(f"Failed to clear memories for agent {agent_id}: {e}")
            raise
    def get_stats(self) -> Dict[str, Any]:
        """Get memory statistics"""
        try:
            collection_info = self.client.get_collection(self.collection_name)

            return {
                'total_memories': collection_info.points_count,
                'vector_size': collection_info.config.params.vectors.size,
                'distance_metric': collection_info.config.params.vectors.distance.name,
                'collection_name': self.collection_name,
                'qdrant_url': self.qdrant_url
            }

        except Exception as e:
            logger.error(f"Failed to get memory statistics: {e}")
            return {}
    def _generate_embedding(self, text: str) -> List[float]:
        """Generate embedding for text using OpenAI"""
        if not OPENAI_AVAILABLE:
            # Fallback to random embedding (for testing)
            logger.warning("OpenAI not available, using random embedding")
            return np.random.random(self.vector_size).tolist()

        try:
            response = openai.embeddings.create(
                model="text-embedding-ada-002",
                input=text
            )
            return response.data[0].embedding

        except Exception as e:
            logger.error(f"Failed to generate embedding: {e}")
            # Fallback to random embedding
            return np.random.random(self.vector_size).tolist()

    def _point_to_memory_entry(self, point) -> MemoryEntry:
        """Convert Qdrant point to MemoryEntry"""
        payload = point.payload

        return MemoryEntry(
            id=point.id,
            agent_id=payload['agent_id'],
            memory_type=MemoryType(payload['memory_type']),
            content=payload['content'],
            metadata=payload['metadata'],
            embedding=point.vector if hasattr(point, 'vector') else None,
            created_at=datetime.fromisoformat(payload['created_at']),
            accessed_at=datetime.fromisoformat(payload['accessed_at']),
            access_count=payload.get('access_count', 0),
            importance=payload.get('importance', 0.5)
        )

    def _update_access_info(self, memory: MemoryEntry):
        """Update access information in Qdrant"""
        try:
            self.client.set_payload(
                collection_name=self.collection_name,
                payload={
                    'accessed_at': memory.accessed_at.isoformat(),
                    'access_count': memory.access_count
                },
                points=[memory.id]
            )
        except Exception as e:
            logger.warning(f"Failed to update access info for memory {memory.id}: {e}")
    def health_check(self) -> bool:
        """Check if Qdrant connection is healthy"""
        try:
            self.client.get_collections()
            return True
        except Exception as e:
            logger.error(f"Qdrant health check failed: {e}")
            return False
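
# --- Illustrative usage sketch (not part of the module API) ---
# Minimal end-to-end example of store/search against this class. Assumptions:
# a Qdrant instance is reachable at http://localhost:6333, and the MemoryEntry /
# MemoryQuery constructors accept the field names used elsewhere in this module
# (agent_id, memory_type, content, metadata, query_text, limit). Exact signatures
# and defaults live in praval.memory.memory_types and may differ; MemoryType.SEMANTIC
# below is a placeholder member name.
if __name__ == "__main__":
    memory_store = LongTermMemory(collection_name="praval_memories_demo")

    # Store one memory for an agent; the embedding is generated automatically in store()
    entry = MemoryEntry(
        agent_id="agent-1",
        memory_type=MemoryType.SEMANTIC,  # placeholder; use a real MemoryType member
        content="Qdrant keeps agent memories as vectors for semantic search.",
        metadata={"source": "demo"},
    )
    memory_store.store(entry)

    # Search it back by meaning and print similarity scores
    results = memory_store.search(MemoryQuery(
        query_text="How are agent memories stored?",
        agent_id="agent-1",
        limit=3,
    ))
    for hit, score in zip(results.entries, results.scores):
        print(f"{score:.3f}  {hit.content}")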