praval.composition
Composition utilities for decorator-based agents.
This module provides utilities for composing and orchestrating agents decorated with the @agent decorator.
Key Functions:
- start_agents(): Local agents with initial data (InMemoryBackend)
- run_agents(): Distributed agents with RabbitMQ (use AgentRunner)
- agent_pipeline(): Sequential agent processing
- AgentSession: Grouped agent communication
Functions
agent_pipeline | Compose agents into a pipeline that processes data sequentially.
conditional_agent | Decorator for conditional agent execution.
run_agents | Run distributed agents with proper async lifecycle management.
start_agents | Convenience function to start multiple agents with initial data.
throttled_agent | Decorator to throttle agent execution.
Classes
AgentSession | Context manager for coordinated agent sessions.
- praval.composition.agent_pipeline(*agents, channel='pipeline')
Compose agents into a pipeline that processes data sequentially.
- Parameters:
- Return type:
- Returns:
Function that triggers the pipeline with initial data
Example
pipeline = agent_pipeline(explorer, analyzer, reporter)
pipeline({"task": "analyze sentiment"})
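As a rough mental model of sequential processing, the sketch here chains plain Python callables so that each stage receives the previous stage's output. It does not use Praval: `make_pipeline` and the three stage functions are hypothetical stand-ins, and agent_pipeline() itself takes @agent-decorated functions and a channel argument, so its internals may well differ.

```python
from functools import reduce
from typing import Any, Callable, Dict

Data = Dict[str, Any]
Stage = Callable[[Data], Data]


def make_pipeline(*stages: Stage) -> Callable[[Data], Data]:
    """Hypothetical helper (not part of Praval): run the stages in order."""
    def run(initial: Data) -> Data:
        # Feed the output of each stage into the next one.
        return reduce(lambda data, stage: stage(data), stages, initial)
    return run


# Hypothetical stages; each one adds a key to the data it receives.
def explore(data: Data) -> Data:
    return {**data, "explored": True}


def analyze(data: Data) -> Data:
    return {**data, "analyzed": data["explored"]}


def report(data: Data) -> Data:
    return {**data, "report": f"done: {data['task']}"}


pipeline = make_pipeline(explore, analyze, report)
result = pipeline({"task": "analyze sentiment"})
```

The returned function mirrors the documented shape of agent_pipeline(): it is called once with the initial data and triggers every stage in order.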
- praval.composition.conditional_agent(condition_func)
Decorator for conditional agent execution.
Example
@conditional_agent(lambda spore: spore.knowledge.get("priority") == "high")
@agent("urgent_processor")
def process_urgent(spore):
    return {"processed": True}
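The general pattern behind a conditional decorator can be sketched in plain Python. This is an illustration only: `run_if` is a hypothetical name, the message is a plain dict rather than a spore, and returning `None` for skipped messages is an assumption, since the source does not say what conditional_agent does when the condition is false.

```python
from functools import wraps
from typing import Any, Callable, Dict, Optional

Message = Dict[str, Any]


def run_if(condition: Callable[[Message], bool]):
    """Hypothetical decorator: call the handler only when condition(message) is true."""
    def decorator(handler: Callable[[Message], Any]):
        @wraps(handler)
        def wrapper(message: Message) -> Optional[Any]:
            if not condition(message):
                return None  # assumption: skipped messages yield None
            return handler(message)
        return wrapper
    return decorator


@run_if(lambda message: message.get("priority") == "high")
def process_urgent(message: Message) -> Dict[str, bool]:
    return {"processed": True}


high = process_urgent({"priority": "high"})  # condition holds, handler runs
low = process_urgent({"priority": "low"})    # condition fails, handler is skipped
```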
- praval.composition.throttled_agent(delay_seconds)
Decorator to throttle agent execution.
- Parameters:
delay_seconds (float) – Minimum seconds between executions
Example
@throttled_agent(2.0)  # Max once every 2 seconds
@agent("slow_processor")
def process_slowly(spore):
    return {"processed": True}
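One way to enforce a minimum interval between executions is sketched here in plain Python. It is not Praval's implementation: `throttle` and its `clock` parameter are hypothetical, and dropping calls that arrive too early (rather than delaying them) is an assumption the source does not confirm.

```python
import time
from functools import wraps
from typing import Any, Callable, Optional


def throttle(delay_seconds: float, clock: Callable[[], float] = time.monotonic):
    """Hypothetical decorator: skip calls made less than delay_seconds
    after the last accepted call."""
    def decorator(handler: Callable[..., Any]):
        last_run: Optional[float] = None

        @wraps(handler)
        def wrapper(*args: Any, **kwargs: Any) -> Optional[Any]:
            nonlocal last_run
            now = clock()
            if last_run is not None and now - last_run < delay_seconds:
                return None  # assumption: early calls are dropped, not queued
            last_run = now
            return handler(*args, **kwargs)
        return wrapper
    return decorator


# A fake clock lets the example run instantly and deterministically.
fake_now = [0.0]


@throttle(2.0, clock=lambda: fake_now[0])
def process_slowly(message: dict) -> dict:
    return {"processed": True}


first = process_slowly({})   # accepted at t = 0.0
fake_now[0] = 1.0
second = process_slowly({})  # dropped: only 1.0 s since the last accepted call
fake_now[0] = 2.5
third = process_slowly({})   # accepted: 2.5 s since the last accepted call
```

Injecting the clock keeps the sketch testable without real sleeps; with the default `time.monotonic` it behaves the same way against wall-clock time.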
- class praval.composition.AgentSession(session_name)
Bases: object
Context manager for coordinated agent sessions.
Example
with AgentSession("knowledge_mining") as session:
    session.add_agents(explorer, analyzer, curator)
    session.broadcast({"task": "mine concepts about AI"})
- Parameters:
session_name (str)
- add_agent(agent_func)
Add an agent to this session.
- Return type:
- Parameters:
agent_func (Callable)
- add_agents(*agent_funcs)
Add multiple agents to this session.
- Return type:
- Parameters:
agent_funcs (Callable)
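To illustrate how a context manager can group handlers and fan data out to them, here is a minimal plain-Python analogy. `MiniSession` is hypothetical and is not AgentSession: returning `self` from the add methods, collecting return values in broadcast(), and clearing the group on exit are all assumptions made for this sketch.

```python
from typing import Any, Callable, Dict, List

Handler = Callable[[Dict[str, Any]], Any]


class MiniSession:
    """Hypothetical stand-in for a session: groups handlers under one name."""

    def __init__(self, session_name: str) -> None:
        self.session_name = session_name
        self.handlers: List[Handler] = []
        self.closed = False

    def __enter__(self) -> "MiniSession":
        return self

    def __exit__(self, exc_type, exc, tb) -> bool:
        # Release the group when the with-block ends; never swallow exceptions.
        self.handlers.clear()
        self.closed = True
        return False

    def add_agent(self, handler: Handler) -> "MiniSession":
        self.handlers.append(handler)
        return self  # assumption: chaining is allowed

    def add_agents(self, *handlers: Handler) -> "MiniSession":
        for handler in handlers:
            self.add_agent(handler)
        return self

    def broadcast(self, data: Dict[str, Any]) -> List[Any]:
        # Deliver the same data to every handler in registration order.
        return [handler(data) for handler in self.handlers]


with MiniSession("knowledge_mining") as session:
    session.add_agents(
        lambda data: ("explorer", data["task"]),
        lambda data: ("analyzer", data["task"]),
    )
    results = session.broadcast({"task": "mine concepts about AI"})
```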
- praval.composition.start_agents(*agent_funcs, initial_data=None, channel='startup')
Convenience function to start multiple agents with initial data.
Use this for LOCAL agents (InMemoryBackend). For DISTRIBUTED agents with RabbitMQ, use run_agents() instead.
- Parameters:
*agent_funcs (Callable) – Functions decorated with @agent
initial_data (Optional[Dict[str, Any]]) – Initial data to broadcast (optional)
channel (str) – Channel to use for agent communication (default: "startup"). All agents will be subscribed to this channel and broadcast() will default to this channel.
- Return type:
- Returns:
Spore ID of startup broadcast
Example:
from praval import agent, chat, start_agents, get_reef

@agent("explorer", responds_to=["research_request"])
def explorer(spore):
    return {"findings": chat(spore.knowledge.get("topic"))}

# Start agents with initial data
start_agents(explorer, analyzer, curator, initial_data={"type": "research_request", "topic": "market trends"})

# Wait for all agents to complete
get_reef().wait_for_completion()
get_reef().shutdown()
- praval.composition.run_agents(*agent_funcs, backend_config=None, channel_queue_map=None)
Run distributed agents with proper async lifecycle management.
This is the recommended way to run agents with the RabbitMQ backend. It:
1. Creates and manages an asyncio event loop
2. Initializes the RabbitMQ backend
3. Ensures agents consume messages from the broker
4. Handles graceful shutdown on signals (SIGTERM, SIGINT)
- Parameters:
*agent_funcs (Callable) – Functions decorated with @agent
backend_config (Optional[Dict[str, Any]]) – RabbitMQ configuration dict:
{
    'url': 'amqp://user:pass@host:5672/',
    'exchange_name': 'praval.agents',
    'verify_tls': True/False
}
channel_queue_map (Optional[Dict[str, str]]) – Optional mapping of Praval channels to RabbitMQ queues. Use this when agents should consume from pre-configured queues instead of topic-based subscriptions. Example: {"data_received": "agent.data_analyzer"}
- Return type:
- Example (Topic-based, Praval-managed routing):
run_agents(
    processor, analyzer,
    backend_config={
        'url': 'amqp://localhost:5672/',
        'exchange_name': 'praval.agents'
    }
)
- Example (Queue-based, pre-configured RabbitMQ):
run_agents(
    processor, analyzer,
    backend_config={
        'url': 'amqp://localhost:5672/',
        'exchange_name': 'praval.agents'
    },
    channel_queue_map={
        "data_received": "agent.data_analyzer",
        "qc_inspection_received": "agent.vision_inspector"
    }
)
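The lifecycle that run_agents() is described as managing (own the event loop, keep consumers running, shut down gracefully on SIGTERM or SIGINT) can be approximated with the standard library alone. This sketch is an assumption-laden illustration, not Praval's code: `consume`, `main`, and `auto_stop_after` are hypothetical, no RabbitMQ connection is made, and the timer exists only so the example ends on its own.

```python
import asyncio
import signal
from typing import List, Optional


async def consume(name: str, stop: asyncio.Event, log: List[str]) -> None:
    """Hypothetical consumer standing in for an agent reading from a broker."""
    log.append(f"{name} started")
    await stop.wait()  # a real consumer would process messages until asked to stop
    log.append(f"{name} stopped")


async def main(log: List[str], auto_stop_after: Optional[float] = None) -> None:
    loop = asyncio.get_running_loop()
    stop = asyncio.Event()

    # Request a graceful shutdown when the process receives SIGTERM or SIGINT.
    for sig in (signal.SIGTERM, signal.SIGINT):
        try:
            loop.add_signal_handler(sig, stop.set)
        except (NotImplementedError, RuntimeError):
            pass  # unsupported on this platform or outside the main thread

    # Demo only: trigger the same shutdown path from a timer.
    if auto_stop_after is not None:
        loop.call_later(auto_stop_after, stop.set)

    await asyncio.gather(
        consume("processor", stop, log),
        consume("analyzer", stop, log),
    )


events: List[str] = []
asyncio.run(main(events, auto_stop_after=0.05))  # asyncio.run creates and closes the loop
```

Routing both the signals and the timer through one `asyncio.Event` means every consumer observes the same shutdown request and can finish its current work before returning.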