Source code for praval.composition

"""
Composition utilities for decorator-based agents.

This module provides utilities for composing and orchestrating agents
decorated with the @agent decorator.

Key Functions:
- start_agents(): Start local agents with initial data (InMemoryBackend)
- run_agents(): Run distributed agents with RabbitMQ (via AgentRunner)
- agent_pipeline(): Compose agents into a sequential pipeline
- AgentSession: Context manager for grouped agent communication
"""

from typing import Callable, List, Dict, Any, Optional
import time
import threading

from .core.reef import get_reef
from .decorators import get_agent_info
from .core.agent_runner import run_agents as _run_agents_impl


def agent_pipeline(*agents: Callable, channel: str = "pipeline") -> Callable:
    """
    Compose agents into a pipeline that processes data sequentially.

    Args:
        *agents: Functions decorated with @agent
        channel: Channel name for pipeline communication

    Returns:
        Function that triggers the pipeline with initial data

    Example:
        pipeline = agent_pipeline(explorer, analyzer, reporter)
        pipeline({"task": "analyze sentiment"})
    """
    # Validate all functions are agents
    for agent_func in agents:
        if not hasattr(agent_func, '_praval_agent'):
            raise ValueError(f"Function {agent_func.__name__} is not decorated with @agent")

    def pipeline_trigger(initial_data: Dict[str, Any]) -> str:
        """Trigger the pipeline with initial data."""
        # Subscribe all agents to pipeline channel
        for agent_func in agents:
            agent_info = get_agent_info(agent_func)
            agent_info["underlying_agent"].subscribe_to_channel(channel)

        # Broadcast initial data to start pipeline
        reef = get_reef()
        reef.create_channel(channel)  # Ensure channel exists
        return reef.system_broadcast(initial_data, channel)

    # Store metadata
    pipeline_trigger._praval_pipeline = True
    pipeline_trigger._praval_agents = agents
    pipeline_trigger._praval_channel = channel

    return pipeline_trigger

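# Illustrative sketch (not executed): triggering a pipeline and then waiting for
# the agents to finish. The agent names are hypothetical placeholders for
# @agent-decorated functions; wait_for_completion() and shutdown() are the same
# reef calls shown in the start_agents() docstring below.
#
#     pipeline = agent_pipeline(explorer, analyzer, reporter, channel="sentiment")
#     spore_id = pipeline({"task": "analyze sentiment"})
#     get_reef().wait_for_completion()
#     get_reef().shutdown()
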
def conditional_agent(condition_func: Callable[[Any], bool]):
    """
    Decorator for conditional agent execution.

    Args:
        condition_func: Function that takes a spore and returns bool

    Example:
        @conditional_agent(lambda spore: spore.knowledge.get("priority") == "high")
        @agent("urgent_processor")
        def process_urgent(spore):
            return {"processed": True}
    """
    def decorator(agent_func: Callable) -> Callable:
        if not hasattr(agent_func, '_praval_agent'):
            raise ValueError("conditional_agent must be applied to @agent decorated functions")

        # Get the original handler
        underlying_agent = agent_func._praval_agent
        original_handler = underlying_agent._custom_spore_handler

        def conditional_handler(spore):
            """Only execute if condition is met."""
            if condition_func(spore):
                return original_handler(spore)

        # Replace handler with conditional version
        underlying_agent.set_spore_handler(conditional_handler)

        return agent_func
    return decorator

def throttled_agent(delay_seconds: float):
    """
    Decorator to throttle agent execution.

    Args:
        delay_seconds: Minimum seconds between executions

    Example:
        @throttled_agent(2.0)  # Max once every 2 seconds
        @agent("slow_processor")
        def process_slowly(spore):
            return {"processed": True}
    """
    def decorator(agent_func: Callable) -> Callable:
        if not hasattr(agent_func, '_praval_agent'):
            raise ValueError("throttled_agent must be applied to @agent decorated functions")

        last_execution = {"time": 0}
        lock = threading.Lock()

        # Get the original handler
        underlying_agent = agent_func._praval_agent
        original_handler = underlying_agent._custom_spore_handler

        def throttled_handler(spore):
            """Only execute if enough time has passed."""
            with lock:
                now = time.time()
                if now - last_execution["time"] >= delay_seconds:
                    last_execution["time"] = now
                    return original_handler(spore)

        # Replace handler with throttled version
        underlying_agent.set_spore_handler(throttled_handler)

        return agent_func
    return decorator

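# Illustrative sketch (not executed): conditional_agent and throttled_agent can be
# stacked on the same @agent function. Each decorator captures the agent's current
# _custom_spore_handler and installs a wrapped version via set_spore_handler(), so
# the outermost decorator wraps the one below it (this assumes set_spore_handler()
# updates _custom_spore_handler, which is how the composition above reads).
#
#     @conditional_agent(lambda spore: spore.knowledge.get("priority") == "high")
#     @throttled_agent(2.0)
#     @agent("urgent_processor")
#     def process_urgent(spore):
#         return {"processed": True}
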
class AgentSession:
    """
    Context manager for coordinated agent sessions.

    Example:
        with AgentSession("knowledge_mining") as session:
            session.add_agents(explorer, analyzer, curator)
            session.broadcast({"task": "mine concepts about AI"})
    """
    def __init__(self, session_name: str):
        self.session_name = session_name
        self.channel_name = f"session_{session_name}"
        self.agents = []

    def __enter__(self):
        # Create session channel
        reef = get_reef()
        reef.create_channel(self.channel_name)
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        # Cleanup if needed (agents will remain subscribed)
        pass

    def add_agent(self, agent_func: Callable) -> 'AgentSession':
        """Add an agent to this session."""
        if not hasattr(agent_func, '_praval_agent'):
            raise ValueError(f"Function {agent_func.__name__} is not decorated with @agent")

        agent_info = get_agent_info(agent_func)
        underlying_agent = agent_info["underlying_agent"]

        # Subscribe agent to session channel
        underlying_agent.subscribe_to_channel(self.channel_name)
        self.agents.append(agent_func)

        return self

    def add_agents(self, *agent_funcs: Callable) -> 'AgentSession':
        """Add multiple agents to this session."""
        for agent_func in agent_funcs:
            self.add_agent(agent_func)
        return self

    def broadcast(self, data: Dict[str, Any]) -> str:
        """Broadcast data to all agents in this session."""
        reef = get_reef()
        return reef.system_broadcast(
            {**data, "_session": self.session_name},
            self.channel_name
        )

    def get_stats(self) -> Dict[str, Any]:
        """Get session statistics."""
        return {
            "session_name": self.session_name,
            "channel": self.channel_name,
            "agent_count": len(self.agents),
            "agent_names": [get_agent_info(agent)["name"] for agent in self.agents]
        }

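# Illustrative sketch (not executed): a session combining the methods above,
# including get_stats() for a summary of the session. The agent names are
# hypothetical placeholders for @agent-decorated functions.
#
#     with AgentSession("knowledge_mining") as session:
#         session.add_agents(explorer, analyzer, curator)
#         session.broadcast({"task": "mine concepts about AI"})
#         print(session.get_stats())  # {"session_name": "knowledge_mining", ...}
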
def start_agents(*agent_funcs: Callable,
                 initial_data: Optional[Dict[str, Any]] = None,
                 channel: str = "startup") -> str:
    """
    Convenience function to start multiple agents with initial data.

    Use this for LOCAL agents (InMemoryBackend).
    For DISTRIBUTED agents with RabbitMQ, use run_agents() instead.

    Args:
        *agent_funcs: Functions decorated with @agent
        initial_data: Initial data to broadcast (optional)
        channel: Channel to use for agent communication (default: "startup").
            All agents will be subscribed to this channel and broadcast()
            will default to this channel.

    Returns:
        Spore ID of startup broadcast

    Example::

        from praval import agent, chat, start_agents, get_reef

        @agent("explorer", responds_to=["research_request"])
        def explorer(spore):
            return {"findings": chat(spore.knowledge.get("topic"))}

        # Start agents with initial data
        start_agents(explorer, analyzer, curator,
                     initial_data={"type": "research_request", "topic": "market trends"})

        # Wait for all agents to complete
        get_reef().wait_for_completion()
        get_reef().shutdown()
    """
    # Subscribe all agents to startup channel
    reef = get_reef()
    reef.create_channel(channel)

    for agent_func in agent_funcs:
        if not hasattr(agent_func, '_praval_agent'):
            raise ValueError(f"Function {agent_func.__name__} is not decorated with @agent")

        agent_info = get_agent_info(agent_func)
        underlying_agent = agent_info["underlying_agent"]

        # Subscribe agent to the startup channel
        underlying_agent.subscribe_to_channel(channel)

        # Store the startup channel on the agent so broadcast() can use it.
        # This ensures broadcast() defaults to the same channel agents are subscribed to.
        underlying_agent._startup_channel = channel

    # Broadcast initial data if provided
    if initial_data:
        return reef.system_broadcast(initial_data, channel)
    else:
        return reef.system_broadcast({"type": "agents_started"}, channel)

def run_agents(
    *agent_funcs: Callable,
    backend_config: Optional[Dict[str, Any]] = None,
    channel_queue_map: Optional[Dict[str, str]] = None
) -> None:
    """
    Run distributed agents with proper async lifecycle management.

    This is the recommended way to run agents with the RabbitMQ backend. It:

    1. Creates and manages an asyncio event loop
    2. Initializes the RabbitMQ backend
    3. Ensures agents consume messages from the broker
    4. Handles graceful shutdown on signals (SIGTERM, SIGINT)

    Args:
        *agent_funcs: Functions decorated with @agent
        backend_config: RabbitMQ configuration dict::

            {
                'url': 'amqp://user:pass@host:5672/',
                'exchange_name': 'praval.agents',
                'verify_tls': True/False
            }

        channel_queue_map: Optional mapping of Praval channels to RabbitMQ queues.
            Use this when agents should consume from pre-configured queues
            instead of topic-based subscriptions.
            Example: {"data_received": "agent.data_analyzer"}

    Example (Topic-based, Praval-managed routing)::

        run_agents(
            processor, analyzer,
            backend_config={
                'url': 'amqp://localhost:5672/',
                'exchange_name': 'praval.agents'
            }
        )

    Example (Queue-based, pre-configured RabbitMQ)::

        run_agents(
            processor, analyzer,
            backend_config={
                'url': 'amqp://localhost:5672/',
                'exchange_name': 'praval.agents'
            },
            channel_queue_map={
                "data_received": "agent.data_analyzer",
                "qc_inspection_received": "agent.vision_inspector"
            }
        )
    """
    return _run_agents_impl(
        *agent_funcs,
        backend_config=backend_config,
        channel_queue_map=channel_queue_map
    )