Source code for praval.decorators

"""
Decorator-based Agent API for Praval Framework.

This module provides a Pythonic decorator interface for creating agents
that automatically handle reef communication and coordination.

Example::

    from praval import agent, chat, broadcast, start_agents, get_reef

    @agent("explorer", responds_to=["concept_request"])
    def explore_concepts(spore):
        concepts = chat("Find concepts related to: " + spore.knowledge.get("concept", ""))
        broadcast({"type": "discovery", "discovered": concepts.split(",")})
        return {"discovered": concepts}

    # Start the agent system
    start_agents(explore_concepts, initial_data={"type": "concept_request", "concept": "AI"})
    get_reef().wait_for_completion()
    get_reef().shutdown()
"""

import inspect
import logging
import threading
import time
from typing import Dict, Any, Optional, Callable, Union, List
from functools import wraps


logger = logging.getLogger(__name__)

from .core.agent import Agent
from .core.reef import get_reef
from .core.tool_registry import get_tool_registry

# Thread-local storage for current agent context
_agent_context = threading.local()


def _handle_agent_error(
    error: Exception,
    spore: Any,
    agent_name: str,
    on_error: Union[str, Callable[[Exception, Any], None]],
    context: str = "handler"
) -> None:
    """
    Handle errors in agent handlers based on the on_error configuration.

    Args:
        error: The exception that occurred
        spore: The spore being processed when error occurred
        agent_name: Name of the agent
        on_error: Error handling strategy ("log", "raise", "ignore", or callable)
        context: Context string for logging (e.g., "handler", "memory_storage")
    """
    if on_error == "ignore":
        return

    if on_error == "log":
        logger.error(
            f"Error in agent '{agent_name}' ({context}): {type(error).__name__}: {error}",
            exc_info=True
        )
        return

    if on_error == "raise":
        raise error

    if callable(on_error):
        try:
            on_error(error, spore)
        except Exception as callback_error:
            logger.error(
                f"Error in custom error handler for agent '{agent_name}': {callback_error}",
                exc_info=True
            )
        return

    # Unknown on_error value - default to logging
    logger.warning(f"Unknown on_error value '{on_error}' for agent '{agent_name}', defaulting to 'log'")
    logger.error(
        f"Error in agent '{agent_name}' ({context}): {type(error).__name__}: {error}",
        exc_info=True
    )


def _auto_register_tools(agent: Agent, agent_name: str) -> None:
    """
    Auto-register tools from the tool registry for an agent.
    
    This function automatically registers tools that are:
    1. Owned by the agent
    2. Shared (available to all agents)
    3. Any tools already assigned to this agent in the registry
    
    Args:
        agent: The Agent instance to register tools for
        agent_name: Name of the agent
    """
    try:
        registry = get_tool_registry()
        available_tools = registry.get_tools_for_agent(agent_name)
        
        for tool in available_tools:
            # Register the tool function with the agent
            tool_func = tool.func
            
            # Add the tool to the agent using the existing tool decorator
            agent.tool(tool_func)
            
    except Exception as e:
        # Don't fail agent creation if tool registration fails
        # Just log the error (in a real implementation, we'd use proper logging)
        pass


[docs] def agent(name: Optional[str] = None, channel: Optional[str] = None, system_message: Optional[str] = None, auto_broadcast: bool = True, responds_to: Optional[List[str]] = None, memory: Union[bool, Dict[str, Any]] = False, knowledge_base: Optional[str] = None, on_error: Union[str, Callable[[Exception, Any], None]] = "log"): """ Decorator that turns a function into an autonomous agent. Args: name: Agent name (defaults to function name) channel: Channel to subscribe to (defaults to name + "_channel") system_message: System message (defaults to function docstring) auto_broadcast: Whether to auto-broadcast return values responds_to: List of message types this agent responds to (None = all messages) memory: Memory configuration - True for defaults, dict for custom config, False to disable knowledge_base: Path to knowledge base files for auto-indexing on_error: Error handling strategy: - "log" (default): Log error and continue processing - "raise": Re-raise exception to caller - "ignore": Silently ignore errors (not recommended) - callable: Custom error handler function(exception, spore) Examples:: # Basic agent with message filtering @agent("explorer", responds_to=["concept_request"]) def explore_concepts(spore): '''Find related concepts and broadcast discoveries.''' concepts = chat("Related to: " + spore.knowledge.get("concept", "")) return {"type": "discovery", "discovered": concepts.split(",")} Memory-enabled agent:: @agent("researcher", memory=True) def research_agent(spore): '''Research agent with memory capabilities.''' query = spore.knowledge.get("query") research_agent.remember(f"Researched: {query}") past_research = research_agent.recall(query) return {"research": "completed", "past_similar": len(past_research)} Agent with knowledge base:: @agent("expert", memory=True, knowledge_base="./knowledge/") def expert_agent(spore): '''Expert with pre-loaded knowledge base.''' question = spore.knowledge.get("question") relevant = expert_agent.recall(question, limit=3) return {"answer": [r.content for r in relevant]} Agent with custom error handling:: def my_error_handler(error, spore): print(f"Error in agent: {error}") # Custom recovery logic here @agent("processor", on_error=my_error_handler) def process_agent(spore): '''Process with custom error handling.''' return {"processed": True} """ def decorator(func: Callable) -> Callable: # Auto-generate name from function if not provided agent_name = name or func.__name__ agent_channel = channel or f"{agent_name}_channel" # Auto-generate system message from docstring if not provided auto_system_message = system_message if not auto_system_message and func.__doc__: auto_system_message = f"You are {agent_name}. {func.__doc__.strip()}" # Parse memory configuration memory_enabled = False memory_config = None if memory is True: memory_enabled = True memory_config = {} elif isinstance(memory, dict): memory_enabled = True memory_config = memory # Create underlying agent with memory support underlying_agent = Agent( name=agent_name, system_message=auto_system_message, memory_enabled=memory_enabled, memory_config=memory_config, knowledge_base=knowledge_base ) def agent_handler(spore): """Handler that sets up context and calls the decorated function.""" # Check message type filtering if responds_to is not None: spore_type = spore.knowledge.get("type") if spore_type not in responds_to: # This agent doesn't respond to this message type return # Set agent context for chat() and broadcast() functions _agent_context.agent = underlying_agent _agent_context.channel = agent_channel # Set startup channel if agent was registered via start_agents() # This allows broadcast() to default to the channel all agents share _agent_context.startup_channel = getattr(underlying_agent, '_startup_channel', None) result = None try: # Resolve knowledge references in spore if memory is enabled if memory_enabled and hasattr(spore, 'has_knowledge_references'): if spore.has_knowledge_references(): try: resolved_knowledge = underlying_agent.resolve_spore_knowledge(spore) spore.resolved_knowledge = resolved_knowledge except Exception as e: # Knowledge resolution errors are non-fatal, log and continue _handle_agent_error( e, spore, agent_name, on_error, context="knowledge_resolution" ) # Call the decorated function result = func(spore) # Store conversation turn in memory if enabled if memory_enabled and underlying_agent.memory: try: query = str(spore.knowledge) if spore.knowledge else "interaction" response = str(result) if result else "no_response" underlying_agent.memory.store_conversation_turn( agent_id=agent_name, user_message=query, agent_response=response, context={"spore_id": spore.id, "spore_type": spore.spore_type.value} ) except Exception as e: # Memory storage errors are non-fatal, log and continue _handle_agent_error( e, spore, agent_name, on_error, context="memory_storage" ) # Auto-broadcast return values if enabled and result exists if auto_broadcast and result and isinstance(result, dict): underlying_agent.broadcast_knowledge( {**result, "_from": agent_name, "_timestamp": time.time()}, channel=agent_channel ) except Exception as e: # Main handler error - use configured error handling strategy _handle_agent_error(e, spore, agent_name, on_error, context="handler") finally: # Clean up context _agent_context.agent = None _agent_context.channel = None _agent_context.startup_channel = None return result # Set up the agent underlying_agent.set_spore_handler(agent_handler) underlying_agent.subscribe_to_channel(agent_channel) # CRITICAL FIX for reef broadcast invocation: # Subscribe agent to the default broadcast channel so it receives # spores from reef.broadcast(). This ensures agents listening on their # own channel ALSO receive system-wide broadcasts. # # The handler is delegated through on_spore_received to the custom # agent_handler set above, preventing duplicate invocations. reef = get_reef() reef.subscribe(agent_name, underlying_agent.on_spore_received, channel=reef.default_channel, replace=True) # Auto-register tools from the tool registry _auto_register_tools(underlying_agent, agent_name) # Add memory methods to the function for easy access if memory_enabled: func.remember = underlying_agent.remember func.recall = underlying_agent.recall func.recall_by_id = underlying_agent.recall_by_id func.get_conversation_context = underlying_agent.get_conversation_context func.create_knowledge_reference = underlying_agent.create_knowledge_reference func.send_lightweight_knowledge = underlying_agent.send_lightweight_knowledge func.memory = underlying_agent.memory # Direct memory manager access # Add reef communication methods func.send_knowledge = underlying_agent.send_knowledge func.broadcast_knowledge = underlying_agent.broadcast_knowledge func.request_knowledge = underlying_agent.request_knowledge # Add tool management methods func.tool = underlying_agent.tool func.add_tool = underlying_agent.tool # Alias for compatibility func.list_tools = lambda: list(underlying_agent.tools.keys()) func.get_tool = lambda name: underlying_agent.tools.get(name) func.has_tool = lambda name: name in underlying_agent.tools # Store metadata on function for composition and introspection func._praval_agent = underlying_agent func._praval_name = agent_name func._praval_channel = agent_channel func._praval_auto_broadcast = auto_broadcast func._praval_responds_to = responds_to func._praval_memory_enabled = memory_enabled func._praval_knowledge_base = knowledge_base func._praval_on_error = on_error # Return the original function with metadata attached return func return decorator
[docs] def chat(message: str, timeout: float = 10.0) -> str: """ Quick chat function that uses the current agent's LLM with timeout support. Can only be used within @agent decorated functions. Args: message: Message to send to the LLM timeout: Maximum time to wait for response in seconds Returns: LLM response as string Raises: RuntimeError: If called outside of an @agent function TimeoutError: If LLM call exceeds timeout """ if not hasattr(_agent_context, 'agent') or _agent_context.agent is None: raise RuntimeError("chat() can only be used within @agent decorated functions") import concurrent.futures import signal def timeout_handler(signum, frame): raise TimeoutError(f"LLM call timed out after {timeout} seconds") # Use thread-based timeout for better cross-platform support with concurrent.futures.ThreadPoolExecutor(max_workers=1) as executor: future = executor.submit(_agent_context.agent.chat, message) try: return future.result(timeout=timeout) except concurrent.futures.TimeoutError: raise TimeoutError(f"LLM call timed out after {timeout} seconds")
[docs] async def achat(message: str, timeout: float = 10.0) -> str: """ Async version of chat function for use within async agent handlers. Args: message: Message to send to the LLM timeout: Maximum time to wait for response in seconds Returns: LLM response as string Raises: RuntimeError: If called outside of an @agent function TimeoutError: If LLM call exceeds timeout """ if not hasattr(_agent_context, 'agent') or _agent_context.agent is None: raise RuntimeError("achat() can only be used within @agent decorated functions") # Run the sync chat in a thread to avoid blocking the event loop import asyncio loop = asyncio.get_event_loop() try: return await asyncio.wait_for( loop.run_in_executor(None, _agent_context.agent.chat, message), timeout=timeout ) except asyncio.TimeoutError: raise TimeoutError(f"LLM call timed out after {timeout} seconds")
[docs] def broadcast(data: Dict[str, Any], channel: Optional[str] = None, message_type: Optional[str] = None) -> str: """ Quick broadcast function that uses the current agent's communication. Can only be used within @agent decorated functions. Args: data: Data to broadcast channel: Channel to broadcast to. Defaults to the channel set by start_agents(), or reef's default channel if not in a start_agents() context. message_type: Message type to set (automatically added to data) Returns: Spore ID of the broadcast message Raises: RuntimeError: If called outside of an @agent function Example: # Broadcast to all agents on the same channel (set by start_agents) broadcast({"type": "analysis_request", "data": findings}) # Broadcast to a specific channel broadcast({"type": "alert"}, channel="urgent_alerts") """ if not hasattr(_agent_context, 'agent') or _agent_context.agent is None: raise RuntimeError("broadcast() can only be used within @agent decorated functions") # Add message type to data if specified broadcast_data = data.copy() if message_type: broadcast_data["type"] = message_type # Channel resolution priority: # 1. Explicitly passed channel parameter # 2. Channel set by start_agents() (stored in _agent_context.startup_channel) # 3. Reef's default channel (fallback for standalone agents) if channel is None: # Check if we're in a start_agents() context with a specific channel channel = getattr(_agent_context, 'startup_channel', None) if channel is None: reef = get_reef() channel = reef.default_channel return _agent_context.agent.broadcast_knowledge(broadcast_data, channel=channel)
[docs] def get_agent_info(agent_func: Callable) -> Dict[str, Any]: """ Get information about an @agent decorated function. Args: agent_func: Function decorated with @agent Returns: Dictionary with agent metadata """ if not hasattr(agent_func, '_praval_agent'): raise ValueError("Function is not decorated with @agent") return { "name": agent_func._praval_name, "channel": agent_func._praval_channel, "auto_broadcast": agent_func._praval_auto_broadcast, "responds_to": agent_func._praval_responds_to, "on_error": getattr(agent_func, '_praval_on_error', 'log'), "underlying_agent": agent_func._praval_agent }