"""
Composition utilities for decorator-based agents.
This module provides utilities for composing and orchestrating agents
decorated with the @agent decorator.
"""
from typing import Callable, List, Dict, Any, Optional
import time
import threading
from .core.reef import get_reef
from .decorators import get_agent_info
[docs]
def agent_pipeline(*agents: Callable, channel: str = "pipeline") -> Callable:
    """
    Compose agents into a pipeline that shares one broadcast channel.

    The returned trigger makes sure the channel exists, subscribes every
    agent to it and then broadcasts the initial data on that channel.

    NOTE(review): nothing in this function chains the agents one after
    another; every agent is subscribed to the same channel and receives
    the same broadcast. Any sequencing has to come from the agents
    themselves -- confirm against the agent implementations.

    Args:
        *agents: Functions decorated with @agent
        channel: Channel name for pipeline communication

    Returns:
        Function that triggers the pipeline with initial data. It carries
        the attributes ``_praval_pipeline`` (True), ``_praval_agents``
        (the tuple of agents) and ``_praval_channel`` (the channel name).

    Raises:
        ValueError: If any argument lacks the ``_praval_agent`` marker.

    Example:
        pipeline = agent_pipeline(explorer, analyzer, reporter)
        pipeline({"task": "analyze sentiment"})
    """
    # Validate all functions are agents before building the trigger.
    for agent_func in agents:
        if not hasattr(agent_func, '_praval_agent'):
            # Callable objects (functools.partial, instances defining
            # __call__) have no __name__; fall back to repr() so the caller
            # still gets the intended ValueError, not an AttributeError.
            agent_name = getattr(agent_func, '__name__', repr(agent_func))
            raise ValueError(f"Function {agent_name} is not decorated with @agent")

    def pipeline_trigger(initial_data: Dict[str, Any]) -> str:
        """Trigger the pipeline with initial data."""
        reef = get_reef()
        # Ensure the channel exists *before* anyone subscribes to it, the
        # same order start_agents() and AgentSession use.
        reef.create_channel(channel)

        # Subscribe all agents to the pipeline channel.
        for agent_func in agents:
            agent_info = get_agent_info(agent_func)
            agent_info["underlying_agent"].subscribe_to_channel(channel)

        # Broadcast initial data to start the pipeline.
        return reef.system_broadcast(initial_data, channel)

    # Store metadata so callers can introspect the composed pipeline.
    pipeline_trigger._praval_pipeline = True
    pipeline_trigger._praval_agents = agents
    pipeline_trigger._praval_channel = channel
    return pipeline_trigger
[docs]
def conditional_agent(condition_func: Callable[[Any], bool]):
    """
    Decorator that gates an agent's spore handler behind a predicate.

    The agent's current handler is wrapped so that it is only invoked for
    spores for which ``condition_func`` returns a truthy value; for every
    other spore the wrapper returns ``None`` without calling the handler.

    Args:
        condition_func: Function that takes a spore and returns bool

    Raises:
        ValueError: If the decorated function has no ``_praval_agent``
            attribute (i.e. it was not decorated with @agent first).

    Example:
        @conditional_agent(lambda spore: spore.knowledge.get("priority") == "high")
        @agent("urgent_processor")
        def process_urgent(spore):
            return {"processed": True}
    """
    def decorator(agent_func: Callable) -> Callable:
        if not hasattr(agent_func, '_praval_agent'):
            raise ValueError("conditional_agent must be applied to @agent decorated functions")

        # Capture the handler that is installed right now; the gate below
        # delegates to it.
        target_agent = agent_func._praval_agent
        wrapped_handler = target_agent._custom_spore_handler

        def conditional_handler(spore):
            """Forward the spore only when the predicate accepts it."""
            if not condition_func(spore):
                return None
            return wrapped_handler(spore)

        # Install the gated handler in place of the captured one.
        target_agent.set_spore_handler(conditional_handler)
        return agent_func

    return decorator
[docs]
def throttled_agent(delay_seconds: float):
    """
    Decorator to throttle agent execution.

    The agent's current spore handler is wrapped so that it runs at most
    once per ``delay_seconds``. Spores that arrive sooner are dropped: the
    wrapper returns ``None`` without calling the handler. The first spore
    is always processed.

    Elapsed time is measured with ``time.monotonic()`` so that wall-clock
    adjustments (NTP corrections, manual changes) cannot wrongly suppress
    or allow executions.

    Args:
        delay_seconds: Minimum seconds between executions

    Raises:
        ValueError: If the decorated function has no ``_praval_agent``
            attribute (i.e. it was not decorated with @agent first).

    Example:
        @throttled_agent(2.0)  # Max once every 2 seconds
        @agent("slow_processor")
        def process_slowly(spore):
            return {"processed": True}
    """
    def decorator(agent_func: Callable) -> Callable:
        if not hasattr(agent_func, '_praval_agent'):
            raise ValueError("throttled_agent must be applied to @agent decorated functions")

        # None means "never executed". A numeric zero would be wrong with a
        # monotonic clock, whose reference point is arbitrary and may be
        # smaller than delay_seconds.
        last_execution: Optional[float] = None
        lock = threading.Lock()

        # Get the original handler
        underlying_agent = agent_func._praval_agent
        original_handler = underlying_agent._custom_spore_handler

        def throttled_handler(spore):
            """Only execute if enough time has passed."""
            nonlocal last_execution
            # The handler runs while the lock is held, so the timestamp
            # check/update and the execution are serialized across threads.
            with lock:
                now = time.monotonic()
                if last_execution is not None and now - last_execution < delay_seconds:
                    return None
                last_execution = now
                return original_handler(spore)

        # Replace handler with throttled version
        underlying_agent.set_spore_handler(throttled_handler)
        return agent_func

    return decorator
[docs]
class AgentSession:
    """
    Context manager that groups agents on a dedicated session channel.

    The channel is named ``session_<session_name>``. Entering the context
    creates that channel on the reef; leaving it performs no cleanup, so
    agents added during the session stay subscribed.

    Example:
        with AgentSession("knowledge_mining") as session:
            session.add_agents(explorer, analyzer, curator)
            session.broadcast({"task": "mine concepts about AI"})
    """

    def __init__(self, session_name: str):
        """Record the session name and derive its channel name."""
        self.session_name = session_name
        self.channel_name = f"session_{session_name}"
        # Agent functions in the order they were added.
        self.agents = []

    def __enter__(self):
        """Create the session channel and return this session."""
        get_reef().create_channel(self.channel_name)
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        """Leave the session; nothing is torn down."""
        # Returning None lets any exception raised in the with-body
        # propagate. Agents remain subscribed to the session channel.
        return None

    def add_agent(self, agent_func: Callable) -> 'AgentSession':
        """
        Subscribe one agent to the session channel and remember it.

        Returns this session so calls can be chained. Raises ValueError
        when ``agent_func`` has no ``_praval_agent`` attribute.
        """
        is_agent = hasattr(agent_func, '_praval_agent')
        if not is_agent:
            raise ValueError(f"Function {agent_func.__name__} is not decorated with @agent")

        member = get_agent_info(agent_func)["underlying_agent"]
        member.subscribe_to_channel(self.channel_name)

        self.agents.append(agent_func)
        return self

    def add_agents(self, *agent_funcs: Callable) -> 'AgentSession':
        """Add several agents, in order, via :meth:`add_agent`."""
        for candidate in agent_funcs:
            self.add_agent(candidate)
        return self

    def broadcast(self, data: Dict[str, Any]) -> str:
        """
        Broadcast ``data`` on the session channel.

        A copy of ``data`` is sent with the key ``"_session"`` set to the
        session name (overriding any such key in ``data``). Returns
        whatever the reef's ``system_broadcast`` returns.
        """
        payload = {**data, "_session": self.session_name}
        return get_reef().system_broadcast(payload, self.channel_name)

    def get_stats(self) -> Dict[str, Any]:
        """Return the session name, channel, agent count and agent names."""
        names = []
        for member_func in self.agents:
            names.append(get_agent_info(member_func)["name"])

        stats = {
            "session_name": self.session_name,
            "channel": self.channel_name,
            "agent_count": len(self.agents),
            "agent_names": names,
        }
        return stats
[docs]
def start_agents(*agent_funcs: Callable, initial_data: Optional[Dict[str, Any]] = None,
                 channel: str = "startup") -> str:
    """
    Convenience function to start multiple agents with initial data.

    All arguments are validated first; only then is the channel created,
    every agent subscribed to it, and a single broadcast sent. A bad
    argument therefore cannot leave the channel created with only some of
    the agents subscribed.

    Args:
        *agent_funcs: Functions decorated with @agent
        initial_data: Initial data to broadcast (optional). When omitted
            or empty, ``{"type": "agents_started"}`` is broadcast instead.
        channel: Channel to use for startup communication

    Returns:
        The value returned by the reef's ``system_broadcast`` for the
        startup broadcast (the spore ID, per the original documentation).

    Raises:
        ValueError: If any argument lacks the ``_praval_agent`` marker.

    Example:
        start_agents(explorer, analyzer, curator,
                     initial_data={"task": "analyze market trends"})
    """
    # Validate everything before touching the reef, mirroring
    # agent_pipeline(), so a failure has no partial side effects.
    for agent_func in agent_funcs:
        if not hasattr(agent_func, '_praval_agent'):
            # Callable objects may lack __name__; fall back to repr() so the
            # intended ValueError is raised rather than an AttributeError.
            agent_name = getattr(agent_func, '__name__', repr(agent_func))
            raise ValueError(f"Function {agent_name} is not decorated with @agent")

    # Subscribe all agents to the startup channel.
    reef = get_reef()
    reef.create_channel(channel)
    for agent_func in agent_funcs:
        agent_info = get_agent_info(agent_func)
        agent_info["underlying_agent"].subscribe_to_channel(channel)

    # Broadcast the initial data if provided. The truthiness test is
    # deliberate: an empty dict is treated like "not provided".
    payload = initial_data if initial_data else {"type": "agents_started"}
    return reef.system_broadcast(payload, channel)