Source code for praval.decorators

"""
Decorator-based Agent API for Praval Framework.

This module provides a Pythonic decorator interface for creating agents
that automatically handle reef communication and coordination.

Example:
    @agent("explorer", channel="knowledge")
    def explore_concepts(spore):
        concepts = chat("Find concepts related to: " + spore.knowledge.get("concept", ""))
        return {"discovered": concepts.split(",")}
"""

import inspect
import threading
import time
from typing import Dict, Any, Optional, Callable, Union, List
from functools import wraps

from .core.agent import Agent
from .core.reef import get_reef
from .core.tool_registry import get_tool_registry

# Thread-local storage for current agent context
_agent_context = threading.local()


def _auto_register_tools(agent: Agent, agent_name: str) -> None:
    """
    Auto-register tools from the tool registry for an agent.
    
    This function automatically registers tools that are:
    1. Owned by the agent
    2. Shared (available to all agents)
    3. Any tools already assigned to this agent in the registry
    
    Args:
        agent: The Agent instance to register tools for
        agent_name: Name of the agent
    """
    try:
        registry = get_tool_registry()
        available_tools = registry.get_tools_for_agent(agent_name)
        
        for tool in available_tools:
            # Register the tool function with the agent
            tool_func = tool.func
            
            # Add the tool to the agent using the existing tool decorator
            agent.tool(tool_func)
            
    except Exception as e:
        # Don't fail agent creation if tool registration fails
        # Just log the error (in a real implementation, we'd use proper logging)
        pass


[docs] def agent(name: Optional[str] = None, channel: Optional[str] = None, system_message: Optional[str] = None, auto_broadcast: bool = True, responds_to: Optional[List[str]] = None, memory: Union[bool, Dict[str, Any]] = False, knowledge_base: Optional[str] = None): """ Decorator that turns a function into an autonomous agent. Args: name: Agent name (defaults to function name) channel: Channel to subscribe to (defaults to name + "_channel") system_message: System message (defaults to function docstring) auto_broadcast: Whether to auto-broadcast return values responds_to: List of message types this agent responds to (None = all messages) memory: Memory configuration - True for defaults, dict for custom config, False to disable knowledge_base: Path to knowledge base files for auto-indexing Examples: Basic agent: @agent("explorer", channel="knowledge", responds_to=["concept_request"]) def explore_concepts(spore): '''Find related concepts and broadcast discoveries.''' concepts = chat("Related to: " + spore.knowledge.get("concept", "")) return {"type": "discovery", "discovered": concepts.split(",")} Agent with memory: @agent("researcher", memory=True) def research_agent(spore): '''Research agent with memory capabilities.''' query = spore.knowledge.get("query") # Remember the research research_agent.remember(f"Researched: {query}") # Recall similar past research past_research = research_agent.recall(query) return {"research": "completed", "past_similar": len(past_research)} Agent with knowledge base: @agent("expert", memory=True, knowledge_base="./knowledge/") def expert_agent(spore): '''Expert with pre-loaded knowledge base.''' question = spore.knowledge.get("question") relevant = expert_agent.recall(question, limit=3) return {"answer": [r.content for r in relevant]} """ def decorator(func: Callable) -> Callable: # Auto-generate name from function if not provided agent_name = name or func.__name__ agent_channel = channel or f"{agent_name}_channel" # Auto-generate system message from docstring if not provided auto_system_message = system_message if not auto_system_message and func.__doc__: auto_system_message = f"You are {agent_name}. {func.__doc__.strip()}" # Parse memory configuration memory_enabled = False memory_config = None if memory is True: memory_enabled = True memory_config = {} elif isinstance(memory, dict): memory_enabled = True memory_config = memory # Create underlying agent with memory support underlying_agent = Agent( name=agent_name, system_message=auto_system_message, memory_enabled=memory_enabled, memory_config=memory_config, knowledge_base=knowledge_base ) def agent_handler(spore): """Handler that sets up context and calls the decorated function.""" # Check message type filtering if responds_to is not None: spore_type = spore.knowledge.get("type") if spore_type not in responds_to: # This agent doesn't respond to this message type return # Set agent context for chat() and broadcast() functions _agent_context.agent = underlying_agent _agent_context.channel = agent_channel try: # Resolve knowledge references in spore if memory is enabled if memory_enabled and hasattr(spore, 'has_knowledge_references'): if spore.has_knowledge_references(): try: resolved_knowledge = underlying_agent.resolve_spore_knowledge(spore) spore.resolved_knowledge = resolved_knowledge except Exception as e: # If knowledge resolution fails, continue without resolved knowledge pass # Call the decorated function result = func(spore) # Store conversation turn in memory if enabled if memory_enabled and underlying_agent.memory: try: query = str(spore.knowledge) if spore.knowledge else "interaction" response = str(result) if result else "no_response" underlying_agent.memory.store_conversation_turn( agent_id=agent_name, user_message=query, agent_response=response, context={"spore_id": spore.id, "spore_type": spore.spore_type.value} ) except Exception as e: # Don't fail the agent if memory storage fails pass # Auto-broadcast return values if enabled and result exists if auto_broadcast and result and isinstance(result, dict): underlying_agent.broadcast_knowledge( {**result, "_from": agent_name, "_timestamp": time.time()}, channel=agent_channel ) finally: # Clean up context _agent_context.agent = None _agent_context.channel = None # Set up the agent underlying_agent.set_spore_handler(agent_handler) underlying_agent.subscribe_to_channel(agent_channel) # Auto-register tools from the tool registry _auto_register_tools(underlying_agent, agent_name) # Add memory methods to the function for easy access if memory_enabled: func.remember = underlying_agent.remember func.recall = underlying_agent.recall func.recall_by_id = underlying_agent.recall_by_id func.get_conversation_context = underlying_agent.get_conversation_context func.create_knowledge_reference = underlying_agent.create_knowledge_reference func.send_lightweight_knowledge = underlying_agent.send_lightweight_knowledge func.memory = underlying_agent.memory # Direct memory manager access # Add reef communication methods func.send_knowledge = underlying_agent.send_knowledge func.broadcast_knowledge = underlying_agent.broadcast_knowledge func.request_knowledge = underlying_agent.request_knowledge # Add tool management methods func.tool = underlying_agent.tool func.add_tool = underlying_agent.tool # Alias for compatibility func.list_tools = lambda: list(underlying_agent.tools.keys()) func.get_tool = lambda name: underlying_agent.tools.get(name) func.has_tool = lambda name: name in underlying_agent.tools # Store metadata on function for composition and introspection func._praval_agent = underlying_agent func._praval_name = agent_name func._praval_channel = agent_channel func._praval_auto_broadcast = auto_broadcast func._praval_responds_to = responds_to func._praval_memory_enabled = memory_enabled func._praval_knowledge_base = knowledge_base # Return the original function with metadata attached return func return decorator
[docs] def chat(message: str, timeout: float = 10.0) -> str: """ Quick chat function that uses the current agent's LLM with timeout support. Can only be used within @agent decorated functions. Args: message: Message to send to the LLM timeout: Maximum time to wait for response in seconds Returns: LLM response as string Raises: RuntimeError: If called outside of an @agent function TimeoutError: If LLM call exceeds timeout """ if not hasattr(_agent_context, 'agent') or _agent_context.agent is None: raise RuntimeError("chat() can only be used within @agent decorated functions") import concurrent.futures import signal def timeout_handler(signum, frame): raise TimeoutError(f"LLM call timed out after {timeout} seconds") # Use thread-based timeout for better cross-platform support with concurrent.futures.ThreadPoolExecutor(max_workers=1) as executor: future = executor.submit(_agent_context.agent.chat, message) try: return future.result(timeout=timeout) except concurrent.futures.TimeoutError: raise TimeoutError(f"LLM call timed out after {timeout} seconds")
[docs] async def achat(message: str, timeout: float = 10.0) -> str: """ Async version of chat function for use within async agent handlers. Args: message: Message to send to the LLM timeout: Maximum time to wait for response in seconds Returns: LLM response as string Raises: RuntimeError: If called outside of an @agent function TimeoutError: If LLM call exceeds timeout """ if not hasattr(_agent_context, 'agent') or _agent_context.agent is None: raise RuntimeError("achat() can only be used within @agent decorated functions") # Run the sync chat in a thread to avoid blocking the event loop import asyncio loop = asyncio.get_event_loop() try: return await asyncio.wait_for( loop.run_in_executor(None, _agent_context.agent.chat, message), timeout=timeout ) except asyncio.TimeoutError: raise TimeoutError(f"LLM call timed out after {timeout} seconds")
[docs] def broadcast(data: Dict[str, Any], channel: Optional[str] = None, message_type: Optional[str] = None) -> str: """ Quick broadcast function that uses the current agent's communication. Can only be used within @agent decorated functions. Args: data: Data to broadcast channel: Channel to broadcast to (defaults to agent's channel) message_type: Message type to set (automatically added to data) Returns: Spore ID of the broadcast message Raises: RuntimeError: If called outside of an @agent function """ if not hasattr(_agent_context, 'agent') or _agent_context.agent is None: raise RuntimeError("broadcast() can only be used within @agent decorated functions") # Add message type to data if specified broadcast_data = data.copy() if message_type: broadcast_data["type"] = message_type target_channel = channel or _agent_context.channel return _agent_context.agent.broadcast_knowledge(broadcast_data, channel=target_channel)
[docs] def get_agent_info(agent_func: Callable) -> Dict[str, Any]: """ Get information about an @agent decorated function. Args: agent_func: Function decorated with @agent Returns: Dictionary with agent metadata """ if not hasattr(agent_func, '_praval_agent'): raise ValueError("Function is not decorated with @agent") return { "name": agent_func._praval_name, "channel": agent_func._praval_channel, "auto_broadcast": agent_func._praval_auto_broadcast, "responds_to": agent_func._praval_responds_to, "underlying_agent": agent_func._praval_agent }