"""
MemoryManager - Unified interface for all Praval agent memory systems
This module coordinates:
- Short-term working memory
- Long-term vector memory
- Episodic conversation memory
- Semantic knowledge memory
"""
from typing import Dict, List, Optional, Any, Union
import logging
import os
from datetime import datetime
from .memory_types import MemoryEntry, MemoryType, MemoryQuery, MemorySearchResult
from .short_term_memory import ShortTermMemory
from .long_term_memory import LongTermMemory
from .episodic_memory import EpisodicMemory
from .semantic_memory import SemanticMemory
from .embedded_store import EmbeddedVectorStore
logger = logging.getLogger(__name__)
class MemoryManager:
"""
Unified memory management system for Praval agents
Provides a single interface to:
- Store and retrieve memories across all systems
- Coordinate between short-term and long-term storage
- Manage different types of memory (episodic, semantic, etc.)
- Optimize memory access patterns
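Example (illustrative sketch; the agent ID is a placeholder and the
embedded ChromaDB backend is assumed to initialize successfully):
    >>> manager = MemoryManager(agent_id="assistant-1")
    >>> memory_id = manager.store_memory(
    ...     agent_id="assistant-1",
    ...     content="User prefers concise answers",
    ...     importance=0.8,
    ... )
    >>> manager.health_check()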
"""
def __init__(self,
agent_id: str,
backend: str = "auto",
qdrant_url: str = "http://localhost:6333",
storage_path: Optional[str] = None,
collection_name: str = "praval_memories",
short_term_max_entries: int = 1000,
short_term_retention_hours: int = 24,
knowledge_base_path: Optional[str] = None):
"""
Initialize the unified memory manager
Args:
agent_id: ID of the agent using this memory
backend: Memory backend ("auto", "chromadb", "embedded", "qdrant", or "memory")
qdrant_url: URL for Qdrant vector database
storage_path: Path for persistent storage
collection_name: Collection name for vector storage
short_term_max_entries: Max entries in short-term memory
short_term_retention_hours: Short-term memory retention time in hours
knowledge_base_path: Path to knowledge base files to auto-index
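Example (illustrative; the storage path and URL are placeholders):
    >>> # Embedded ChromaDB persisted to a local directory
    >>> manager = MemoryManager(
    ...     agent_id="agent-1",
    ...     backend="chromadb",
    ...     storage_path="./memory_store",
    ... )
    >>> # Explicit Qdrant backend (requires a reachable Qdrant server)
    >>> manager = MemoryManager(
    ...     agent_id="agent-1",
    ...     backend="qdrant",
    ...     qdrant_url="http://localhost:6333",
    ... )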
"""
self.agent_id = agent_id
self.backend = backend
self.qdrant_url = qdrant_url
self.storage_path = storage_path
self.collection_name = collection_name
self.knowledge_base_path = knowledge_base_path
# Auto-detect knowledge base from environment if not provided
if not self.knowledge_base_path:
self.knowledge_base_path = os.getenv('PRAVAL_KNOWLEDGE_BASE')
# Initialize memory subsystems based on backend preference
self.long_term_memory = None
self.embedded_store = None
if backend in ["auto", "chromadb", "embedded"]:
try:
self.embedded_store = EmbeddedVectorStore(
storage_path=storage_path,
collection_name=collection_name,
enable_collection_separation=True # Enable separated collections by default
)
self.backend = "chromadb"
logger.info("Embedded ChromaDB memory initialized successfully")
except Exception as e:
logger.error(f"Failed to initialize embedded memory: {e}")
if backend != "auto":
raise
# Use Qdrant when requested explicitly, or as a fallback when the embedded store is unavailable in auto mode
if backend in ["auto", "qdrant"] and self.embedded_store is None:
try:
self.long_term_memory = LongTermMemory(
qdrant_url=qdrant_url,
collection_name=collection_name
)
self.backend = "qdrant"
logger.info("Qdrant long-term memory initialized successfully")
except Exception as e:
logger.error(f"Failed to initialize Qdrant memory: {e}")
if backend != "auto":
raise
self.short_term_memory = ShortTermMemory(
max_entries=short_term_max_entries,
retention_hours=short_term_retention_hours
)
# Initialize specialized memory managers
vector_store = self.embedded_store or self.long_term_memory
if vector_store:
self.episodic_memory = EpisodicMemory(
long_term_memory=vector_store,
short_term_memory=self.short_term_memory
)
self.semantic_memory = SemanticMemory(
long_term_memory=vector_store
)
else:
self.episodic_memory = None
self.semantic_memory = None
self.backend = "memory"
logger.warning("Using memory-only backend - no persistent storage")
# Auto-index knowledge base if provided
if self.knowledge_base_path and vector_store:
self._index_knowledge_base()
def store_memory(self,
agent_id: str,
content: str,
memory_type: MemoryType = MemoryType.SHORT_TERM,
metadata: Optional[Dict[str, Any]] = None,
importance: float = 0.5,
store_long_term: Optional[bool] = None) -> str:
"""
Store a memory entry
Args:
agent_id: The agent storing the memory
content: The memory content
memory_type: Type of memory
metadata: Additional metadata
importance: Importance score (0.0 to 1.0)
store_long_term: Whether to store in long-term memory (auto-decided if None)
Returns:
Memory ID
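Example (illustrative; "manager" is an initialized MemoryManager):
    >>> memory_id = manager.store_memory(
    ...     agent_id="agent-1",
    ...     content="The quarterly report is due Friday",
    ...     memory_type=MemoryType.SHORT_TERM,
    ...     metadata={"source": "user"},
    ...     importance=0.9,  # >= 0.7, so it is also persisted long-term
    ... )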
"""
memory = MemoryEntry(
id=None,
agent_id=agent_id,
memory_type=memory_type,
content=content,
metadata=metadata or {},
importance=importance
)
# Always store in short-term memory
memory_id = self.short_term_memory.store(memory)
# Decide whether to store in long-term memory
if store_long_term is None:
store_long_term = self._should_store_long_term(memory)
# Store in persistent storage (embedded or qdrant)
vector_store = self.embedded_store or self.long_term_memory
if store_long_term and vector_store:
try:
vector_store.store(memory)
logger.debug(f"Memory {memory_id} stored in both short-term and persistent memory")
except Exception as e:
logger.error(f"Failed to store memory {memory_id} in persistent storage: {e}")
return memory_id
def retrieve_memory(self, memory_id: str) -> Optional[MemoryEntry]:
"""
Retrieve a specific memory by ID
Args:
memory_id: The memory ID
Returns:
The memory entry if found
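Example (illustrative):
    >>> entry = manager.retrieve_memory(memory_id)
    >>> if entry is not None:
    ...     print(entry.content)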
"""
# Try short-term memory first (faster)
memory = self.short_term_memory.retrieve(memory_id)
# Fallback to persistent storage
vector_store = self.embedded_store or self.long_term_memory
if memory is None and vector_store:
memory = vector_store.retrieve(memory_id)
# Cache in short-term memory for future access
if memory:
self.short_term_memory.store(memory)
return memory
def search_memories(self, query: MemoryQuery) -> MemorySearchResult:
"""
Search memories across all systems
Args:
query: The search query
Returns:
Combined search results
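Example (illustrative; field values are placeholders):
    >>> query = MemoryQuery(
    ...     query_text="quarterly report",
    ...     memory_types=[MemoryType.SHORT_TERM, MemoryType.SEMANTIC],
    ...     agent_id="agent-1",
    ...     limit=5,
    ... )
    >>> result = manager.search_memories(query)
    >>> for entry, score in zip(result.entries, result.scores):
    ...     print(score, entry.content)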
"""
results = []
# Search short-term memory
st_results = self.short_term_memory.search(query)
results.append(("short_term", st_results))
# Search persistent memory if available
vector_store = self.embedded_store or self.long_term_memory
if vector_store:
try:
persistent_results = vector_store.search(query)
results.append(("persistent", persistent_results))
except Exception as e:
logger.error(f"Persistent memory search failed: {e}")
persistent_results = MemorySearchResult(entries=[], scores=[], query=query, total_found=0)
results.append(("persistent", persistent_results))
# Combine and deduplicate results
return self._combine_search_results(results, query)
def get_conversation_context(self,
agent_id: str,
turns: int = 10) -> List[MemoryEntry]:
"""
Get recent conversation context for an agent
Args:
agent_id: The agent ID
turns: Number of conversation turns
Returns:
List of conversation memories
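Example (illustrative):
    >>> recent = manager.get_conversation_context("agent-1", turns=5)
    >>> for entry in recent:
    ...     print(entry.content)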
"""
if self.episodic_memory:
return self.episodic_memory.get_conversation_context(agent_id, turns)
else:
# Fallback to general recent memories
return self.short_term_memory.get_recent(agent_id=agent_id, limit=turns)
def store_conversation_turn(self,
agent_id: str,
user_message: str,
agent_response: str,
context: Optional[Dict[str, Any]] = None) -> str:
"""
Store a conversation turn
Args:
agent_id: The agent ID
user_message: User's message
agent_response: Agent's response
context: Additional context
Returns:
Memory ID
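Example (illustrative; messages are placeholders):
    >>> memory_id = manager.store_conversation_turn(
    ...     agent_id="agent-1",
    ...     user_message="When is the report due?",
    ...     agent_response="The quarterly report is due Friday.",
    ...     context={"channel": "chat"},
    ... )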
"""
if self.episodic_memory:
return self.episodic_memory.store_conversation_turn(
agent_id, user_message, agent_response, context
)
else:
# Fallback to basic memory storage
content = f"User: {user_message}\nAgent: {agent_response}"
return self.store_memory(
agent_id=agent_id,
content=content,
memory_type=MemoryType.EPISODIC,
metadata={"type": "conversation", "context": context},
importance=0.7
)
def store_knowledge(self,
agent_id: str,
knowledge: str,
domain: str = "general",
confidence: float = 1.0,
knowledge_type: str = "fact") -> str:
"""
Store knowledge or facts
Args:
agent_id: The agent ID
knowledge: The knowledge content
domain: Domain of knowledge
confidence: Confidence in the knowledge
knowledge_type: Type of knowledge ("fact" is stored as a fact; any other value is stored as a concept)
Returns:
Memory ID
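Example (illustrative):
    >>> memory_id = manager.store_knowledge(
    ...     agent_id="agent-1",
    ...     knowledge="Water boils at 100 degrees Celsius at sea level",
    ...     domain="physics",
    ...     confidence=0.95,
    ...     knowledge_type="fact",
    ... )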
"""
if self.semantic_memory:
if knowledge_type == "fact":
return self.semantic_memory.store_fact(
agent_id, knowledge, domain, confidence
)
else:
return self.semantic_memory.store_concept(
agent_id, knowledge, knowledge, domain
)
else:
# Fallback to basic memory storage
return self.store_memory(
agent_id=agent_id,
content=knowledge,
memory_type=MemoryType.SEMANTIC,
metadata={
"domain": domain,
"confidence": confidence,
"knowledge_type": knowledge_type
},
importance=0.8
)
def get_domain_knowledge(self,
agent_id: str,
domain: str,
limit: int = 20) -> List[MemoryEntry]:
"""
Get knowledge in a specific domain
Args:
agent_id: The agent ID
domain: The domain
limit: Maximum results
Returns:
List of knowledge entries
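Example (illustrative):
    >>> facts = manager.get_domain_knowledge("agent-1", domain="physics", limit=10)
    >>> for entry in facts:
    ...     print(entry.content)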
"""
if self.semantic_memory:
return self.semantic_memory.get_knowledge_in_domain(agent_id, domain, limit)
else:
# Fallback search
query = MemoryQuery(
query_text=domain,
memory_types=[MemoryType.SEMANTIC],
agent_id=agent_id,
limit=limit
)
results = self.search_memories(query)
return results.entries
def clear_agent_memories(self, agent_id: str, memory_types: Optional[List[MemoryType]] = None):
"""
Clear memories for a specific agent
Args:
agent_id: The agent ID
memory_types: Types of memory to clear (all if None)
"""
# Clear short-term memory
self.short_term_memory.clear_agent_memories(agent_id)
# Clear persistent memory
vector_store = self.embedded_store or self.long_term_memory
if vector_store:
try:
vector_store.clear_agent_memories(agent_id)
except Exception as e:
logger.error(f"Failed to clear persistent memories for {agent_id}: {e}")
logger.info(f"Cleared memories for agent {agent_id}")
def get_memory_stats(self) -> Dict[str, Any]:
"""Get comprehensive memory statistics"""
stats = {
"agent_id": self.agent_id,
"backend": self.backend,
"short_term_memory": self.short_term_memory.get_stats(),
"collection_name": self.collection_name
}
# Add backend-specific stats
vector_store = self.embedded_store or self.long_term_memory
if vector_store:
try:
persistent_stats = vector_store.get_stats()
stats["persistent_memory"] = {**persistent_stats, "available": True}
except Exception as e:
stats["persistent_memory"] = {"available": False, "error": str(e)}
else:
stats["persistent_memory"] = {"available": False, "error": "Not initialized"}
# Add knowledge base info
if self.knowledge_base_path:
stats["knowledge_base"] = {
"path": self.knowledge_base_path,
"indexed": True
}
if self.episodic_memory:
stats["episodic_memory"] = self.episodic_memory.get_stats()
if self.semantic_memory:
stats["semantic_memory"] = self.semantic_memory.get_stats()
return stats
def health_check(self) -> Dict[str, bool]:
"""Check health of all memory systems"""
health = {
"short_term_memory": True, # Always available
"persistent_memory": False,
"episodic_memory": False,
"semantic_memory": False
}
vector_store = self.embedded_store or self.long_term_memory
if vector_store:
health["persistent_memory"] = vector_store.health_check()
health["episodic_memory"] = health["persistent_memory"] # Depends on persistent
health["semantic_memory"] = health["persistent_memory"] # Depends on persistent
return health
def _should_store_long_term(self, memory: MemoryEntry) -> bool:
"""Decide whether a memory should be stored long-term"""
# Store important memories
if memory.importance >= 0.7:
return True
# Store semantic and episodic memories
if memory.memory_type in [MemoryType.SEMANTIC, MemoryType.EPISODIC]:
return True
# Store long content
if len(memory.content) > 200:
return True
return False
def _combine_search_results(self,
results: List[tuple],
query: MemoryQuery) -> MemorySearchResult:
"""Combine search results from multiple memory systems"""
all_entries = []
all_scores = []
seen_ids = set()
# Combine results, preferring short-term (more recent/relevant)
for source, result in results:
for entry, score in zip(result.entries, result.scores):
if entry.id not in seen_ids:
all_entries.append(entry)
all_scores.append(score)
seen_ids.add(entry.id)
# Sort by score (descending)
combined = list(zip(all_entries, all_scores))
combined.sort(key=lambda x: x[1], reverse=True)
# Apply limit
combined = combined[:query.limit]
if combined:
final_entries, final_scores = zip(*combined)
else:
final_entries, final_scores = [], []
return MemorySearchResult(
entries=list(final_entries),
scores=list(final_scores),
query=query,
total_found=len(all_entries)
)
def _index_knowledge_base(self):
"""Index knowledge base files if path is provided"""
from pathlib import Path
if not self.knowledge_base_path:
return
kb_path = Path(self.knowledge_base_path)
if not kb_path.exists():
logger.warning(f"Knowledge base path does not exist: {kb_path}")
return
if not kb_path.is_dir():
logger.warning(f"Knowledge base path is not a directory: {kb_path}")
return
try:
vector_store = self.embedded_store or self.long_term_memory
if hasattr(vector_store, 'index_knowledge_files'):
indexed_count = vector_store.index_knowledge_files(kb_path, self.agent_id)
logger.info(f"Indexed {indexed_count} knowledge base files for agent {self.agent_id}")
else:
logger.warning("Vector store does not support knowledge file indexing")
except Exception as e:
logger.error(f"Failed to index knowledge base: {e}")
def recall_by_id(self, memory_id: str) -> List[MemoryEntry]:
"""Recall a specific memory by ID (for spore references)"""
memory = self.retrieve_memory(memory_id)
return [memory] if memory else []
def get_knowledge_references(self, content: str, importance_threshold: float = 0.7) -> List[str]:
"""Get knowledge references for lightweight spores"""
# Store the content and return reference ID
memory_id = self.store_memory(
agent_id=self.agent_id,
content=content,
memory_type=MemoryType.SEMANTIC,
importance=importance_threshold,
store_long_term=True
)
return [memory_id]
def shutdown(self):
"""Shutdown all memory systems"""
self.short_term_memory.shutdown()
# No explicit shutdown needed for ChromaDB (files are auto-saved)
# Qdrant connections will be closed automatically
logger.info("Memory manager shutdown complete")