"""
Core Agent class for the Praval framework.
The Agent class provides a simple, composable interface for LLM-based
conversations with support for multiple providers, tools, and state persistence.
"""
import logging
import asyncio
import os
import inspect
import time
import threading
import uuid
from typing import Dict, List, Any, Optional, Callable, Union
from dataclasses import dataclass, field
logger = logging.getLogger(__name__)
# Auto-load .env files if available
try:
from dotenv import load_dotenv
load_dotenv()
except ImportError:
# python-dotenv not available, continue without it
pass
from .exceptions import (
PravalError,
ProviderError,
ConfigurationError,
ToolError,
InterventionRequired,
HITLConfigurationError,
)
from .tool_registry import Tool, ToolMetadata, get_tool_registry
from .storage import StateStorage
from ..providers.factory import ProviderFactory
@dataclass
class AgentConfig:
    """Configuration for Agent behavior and LLM parameters.

    Attributes:
        provider: Explicit LLM provider name, or None for auto-detection.
        temperature: Sampling temperature; must lie within [0, 2].
        max_tokens: Upper bound on generated tokens; must be positive.
        system_message: Optional system prompt applied to conversations.
    """

    provider: Optional[str] = None
    temperature: float = 0.7
    max_tokens: int = 1000
    system_message: Optional[str] = None

    def __post_init__(self):
        """Reject parameter values outside the supported ranges."""
        if self.temperature < 0 or self.temperature > 2:
            raise ValueError("temperature must be between 0 and 2")
        if self.max_tokens <= 0:
            raise ValueError("max_tokens must be positive")
class Agent:
    """
    A simple, composable LLM agent.

    The Agent class provides the core functionality for LLM-based conversations
    with support for multiple providers, conversation history, tools, and
    state persistence.

    Examples:
        Basic usage:

        >>> agent = Agent("assistant")
        >>> response = agent.chat("Hello!")

        With persistence:

        >>> agent = Agent("my_agent", persist_state=True)
        >>> agent.chat("Remember this conversation")

        With tools:

        >>> agent = Agent("calculator")
        >>> @agent.tool
        >>> def add(x: int, y: int) -> int:
        ...     return x + y
    """

    def __init__(
        self,
        name: str,
        provider: Optional[str] = None,
        persist_state: bool = False,
        system_message: Optional[str] = None,
        config: Optional[Dict[str, Any]] = None,
        memory_enabled: bool = False,
        memory_config: Optional[Dict[str, Any]] = None,
        knowledge_base: Optional[str] = None,
        max_history: Optional[int] = 100,
        hitl_enabled: bool = False,
        hitl_db_path: Optional[str] = None,
    ):
        """
        Initialize a new Agent.

        Args:
            name: Unique identifier for this agent
            provider: LLM provider to use (openai, anthropic, cohere)
            persist_state: Whether to persist conversation state
            system_message: System message to set agent behavior
            config: Additional configuration parameters
            memory_enabled: Whether to enable vector memory capabilities
            memory_config: Configuration for memory system
            knowledge_base: Path to knowledge base files to auto-index
            max_history: Max conversation turns to retain (None for unbounded)
            hitl_enabled: Whether human-in-the-loop interventions are enabled
            hitl_db_path: Optional path to the HITL service database

        Raises:
            ValueError: If name is empty or configuration is invalid
            ProviderError: If provider setup fails
        """
        if not name:
            raise ValueError("Agent name cannot be empty")

        self.name = name
        self.persist_state = persist_state
        self.memory_enabled = memory_enabled
        self.knowledge_base = knowledge_base
        self.tools: Dict[str, Dict[str, Any]] = {}
        self.conversation_history: List[Dict[str, str]] = []
        self.max_history = max_history
        self._hitl_enabled = hitl_enabled
        self._hitl_db_path = hitl_db_path
        self._hitl_service = None

        # Lifecycle management
        self._closed = False
        self._subscribed_channels: List[str] = []

        # Setup configuration.
        # BUGFIX: copy the caller's dict — the original `config or {}` aliased
        # it, so the keyword overrides below mutated an object the caller may
        # reuse for other agents.
        config_dict = dict(config) if config else {}
        if system_message:
            config_dict["system_message"] = system_message
        if provider:
            config_dict["provider"] = provider
        self.config = AgentConfig(**config_dict)

        # Setup provider
        self.provider_name = self._detect_provider()
        self.provider = ProviderFactory.create_provider(self.provider_name, self.config)

        # Setup memory system
        if self.memory_enabled:
            self._init_memory_system(memory_config)
        else:
            self.memory = None

        # Setup state storage
        if self.persist_state:
            self._storage = StateStorage()
            self._load_state()
        else:
            self._storage = None

        # Add system message to conversation if provided
        if self.config.system_message:
            self.conversation_history.append(
                {"role": "system", "content": self.config.system_message}
            )
        self._trim_history()
# ==========================================
def _trim_history(self) -> None:
if self.max_history is None:
return
if self.max_history <= 0:
self.conversation_history.clear()
return
if len(self.conversation_history) > self.max_history:
self.conversation_history = self.conversation_history[-self.max_history :]
def _detect_provider(self) -> str:
"""
Automatically detect LLM provider from environment variables or config.
Returns:
Provider name (openai, anthropic, cohere)
Raises:
ProviderError: If no provider credentials are found
"""
if self.config.provider:
return self.config.provider
# Check environment variables for API keys
if os.getenv("OPENAI_API_KEY"):
return "openai"
elif os.getenv("ANTHROPIC_API_KEY"):
return "anthropic"
elif os.getenv("COHERE_API_KEY"):
return "cohere"
else:
raise ProviderError(
"No LLM provider credentials found. Set OPENAI_API_KEY, "
"ANTHROPIC_API_KEY, or COHERE_API_KEY environment variable, "
"or specify provider explicitly."
)
def _build_hitl_context(self, run_id: str) -> Dict[str, Any]:
"""Build provider-facing HITL context for a run."""
return {
"enabled": self._hitl_enabled,
"run_id": run_id,
"agent_name": self.name,
"provider_name": self.provider_name,
"db_path": self._hitl_db_path,
}
def _get_hitl_service(self) -> Any:
"""Get or lazily initialize HITL service."""
if self._hitl_service is None:
from ..hitl.service import HITLService
self._hitl_service = HITLService(db_path=self._hitl_db_path)
return self._hitl_service
def chat(self, message: Union[str, None]) -> str:
    """
    Send a message to the agent and get a response.

    Args:
        message: User message to send to the agent

    Returns:
        Agent's response as a string

    Raises:
        ValueError: If message is empty or None
        PravalError: If response generation fails
    """
    if not message:
        raise ValueError("Message cannot be empty")

    # Record the user turn before calling the provider.
    self.conversation_history.append({"role": "user", "content": message})
    self._trim_history()

    run_id = str(uuid.uuid4())
    try:
        registered_tools = list(self.tools.values()) if self.tools else None
        reply = self.provider.generate(
            messages=self.conversation_history,
            tools=registered_tools,
            hitl_context=self._build_hitl_context(run_id),
        )
        # Record the assistant turn.
        self.conversation_history.append({"role": "assistant", "content": reply})
        self._trim_history()
        if self.persist_state:
            self._save_state()
        return reply
    except (InterventionRequired, HITLConfigurationError):
        # HITL control-flow exceptions must reach the caller untouched.
        raise
    except Exception as e:
        raise PravalError(f"Failed to generate response: {str(e)}") from e
def get_pending_interventions(
    self,
    run_id: Optional[str] = None,
    limit: int = 100,
) -> List[Any]:
    """Get pending interventions filtered to this agent.

    Args:
        run_id: Optional run to restrict the listing to.
        limit: Maximum number of interventions to return.

    Returns:
        Pending intervention records belonging to this agent.
    """
    return self._get_hitl_service().get_pending_interventions(
        run_id=run_id,
        agent_name=self.name,
        limit=limit,
    )
def approve_intervention(
    self,
    intervention_id: str,
    *,
    reviewer: str = "human",
    edited_args: Optional[Dict[str, Any]] = None,
) -> Any:
    """Approve or edit-approve an intervention for this agent.

    Args:
        intervention_id: Identifier of the intervention to approve.
        reviewer: Name recorded as the approving reviewer.
        edited_args: Optional replacement tool arguments (edit-approve).

    Returns:
        The service's approval result.

    Raises:
        ValueError: If the intervention is unknown or owned by another agent.
    """
    service = self._get_hitl_service()
    intervention = service.get_intervention(intervention_id)
    # Refuse to act on interventions that do not exist or belong elsewhere.
    if intervention is None:
        raise ValueError(f"Intervention '{intervention_id}' not found")
    if intervention.agent_name != self.name:
        raise ValueError(
            f"Intervention '{intervention_id}' does not belong to agent '{self.name}'"
        )
    return service.approve_intervention(
        intervention_id,
        reviewer=reviewer,
        edited_args=edited_args,
    )
def reject_intervention(
    self,
    intervention_id: str,
    *,
    reason: str,
    reviewer: str = "human",
) -> Any:
    """Reject an intervention for this agent.

    Args:
        intervention_id: Identifier of the intervention to reject.
        reason: Mandatory human-readable rejection reason.
        reviewer: Name recorded as the rejecting reviewer.

    Returns:
        The service's rejection result.

    Raises:
        ValueError: If the intervention is unknown or owned by another agent.
    """
    service = self._get_hitl_service()
    intervention = service.get_intervention(intervention_id)
    # Refuse to act on interventions that do not exist or belong elsewhere.
    if intervention is None:
        raise ValueError(f"Intervention '{intervention_id}' not found")
    if intervention.agent_name != self.name:
        raise ValueError(
            f"Intervention '{intervention_id}' does not belong to agent '{self.name}'"
        )
    return service.reject_intervention(
        intervention_id,
        reviewer=reviewer,
        reason=reason,
    )
def resume_run(self, run_id: str) -> str:
    """
    Resume a previously suspended HITL run after a decision.

    Args:
        run_id: Suspended run identifier

    Returns:
        Final model response for the resumed run

    Raises:
        ValueError: If the run or its intervention is missing, belongs to
            another agent, or is still awaiting a decision.
        PravalError: If the provider cannot resume suspended tool flows.
    """
    service = self._get_hitl_service()

    # --- Validate the suspended run -----------------------------------
    suspended = service.get_suspended_run(run_id)
    if suspended is None:
        raise ValueError(f"Suspended run '{run_id}' not found")
    if suspended.agent_name != self.name:
        raise ValueError(
            f"Suspended run '{run_id}' belongs to '{suspended.agent_name}', "
            f"not '{self.name}'"
        )
    if suspended.status != "pending":
        raise ValueError(
            f"Suspended run '{run_id}' is not pending (status={suspended.status})"
        )

    # --- Validate the linked intervention -----------------------------
    intervention_id = suspended.state.get("intervention_id")
    if not intervention_id:
        raise ValueError(f"Suspended run '{run_id}' has no linked intervention_id")
    intervention = service.get_intervention(intervention_id)
    if intervention is None:
        raise ValueError(
            f"Intervention '{intervention_id}' linked to run '{run_id}' not found"
        )
    if intervention.status.value == "PENDING":
        raise ValueError(
            f"Intervention '{intervention_id}' is still pending approval"
        )

    if not hasattr(self.provider, "resume_tool_flow"):
        raise PravalError(
            f"Provider '{self.provider_name}' does not support HITL resume"
        )

    # --- Resume the tool flow with the provider -----------------------
    try:
        response = self.provider.resume_tool_flow(
            suspended_state=suspended.state,
            tools=list(self.tools.values()) if self.tools else None,
            hitl_context={
                **self._build_hitl_context(run_id),
                "resume_intervention": intervention.to_dict(),
            },
        )
    except InterventionRequired:
        # A second suspension bubbles straight up to the caller.
        raise

    # Record the assistant turn and finalize the run.
    self.conversation_history.append(
        {
            "role": "assistant",
            "content": response,
        }
    )
    self._trim_history()
    if self.persist_state:
        self._save_state()
    service.mark_run_completed(run_id, response)

    # Best-effort tracing; observability must never break the resume path.
    try:
        from ..observability.tracing import get_current_span

        span = get_current_span()
        if span:
            span.add_event(
                "hitl.run.resumed",
                {
                    "run_id": run_id,
                    "agent_name": self.name,
                    "provider_name": self.provider_name,
                },
            )
    except Exception:
        pass
    return response
def _extract_parameters(self, signature: inspect.Signature) -> Dict[str, Any]:
"""Extract parameter information from function signature."""
parameters = {}
for name, param in signature.parameters.items():
parameters[name] = {
"type": (
param.annotation.__name__
if hasattr(param.annotation, "__name__")
else str(param.annotation)
),
"required": param.default == inspect.Parameter.empty,
}
return parameters
def _save_state(self) -> None:
"""Save current conversation state to storage."""
if self._storage:
self._storage.save(self.name, self.conversation_history)
def _load_state(self) -> None:
"""Load conversation state from storage."""
if self._storage:
saved_state = self._storage.load(self.name)
if saved_state:
self.conversation_history = saved_state
# ==========================================
# REEF COMMUNICATION METHODS
# ==========================================
def send_knowledge(
    self, to_agent: str, knowledge: Dict[str, Any], channel: str = "main"
) -> str:
    """
    Send knowledge to another agent through the reef.

    Args:
        to_agent: Name of the target agent
        knowledge: Knowledge data to send
        channel: Reef channel to use (default: "main")

    Returns:
        Spore ID of the sent message
    """
    # Imported lazily to avoid a circular import with the reef module.
    from .reef import get_reef

    reef = get_reef()
    return reef.send(
        from_agent=self.name,
        to_agent=to_agent,
        knowledge=knowledge,
        channel=channel,
    )
def broadcast_knowledge(
    self, knowledge: Dict[str, Any], channel: str = "main"
) -> str:
    """
    Broadcast knowledge to all agents in the reef.

    Args:
        knowledge: Knowledge data to broadcast
        channel: Reef channel to use (default: "main")

    Returns:
        Spore ID of the broadcast message
    """
    # Imported lazily to avoid a circular import with the reef module.
    from .reef import get_reef

    reef = get_reef()
    return reef.broadcast(
        from_agent=self.name, knowledge=knowledge, channel=channel
    )
def request_knowledge(
    self, from_agent: str, request: Dict[str, Any], timeout: int = 30
) -> Optional[Dict[str, Any]]:
    """
    Request knowledge from another agent with timeout.

    Args:
        from_agent: Name of the agent to request from
        request: Request data
        timeout: Timeout in seconds

    Returns:
        Response data or None if timeout
    """
    from .reef import get_reef, SporeType

    # The handler flips this event once a matching RESPONSE spore arrives.
    got_reply = threading.Event()
    reply_box: Dict[str, Any] = {"result": None}

    def response_handler(spore):
        """Capture the response spore addressed to us from *from_agent*."""
        matches = (
            spore.spore_type == SporeType.RESPONSE
            and spore.to_agent == self.name
            and spore.from_agent == from_agent
        )
        if matches:
            reply_box["result"] = spore.knowledge
            got_reply.set()

    # Subscribe (without replacing existing handlers) before sending so a
    # fast response cannot be missed.
    reef = get_reef()
    reef.subscribe(self.name, response_handler, replace=False)
    try:
        reef.request(
            from_agent=self.name,
            to_agent=from_agent,
            request=request,
            expires_in_seconds=timeout,
        )
        # Block until the handler fires or the timeout elapses.
        if got_reply.wait(timeout):
            return reply_box["result"]
        return None
    finally:
        # Ensure the handler is removed to avoid leaks.
        channel = reef.get_channel(reef.default_channel)
        if channel:
            try:
                handlers = channel.subscribers.get(self.name, [])
                if response_handler in handlers:
                    handlers.remove(response_handler)
            except Exception:
                pass
def on_spore_received(self, spore) -> None:
    """
    Handle received spores from the reef.

    This is a default implementation that can be overridden
    by subclasses for custom spore handling.

    Args:
        spore: The received Spore object
    """
    handler = getattr(self, "_custom_spore_handler", None)
    if not handler:
        # Default implementation does nothing; subclasses can override.
        return
    result = handler(spore)
    if inspect.iscoroutine(result):
        # Async handlers are scheduled on the running loop when one
        # exists, otherwise executed to completion synchronously.
        try:
            loop = asyncio.get_running_loop()
            loop.create_task(result)
        except RuntimeError:
            asyncio.run(result)
def subscribe_to_channel(self, channel_name: str) -> None:
    """
    Subscribe this agent to a reef channel.

    Args:
        channel_name: Name of the channel to subscribe to
    """
    from .reef import get_reef

    reef = get_reef()
    # Create the channel if it doesn't exist yet.
    reef.create_channel(channel_name)
    reef.subscribe(self.name, self.on_spore_received, channel_name)
    # Remember the subscription so close() can undo it later.
    if channel_name not in self._subscribed_channels:
        self._subscribed_channels.append(channel_name)
def unsubscribe_from_channel(self, channel_name: str) -> None:
    """
    Unsubscribe this agent from a reef channel.

    Args:
        channel_name: Name of the channel to unsubscribe from
    """
    from .reef import get_reef

    target = get_reef().get_channel(channel_name)
    if target:
        target.unsubscribe(self.name)
    # Drop the bookkeeping entry whether or not the channel still existed.
    if channel_name in self._subscribed_channels:
        self._subscribed_channels.remove(channel_name)
@property
def spore_handler(self) -> Optional[Callable]:
    """
    Get the current spore handler for this agent.

    Returns:
        The custom spore handler function, or None if not set
    """
    # The attribute only exists after set_spore_handler() has been called.
    handler = getattr(self, "_custom_spore_handler", None)
    return handler
def set_spore_handler(self, handler: Callable) -> None:
    """
    Set a custom spore handler for this agent.

    Args:
        handler: Function that takes a Spore object and handles it
    """
    # Stored under a private name; on_spore_received() picks it up.
    self._custom_spore_handler = handler
# ==========================================
# LIFECYCLE MANAGEMENT
# ==========================================
def close(self) -> None:
    """
    Release all resources held by the agent.

    This method:
    - Unsubscribes from all reef channels
    - Shuts down the memory system
    - Clears conversation history

    Safe to call multiple times. After calling close(), the agent
    should not be used for further operations.

    Example::

        agent = Agent("assistant")
        try:
            response = agent.chat("Hello")
        finally:
            agent.close()

        # Or use as context manager:
        with Agent("assistant") as agent:
            response = agent.chat("Hello")
    """
    # Idempotent: a second call is a no-op.
    if self._closed:
        return
    self._closed = True

    # Unsubscribe from every tracked reef channel. Failures are logged,
    # never raised, so close() always runs to completion.
    try:
        from .reef import get_reef

        reef = get_reef()
        # Iterate over a copy to avoid mutation during iteration.
        for channel_name in list(self._subscribed_channels):
            try:
                subscribed = reef.get_channel(channel_name)
                if subscribed:
                    subscribed.unsubscribe(self.name)
            except Exception as e:
                logger.warning(
                    f"Error unsubscribing {self.name} from {channel_name}: {e}"
                )
        self._subscribed_channels.clear()
    except Exception as e:
        logger.warning(f"Error during reef cleanup for {self.name}: {e}")

    # Shut down the memory system, again best-effort.
    if self.memory:
        try:
            if hasattr(self.memory, "shutdown"):
                self.memory.shutdown()
        except Exception as e:
            logger.warning(f"Error shutting down memory for {self.name}: {e}")
        self.memory = None

    # Clear conversation history
    self.conversation_history.clear()
    logger.debug(f"Agent {self.name} closed")
[docs]
def __enter__(self) -> "Agent":
"""Context manager entry - returns the agent."""
return self
[docs]
def __exit__(self, exc_type, exc_val, exc_tb) -> None:
"""Context manager exit - ensures cleanup."""
self.close()
[docs]
def __del__(self):
"""Destructor - attempt cleanup if not already done."""
# Skip cleanup during Python shutdown
# The import itself can fail during shutdown, so wrap everything
try:
import sys
if sys.meta_path is None:
return
if not getattr(self, "_closed", True):
self.close()
except Exception:
pass # Suppress all errors during garbage collection
@property
def is_closed(self) -> bool:
    """Check if the agent has been closed."""
    closed = self._closed
    return closed
# ==========================================
# MEMORY SYSTEM METHODS
# ==========================================
def _init_memory_system(self, memory_config: Optional[Dict[str, Any]] = None):
"""Initialize the memory system for this agent"""
try:
from ..memory import MemoryManager
# Default memory configuration
default_config = {
"backend": "auto",
"collection_name": f"praval_memories_{self.name}",
"knowledge_base_path": self.knowledge_base,
}
# Merge with provided config
if memory_config:
default_config.update(memory_config)
# Initialize memory manager
self.memory = MemoryManager(agent_id=self.name, **default_config)
logger.info(f"Memory system initialized for agent {self.name}")
except ImportError as e:
logger.warning(f"Memory system not available: {e}")
self.memory = None
self.memory_enabled = False
except Exception as e:
logger.warning(f"Failed to initialize memory system for {self.name}: {e}")
self.memory = None
self.memory_enabled = False
def remember(
    self, content: str, importance: float = 0.5, memory_type: str = "short_term"
) -> Optional[str]:
    """
    Store a memory.

    Args:
        content: The content to remember
        importance: Importance score (0.0 to 1.0)
        memory_type: Type of memory ("short_term", "semantic", "episodic")

    Returns:
        Memory ID if successful, None otherwise
    """
    if not self.memory:
        logger.debug(f"Memory not enabled for agent {self.name}")
        return None
    try:
        from ..memory import MemoryType

        # Translate the string label to the MemoryType enum; unknown labels
        # (and "working") degrade to SHORT_TERM.
        type_mapping = {
            "short_term": MemoryType.SHORT_TERM,
            "semantic": MemoryType.SEMANTIC,
            "episodic": MemoryType.EPISODIC,
            "working": MemoryType.SHORT_TERM,
        }
        resolved_type = type_mapping.get(memory_type, MemoryType.SHORT_TERM)
        return self.memory.store_memory(
            agent_id=self.name,
            content=content,
            memory_type=resolved_type,
            importance=importance,
        )
    except Exception as e:
        logger.warning(f"Failed to store memory: {e}")
        return None
def recall(
    self, query: str, limit: int = 5, similarity_threshold: float = 0.1
) -> List:
    """
    Recall memories based on a query.

    Args:
        query: Search query
        limit: Maximum number of results
        similarity_threshold: Minimum similarity score

    Returns:
        List of MemoryEntry objects (empty when memory is disabled or the
        search fails)
    """
    if not self.memory:
        logger.debug(f"Memory not enabled for agent {self.name}")
        return []
    try:
        from ..memory import MemoryQuery

        search = MemoryQuery(
            query_text=query,
            agent_id=self.name,
            limit=limit,
            similarity_threshold=similarity_threshold,
        )
        return self.memory.search_memories(search).entries
    except Exception as e:
        logger.warning(f"Failed to recall memories: {e}")
        return []
def recall_by_id(self, memory_id: str) -> List:
    """Recall a specific memory by ID (for resolving spore references)."""
    store = self.memory
    # Without a memory backend there is nothing to look up.
    return store.recall_by_id(memory_id) if store else []
def get_conversation_context(self, turns: int = 10) -> List:
    """Get recent conversation context (empty when memory is disabled)."""
    store = self.memory
    if not store:
        return []
    return store.get_conversation_context(self.name, turns)
def create_knowledge_reference(
    self, content: str, importance: float = 0.8
) -> List[str]:
    """
    Create knowledge references for lightweight spores.

    Args:
        content: Knowledge content to store and reference
        importance: Importance threshold

    Returns:
        List of knowledge reference IDs (empty when memory is disabled or
        reference creation fails)
    """
    if not self.memory:
        return []
    try:
        return self.memory.get_knowledge_references(content, importance)
    except Exception as e:
        logger.warning(f"Failed to create knowledge reference: {e}")
        return []
def resolve_spore_knowledge(self, spore) -> Dict[str, Any]:
    """
    Resolve knowledge references in a spore.

    Args:
        spore: Spore object with potential knowledge references

    Returns:
        Complete knowledge including resolved references; falls back to
        the spore's raw knowledge when resolution is not possible.
    """
    if not self.memory:
        return spore.knowledge
    try:
        from .reef import get_reef

        return get_reef().resolve_knowledge_references(spore, self.memory)
    except Exception as e:
        logger.warning(f"Failed to resolve spore knowledge: {e}")
        return spore.knowledge
def send_lightweight_knowledge(
    self, to_agent: str, large_content: str, summary: str, channel: str = "main"
) -> str:
    """
    Send large knowledge as lightweight spore with references

    Args:
        to_agent: Target agent
        large_content: Large content to reference
        summary: Brief summary for the spore
        channel: Communication channel

    Returns:
        Spore ID
    """
    refs = self.create_knowledge_reference(large_content)
    if not refs:
        # Referencing failed (or memory is disabled): fall back to
        # embedding the full content directly in the spore.
        return self.send_knowledge(to_agent, {"content": large_content}, channel)

    # Send a lightweight spore carrying only the summary plus references.
    from .reef import get_reef

    return get_reef().create_knowledge_reference_spore(
        from_agent=self.name,
        to_agent=to_agent,
        knowledge_summary=summary,
        knowledge_references=refs,
        channel=channel,
    )