"""
Decorator-based Agent API for Praval Framework.
This module provides a Pythonic decorator interface for creating agents
that automatically handle reef communication and coordination.
Example::
from praval import agent, chat, broadcast, start_agents, get_reef
@agent("explorer", responds_to=["concept_request"])
def explore_concepts(spore):
concepts = chat("Find concepts related to: " + spore.knowledge.get("concept", ""))
broadcast({"type": "discovery", "discovered": concepts.split(",")})
return {"discovered": concepts}
# Start the agent system
start_agents(explore_concepts, initial_data={"type": "concept_request", "concept": "AI"})
get_reef().wait_for_completion()
get_reef().shutdown()
"""
import inspect
import logging
import threading
import time
from typing import Dict, Any, Optional, Callable, Union, List
from functools import wraps
logger = logging.getLogger(__name__)
from .core.agent import Agent
from .core.reef import get_reef
from .core.exceptions import ToolError, InterventionRequired
from .core.tool_registry import Tool, ToolMetadata, get_tool_registry
# Thread-local storage for current agent context
_agent_context = threading.local()
def _handle_agent_error(
error: Exception,
spore: Any,
agent_name: str,
on_error: Union[str, Callable[[Exception, Any], None]],
context: str = "handler",
) -> None:
"""
Handle errors in agent handlers based on the on_error configuration.
Args:
error: The exception that occurred
spore: The spore being processed when error occurred
agent_name: Name of the agent
on_error: Error handling strategy ("log", "raise", "ignore", or callable)
context: Context string for logging (e.g., "handler", "memory_storage")
"""
if on_error == "ignore":
return
if on_error == "log":
logger.error(
f"Error in agent '{agent_name}' ({context}): {type(error).__name__}: {error}",
exc_info=True,
)
return
if on_error == "raise":
raise error
if callable(on_error):
try:
on_error(error, spore)
except Exception as callback_error:
logger.error(
f"Error in custom error handler for agent '{agent_name}': {callback_error}",
exc_info=True,
)
return
# Unknown on_error value - default to logging
logger.warning(
f"Unknown on_error value '{on_error}' for agent '{agent_name}', defaulting to 'log'"
)
logger.error(
f"Error in agent '{agent_name}' ({context}): {type(error).__name__}: {error}",
exc_info=True,
)
def _auto_register_tools(agent: Agent, agent_name: str) -> None:
    """
    Auto-register tools from the tool registry for an agent.

    This function automatically registers tools that are:
    1. Owned by the agent
    2. Shared (available to all agents)
    3. Any tools already assigned to this agent in the registry

    Args:
        agent: The Agent instance to register tools for
        agent_name: Name of the agent
    """
    try:
        registry = get_tool_registry()
        available_tools = registry.get_tools_for_agent(agent_name)
        for tool in available_tools:
            # Register the tool with its proper name from the registry,
            # not the function name. This preserves explicit tool names
            # like "calculator_add" instead of using "add" from func.__name__.
            tool_name = tool.metadata.tool_name
            tool_func = tool.func
            # Extract parameter descriptions from the function signature's
            # type hints (inspect is imported at module level).
            sig = inspect.signature(tool_func)
            parameters = {}
            for param_name, param in sig.parameters.items():
                param_type = param.annotation
                if param_type != inspect.Parameter.empty:
                    type_name = getattr(param_type, "__name__", str(param_type))
                else:
                    type_name = "any"
                parameters[param_name] = {
                    "type": type_name,
                    # A parameter without a default value is required.
                    "required": param.default == inspect.Parameter.empty,
                }
            # Directly register with the proper tool name
            agent.tools[tool_name] = {
                "function": tool_func,
                "description": tool.metadata.description or tool_func.__doc__ or "",
                "parameters": parameters,
                "requires_approval": tool.metadata.requires_approval,
                "risk_level": tool.metadata.risk_level,
                "approval_reason": tool.metadata.approval_reason,
            }
    except Exception as e:
        # Don't fail agent creation if tool registration fails - just log.
        logger.debug(f"Tool auto-registration failed: {e}")
def _attach_tool(
    agent: Agent,
    tool_name: str,
    tool_func: Callable,
    description: str,
    parameters: Dict[str, Any],
) -> None:
    """Attach a tool to an agent's local tool map without overwriting existing entries."""
    if tool_name in agent.tools:
        # Existing entries win; attachment is strictly additive.
        return
    entry = {
        "function": tool_func,
        "description": description or "",
        "parameters": parameters or {},
        # Locally attached tools default to unrestricted, low-risk usage.
        "requires_approval": False,
        "risk_level": "low",
        "approval_reason": "",
    }
    agent.tools[tool_name] = entry
def _attach_registry_tool(agent: Agent, tool: Tool) -> None:
    """Attach a ToolRegistry tool to the agent's local tool map."""
    meta = tool.metadata
    _attach_tool(
        agent,
        meta.tool_name,
        tool.func,
        meta.description or tool.func.__doc__ or "",
        meta.parameters or {},
    )
    # Approval metadata is refreshed from the registry even when the
    # tool entry already existed in the agent's local map.
    entry = agent.tools[meta.tool_name]
    entry["requires_approval"] = meta.requires_approval
    entry["risk_level"] = meta.risk_level
    entry["approval_reason"] = meta.approval_reason
def _register_callable_tool(agent_name: str, tool_func: Callable) -> Optional[Tool]:
    """Ensure a callable is registered in the tool registry and return the Tool object."""
    registry = get_tool_registry()

    # Callables decorated upstream already carry a Tool object.
    if hasattr(tool_func, "_praval_tool"):
        pre_built = tool_func._praval_tool
        # Re-register if the registry no longer knows about it
        # (e.g. after a registry reset).
        if not registry.get_tool(pre_built.metadata.tool_name):
            try:
                registry.register_tool(pre_built)
            except ToolError:
                pass
        return pre_built

    # Wrap the raw callable in a fresh Tool owned by this agent.
    meta = ToolMetadata(
        tool_name=tool_func.__name__,
        owned_by=agent_name,
        description=tool_func.__doc__ or "",
        category="general",
        shared=False,
    )
    try:
        created = Tool(tool_func, meta)
        registry.register_tool(created)
    except ToolError:
        # Name collision: hand back whatever the registry already holds.
        return registry.get_tool(meta.tool_name)
    except Exception:
        return None
    return created
def agent(
    name: Optional[str] = None,
    channel: Optional[str] = None,
    system_message: Optional[str] = None,
    auto_broadcast: bool = True,
    responds_to: Optional[List[str]] = None,
    memory: Union[bool, Dict[str, Any]] = False,
    knowledge_base: Optional[str] = None,
    tools: Optional[List[Union[str, Callable]]] = None,
    tool_categories: Optional[List[str]] = None,
    auto_discover_tools: bool = True,
    on_error: Union[str, Callable[[Exception, Any], None]] = "log",
    hitl: bool = False,
):
    """
    Decorator that turns a function into an autonomous agent.

    Args:
        name: Agent name (defaults to function name)
        channel: Channel to subscribe to (defaults to name + "_channel")
        system_message: System message (defaults to function docstring)
        auto_broadcast: Whether to auto-broadcast return values
        responds_to: List of message types this agent responds to (None = all messages)
        memory: Memory configuration - True for defaults, dict for custom config, False to disable
        knowledge_base: Path to knowledge base files for auto-indexing
        tools: Tools to attach to the agent - registry tool names (str) or
            callables (registered in the tool registry on the agent's behalf)
        tool_categories: Registry categories whose tools are all attached
        auto_discover_tools: Whether to auto-register tools the registry
            already associates with this agent (owned or shared)
        on_error: Error handling strategy:
            - "log" (default): Log error and continue processing
            - "raise": Re-raise exception to caller
            - "ignore": Silently ignore errors (not recommended)
            - callable: Custom error handler function(exception, spore)
        hitl: Whether human-in-the-loop interventions are enabled for this agent

    Examples::

        # Basic agent with message filtering
        @agent("explorer", responds_to=["concept_request"])
        def explore_concepts(spore):
            '''Find related concepts and broadcast discoveries.'''
            concepts = chat("Related to: " + spore.knowledge.get("concept", ""))
            return {"type": "discovery", "discovered": concepts.split(",")}

    Memory-enabled agent::

        @agent("researcher", memory=True)
        def research_agent(spore):
            '''Research agent with memory capabilities.'''
            query = spore.knowledge.get("query")
            research_agent.remember(f"Researched: {query}")
            past_research = research_agent.recall(query)
            return {"research": "completed", "past_similar": len(past_research)}

    Agent with knowledge base::

        @agent("expert", memory=True, knowledge_base="./knowledge/")
        def expert_agent(spore):
            '''Expert with pre-loaded knowledge base.'''
            question = spore.knowledge.get("question")
            relevant = expert_agent.recall(question, limit=3)
            return {"answer": [r.content for r in relevant]}

    Agent with custom error handling::

        def my_error_handler(error, spore):
            print(f"Error in agent: {error}")
            # Custom recovery logic here

        @agent("processor", on_error=my_error_handler)
        def process_agent(spore):
            '''Process with custom error handling.'''
            return {"processed": True}
    """

    def decorator(func: Callable) -> Callable:
        # Auto-generate name from function if not provided
        agent_name = name or func.__name__
        agent_channel = channel or f"{agent_name}_channel"
        # Auto-generate system message from docstring if not provided
        auto_system_message = system_message
        if not auto_system_message and func.__doc__:
            auto_system_message = f"You are {agent_name}. {func.__doc__.strip()}"
        # Parse memory configuration
        memory_enabled = False
        memory_config = None
        if memory is True:
            memory_enabled = True
            memory_config = {}
        elif isinstance(memory, dict):
            memory_enabled = True
            memory_config = memory
        # Create underlying agent with memory support
        underlying_agent = Agent(
            name=agent_name,
            system_message=auto_system_message,
            memory_enabled=memory_enabled,
            memory_config=memory_config,
            knowledge_base=knowledge_base,
            hitl_enabled=hitl,
        )

        def agent_handler(spore):
            """Handler that sets up context and calls the decorated function."""
            # Check message type filtering
            if responds_to is not None:
                spore_type = spore.knowledge.get("type")
                if spore_type not in responds_to:
                    # This agent doesn't respond to this message type
                    return
            # Set agent context for chat() and broadcast() functions
            _agent_context.agent = underlying_agent
            _agent_context.channel = agent_channel
            # Set startup channel if agent was registered via start_agents()
            # This allows broadcast() to default to the channel all agents share
            _agent_context.startup_channel = getattr(
                underlying_agent, "_startup_channel", None
            )
            result = None
            try:
                # Resolve knowledge references in spore if memory is enabled
                if memory_enabled and hasattr(spore, "has_knowledge_references"):
                    if spore.has_knowledge_references():
                        try:
                            resolved_knowledge = (
                                underlying_agent.resolve_spore_knowledge(spore)
                            )
                            spore.resolved_knowledge = resolved_knowledge
                        except Exception as e:
                            # Knowledge resolution errors are non-fatal, log and continue
                            _handle_agent_error(
                                e,
                                spore,
                                agent_name,
                                on_error,
                                context="knowledge_resolution",
                            )
                # Call the decorated function
                result = func(spore)
                # Store conversation turn in memory if enabled
                if memory_enabled and underlying_agent.memory:
                    try:
                        query = (
                            str(spore.knowledge) if spore.knowledge else "interaction"
                        )
                        response = str(result) if result else "no_response"
                        underlying_agent.memory.store_conversation_turn(
                            agent_id=agent_name,
                            user_message=query,
                            agent_response=response,
                            context={
                                "spore_id": spore.id,
                                "spore_type": spore.spore_type.value,
                            },
                        )
                    except Exception as e:
                        # Memory storage errors are non-fatal, log and continue
                        _handle_agent_error(
                            e, spore, agent_name, on_error, context="memory_storage"
                        )
                # Auto-broadcast return values if enabled and result exists
                if auto_broadcast and result and isinstance(result, dict):
                    underlying_agent.broadcast_knowledge(
                        {**result, "_from": agent_name, "_timestamp": time.time()},
                        channel=agent_channel,
                    )
            except InterventionRequired:
                # HITL interventions must propagate so the caller can pause the run
                raise
            except Exception as e:
                # Main handler error - use configured error handling strategy
                _handle_agent_error(e, spore, agent_name, on_error, context="handler")
            finally:
                # Clean up context
                _agent_context.agent = None
                _agent_context.channel = None
                _agent_context.startup_channel = None
            return result

        # Set up the agent
        underlying_agent.set_spore_handler(agent_handler)
        underlying_agent.subscribe_to_channel(agent_channel)
        # CRITICAL FIX for reef broadcast invocation:
        # Subscribe agent to the default broadcast channel so it receives
        # spores from reef.broadcast(). This ensures agents listening on their
        # own channel ALSO receive system-wide broadcasts.
        #
        # The handler is delegated through on_spore_received to the custom
        # agent_handler set above, preventing duplicate invocations.
        reef = get_reef()
        reef.subscribe(
            agent_name,
            underlying_agent.on_spore_received,
            channel=reef.default_channel,
            replace=True,
        )
        # Tool attachment based on decorator params
        registry = get_tool_registry()
        if tool_categories:
            for category in tool_categories:
                for tool in registry.get_tools_by_category(category):
                    _attach_registry_tool(underlying_agent, tool)
        if tools:
            for tool_entry in tools:
                if isinstance(tool_entry, str):
                    tool_obj = registry.get_tool(tool_entry)
                    if tool_obj:
                        _attach_registry_tool(underlying_agent, tool_obj)
                    else:
                        logger.debug("Tool '%s' not found in registry", tool_entry)
                elif callable(tool_entry):
                    tool_obj = _register_callable_tool(agent_name, tool_entry)
                    if tool_obj:
                        _attach_registry_tool(underlying_agent, tool_obj)
                    else:
                        logger.debug(
                            "Tool callable '%s' failed to register",
                            getattr(tool_entry, "__name__", str(tool_entry)),
                        )
                else:
                    logger.debug("Unsupported tool entry type: %s", type(tool_entry))
        if auto_discover_tools:
            # Auto-register tools from the tool registry
            _auto_register_tools(underlying_agent, agent_name)
        # Add memory methods to the function for easy access
        if memory_enabled:
            func.remember = underlying_agent.remember
            func.recall = underlying_agent.recall
            func.recall_by_id = underlying_agent.recall_by_id
            func.get_conversation_context = underlying_agent.get_conversation_context
            func.create_knowledge_reference = (
                underlying_agent.create_knowledge_reference
            )
            func.send_lightweight_knowledge = (
                underlying_agent.send_lightweight_knowledge
            )
            func.memory = underlying_agent.memory  # Direct memory manager access
        # Add reef communication methods
        func.send_knowledge = underlying_agent.send_knowledge
        func.broadcast_knowledge = underlying_agent.broadcast_knowledge
        func.request_knowledge = underlying_agent.request_knowledge
        # Add HITL methods
        func.configure_hitl = underlying_agent.configure_hitl
        func.get_pending_interventions = underlying_agent.get_pending_interventions
        func.approve_intervention = underlying_agent.approve_intervention
        func.reject_intervention = underlying_agent.reject_intervention
        func.resume_run = underlying_agent.resume_run
        # Add tool management methods
        func.tool = underlying_agent.tool
        func.add_tool = underlying_agent.tool  # Alias for compatibility
        func.list_tools = lambda: list(underlying_agent.tools.keys())
        func.get_tool = lambda name: underlying_agent.tools.get(name)
        func.has_tool = lambda name: name in underlying_agent.tools
        # Store metadata on function for composition and introspection
        func._praval_agent = underlying_agent
        func._praval_name = agent_name
        func._praval_channel = agent_channel
        func._praval_auto_broadcast = auto_broadcast
        func._praval_responds_to = responds_to
        func._praval_memory_enabled = memory_enabled
        func._praval_knowledge_base = knowledge_base
        func._praval_on_error = on_error
        func._praval_hitl_enabled = hitl
        # Return the original function with metadata attached
        return func

    return decorator
def chat(message: str, timeout: float = 10.0) -> str:
    """
    Quick chat function that uses the current agent's LLM with timeout support.

    Can only be used within @agent decorated functions.

    Args:
        message: Message to send to the LLM
        timeout: Maximum time to wait for response in seconds

    Returns:
        LLM response as string

    Raises:
        RuntimeError: If called outside of an @agent function
        TimeoutError: If LLM call exceeds timeout
    """
    if not hasattr(_agent_context, "agent") or _agent_context.agent is None:
        raise RuntimeError("chat() can only be used within @agent decorated functions")
    import concurrent.futures

    # Thread-based timeout for cross-platform support (signal.alarm is
    # Unix-only and unusable outside the main thread).
    # The executor is shut down with wait=False so a timed-out LLM call does
    # not block this function on exit; the worker thread finishes (and is
    # discarded) in the background.
    executor = concurrent.futures.ThreadPoolExecutor(max_workers=1)
    try:
        future = executor.submit(_agent_context.agent.chat, message)
        try:
            return future.result(timeout=timeout)
        except concurrent.futures.TimeoutError:
            raise TimeoutError(f"LLM call timed out after {timeout} seconds")
    finally:
        executor.shutdown(wait=False)
async def achat(message: str, timeout: float = 10.0) -> str:
    """
    Async version of chat function for use within async agent handlers.

    Args:
        message: Message to send to the LLM
        timeout: Maximum time to wait for response in seconds

    Returns:
        LLM response as string

    Raises:
        RuntimeError: If called outside of an @agent function
        TimeoutError: If LLM call exceeds timeout
    """
    if not hasattr(_agent_context, "agent") or _agent_context.agent is None:
        raise RuntimeError("achat() can only be used within @agent decorated functions")
    # Run the sync chat in a thread to avoid blocking the event loop
    import asyncio

    # get_running_loop() is the correct (non-deprecated) accessor inside a
    # coroutine; get_event_loop() warns/raises on newer Python versions.
    loop = asyncio.get_running_loop()
    try:
        return await asyncio.wait_for(
            loop.run_in_executor(None, _agent_context.agent.chat, message),
            timeout=timeout,
        )
    except asyncio.TimeoutError:
        raise TimeoutError(f"LLM call timed out after {timeout} seconds")
def broadcast(
    data: Dict[str, Any],
    channel: Optional[str] = None,
    message_type: Optional[str] = None,
) -> str:
    """
    Quick broadcast function that uses the current agent's communication.

    Can only be used within @agent decorated functions.

    Args:
        data: Data to broadcast
        channel: Channel to broadcast to. Defaults to the channel set by start_agents(),
            or reef's default channel if not in a start_agents() context.
        message_type: Message type to set (automatically added to data)

    Returns:
        Spore ID of the broadcast message

    Raises:
        RuntimeError: If called outside of an @agent function

    Example:
        # Broadcast to all agents on the same channel (set by start_agents)
        broadcast({"type": "analysis_request", "data": findings})
        # Broadcast to a specific channel
        broadcast({"type": "alert"}, channel="urgent_alerts")
    """
    if not hasattr(_agent_context, "agent") or _agent_context.agent is None:
        raise RuntimeError(
            "broadcast() can only be used within @agent decorated functions"
        )
    # Copy before mutating so the caller's dict is left untouched
    broadcast_data = data.copy()
    if message_type:
        broadcast_data["type"] = message_type
    # Channel resolution priority:
    # 1. Explicitly passed channel parameter
    # 2. Channel set by start_agents() (stored in _agent_context.startup_channel)
    # 3. Reef's default channel (fallback for standalone agents)
    if channel is None:
        # Check if we're in a start_agents() context with a specific channel
        channel = getattr(_agent_context, "startup_channel", None)
        if channel is None:
            reef = get_reef()
            channel = reef.default_channel
    return _agent_context.agent.broadcast_knowledge(broadcast_data, channel=channel)
def get_agent_info(agent_func: Callable) -> Dict[str, Any]:
    """
    Get information about an @agent decorated function.

    Args:
        agent_func: Function decorated with @agent

    Returns:
        Dictionary with agent metadata

    Raises:
        ValueError: If the function was not decorated with @agent
    """
    if not hasattr(agent_func, "_praval_agent"):
        raise ValueError("Function is not decorated with @agent")
    return {
        "name": agent_func._praval_name,
        "channel": agent_func._praval_channel,
        "auto_broadcast": agent_func._praval_auto_broadcast,
        "responds_to": agent_func._praval_responds_to,
        # Older decorated functions may predate these attributes; fall back
        # to the decorator defaults.
        "on_error": getattr(agent_func, "_praval_on_error", "log"),
        "hitl": getattr(agent_func, "_praval_hitl_enabled", False),
        "underlying_agent": agent_func._praval_agent,
    }