praval.composition

Composition utilities for decorator-based agents.

This module provides utilities for composing and orchestrating agents decorated with the @agent decorator.

Key Functions:
  • start_agents(): Local agents with initial data (InMemoryBackend)

  • run_agents(): Distributed agents with RabbitMQ (use AgentRunner)

  • agent_pipeline(): Sequential agent processing

  • AgentSession: Grouped agent communication

Functions

agent_pipeline(*agents[, channel])

Compose agents into a pipeline that processes data sequentially.

conditional_agent(condition_func)

Decorator for conditional agent execution.

run_agents(*agent_funcs[, backend_config, ...])

Run distributed agents with proper async lifecycle management.

start_agents(*agent_funcs[, initial_data, ...])

Convenience function to start multiple agents with initial data.

throttled_agent(delay_seconds)

Decorator to throttle agent execution.

Classes

AgentSession(session_name)

Context manager for coordinated agent sessions.

praval.composition.agent_pipeline(*agents, channel='pipeline')[source]

Compose agents into a pipeline that processes data sequentially.

Parameters:
  • *agents (Callable) – Functions decorated with @agent

  • channel (str) – Channel name for pipeline communication

Return type:

Callable

Returns:

Function that triggers the pipeline with initial data

Example

pipeline = agent_pipeline(explorer, analyzer, reporter)
pipeline({"task": "analyze sentiment"})
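
To make "processes data sequentially" concrete, the following is a minimal, library-independent sketch of sequential composition. It is not Praval's implementation: the real function communicates over the named channel, which this sketch ignores. `compose_sequential` and the three stage functions are hypothetical stand-ins that pass plain dicts.

```python
from functools import reduce
from typing import Any, Callable, Dict

Stage = Callable[[Dict[str, Any]], Dict[str, Any]]


def compose_sequential(*stages: Stage) -> Stage:
    """Return a function that feeds each stage's output into the next stage."""
    def run(initial: Dict[str, Any]) -> Dict[str, Any]:
        return reduce(lambda data, stage: stage(data), stages, initial)
    return run


# Hypothetical stand-ins for @agent-decorated functions.
def explore(data: Dict[str, Any]) -> Dict[str, Any]:
    return {**data, "findings": ["a", "b"]}


def analyze(data: Dict[str, Any]) -> Dict[str, Any]:
    return {**data, "count": len(data["findings"])}


def report(data: Dict[str, Any]) -> Dict[str, Any]:
    return {**data, "summary": f"{data['count']} findings for {data['task']}"}


pipeline = compose_sequential(explore, analyze, report)
result = pipeline({"task": "analyze sentiment"})
```

Each stage receives the accumulated dict from the previous one, so the order of the arguments determines the order of processing.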

praval.composition.conditional_agent(condition_func)[source]

Decorator for conditional agent execution.

Parameters:

condition_func (Callable[[Any], bool]) – Function that takes a spore and returns bool

Example

@conditional_agent(lambda spore: spore.knowledge.get("priority") == "high")
@agent("urgent_processor")
def process_urgent(spore):
    return {"processed": True}
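
The guard pattern behind a conditional decorator can be sketched without the library. The code below is an illustration only: `FakeSpore` and `run_if` are hypothetical names, and returning `None` for a skipped spore is an assumption rather than documented Praval behavior.

```python
from dataclasses import dataclass, field
from functools import wraps
from typing import Any, Callable, Dict, Optional


@dataclass
class FakeSpore:
    """Hypothetical stand-in for a spore; only the `knowledge` dict is modelled."""
    knowledge: Dict[str, Any] = field(default_factory=dict)


def run_if(condition: Callable[[Any], bool]):
    """Sketch of a guard decorator: call the handler only when condition(spore) is true."""
    def decorator(handler: Callable[[Any], Dict[str, Any]]):
        @wraps(handler)
        def wrapper(spore: Any) -> Optional[Dict[str, Any]]:
            if condition(spore):
                return handler(spore)
            return None  # assumption: a skipped spore produces no result
        return wrapper
    return decorator


@run_if(lambda spore: spore.knowledge.get("priority") == "high")
def process_urgent(spore):
    return {"processed": True}


high = process_urgent(FakeSpore({"priority": "high"}))
low = process_urgent(FakeSpore({"priority": "low"}))
```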

praval.composition.throttled_agent(delay_seconds)[source]

Decorator to throttle agent execution.

Parameters:

delay_seconds (float) – Minimum seconds between executions

Example

@throttled_agent(2.0)  # Max once every 2 seconds
@agent("slow_processor")
def process_slowly(spore):
    return {"processed": True}
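
"Minimum seconds between executions" can be implemented in more than one way. The sketch below assumes that calls arriving too early are dropped (the library may instead delay them; the source does not say). `throttle` and its injectable `clock` parameter are hypothetical and exist only so the behavior can be shown deterministically.

```python
import time
from functools import wraps
from typing import Any, Callable, Optional


def throttle(delay_seconds: float, clock: Callable[[], float] = time.monotonic):
    """Sketch: run the handler at most once per `delay_seconds`, dropping early calls."""
    def decorator(handler: Callable[[Any], Any]):
        last_run: Optional[float] = None

        @wraps(handler)
        def wrapper(spore: Any) -> Any:
            nonlocal last_run
            now = clock()
            if last_run is not None and now - last_run < delay_seconds:
                return None  # assumption: early calls are skipped, not queued
            last_run = now
            return handler(spore)
        return wrapper
    return decorator


# A fake clock yielding t = 0.0, 1.0, 2.5 seconds for three successive calls.
ticks = iter([0.0, 1.0, 2.5])


@throttle(2.0, clock=lambda: next(ticks))
def process_slowly(spore):
    return {"processed": True}


results = [process_slowly({}) for _ in range(3)]
```

With the fake clock, the second call falls inside the 2-second window and is dropped; the first and third calls run.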

class praval.composition.AgentSession(session_name)[source]

Bases: object

Context manager for coordinated agent sessions.

Example

with AgentSession("knowledge_mining") as session:
    session.add_agents(explorer, analyzer, curator)
    session.broadcast({"task": "mine concepts about AI"})

Parameters:

session_name (str)

__init__(session_name)[source]
Parameters:

session_name (str)

add_agent(agent_func)[source]

Add an agent to this session.

Return type:

AgentSession

Parameters:

agent_func (Callable)

add_agents(*agent_funcs)[source]

Add multiple agents to this session.

Return type:

AgentSession

Parameters:

agent_funcs (Callable)

broadcast(data)[source]

Broadcast data to all agents in this session.

Return type:

str

Parameters:

data (Dict[str, Any])

get_stats()[source]

Get session statistics.

Return type:

Dict[str, Any]
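
The signatures above show that add_agent() and add_agents() return the session itself, which suggests calls can be chained. The stand-in class below mirrors only those signatures to illustrate the context-manager and chaining pattern. `MiniSession` is hypothetical, its statistics keys are invented for the example, and the string returned by `broadcast` is merely assumed to be some message identifier.

```python
import uuid
from typing import Any, Callable, Dict, List


class MiniSession:
    """Hypothetical stand-in mirroring the method signatures above; not Praval's AgentSession."""

    def __init__(self, session_name: str) -> None:
        self.session_name = session_name
        self._agents: List[Callable[[Dict[str, Any]], Any]] = []
        self._broadcasts = 0

    def __enter__(self) -> "MiniSession":
        return self

    def __exit__(self, exc_type, exc, tb) -> bool:
        return False  # never suppress exceptions raised inside the block

    def add_agent(self, agent_func: Callable) -> "MiniSession":
        self._agents.append(agent_func)
        return self  # returning self is what makes chaining possible

    def add_agents(self, *agent_funcs: Callable) -> "MiniSession":
        for agent_func in agent_funcs:
            self.add_agent(agent_func)
        return self

    def broadcast(self, data: Dict[str, Any]) -> str:
        for agent_func in self._agents:
            agent_func(data)  # deliver the same payload to every registered agent
        self._broadcasts += 1
        return uuid.uuid4().hex  # stand-in identifier for the broadcast

    def get_stats(self) -> Dict[str, Any]:
        # Invented keys, for illustration only.
        return {"session_name": self.session_name,
                "agent_count": len(self._agents),
                "broadcasts": self._broadcasts}


seen: List[Dict[str, Any]] = []
with MiniSession("knowledge_mining") as session:
    message_id = (session.add_agent(seen.append)
                         .add_agents(seen.append)
                         .broadcast({"task": "mine concepts about AI"}))
    stats = session.get_stats()
```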

praval.composition.start_agents(*agent_funcs, initial_data=None, channel='startup')[source]

Convenience function to start multiple agents with initial data.

Use this for LOCAL agents (InMemoryBackend). For DISTRIBUTED agents with RabbitMQ, use run_agents() instead.

Parameters:
  • *agent_funcs (Callable) – Functions decorated with @agent

  • initial_data (Optional[Dict[str, Any]]) – Initial data to broadcast (optional)

  • channel (str) – Channel to use for agent communication (default: "startup"). All agents will be subscribed to this channel and broadcast() will default to this channel.

Return type:

str

Returns:

Spore ID of startup broadcast

Example:

from praval import agent, chat, start_agents, get_reef

@agent("explorer", responds_to=["research_request"])
def explorer(spore):
    return {"findings": chat(spore.knowledge.get("topic"))}

# Start agents with initial data.
# analyzer and curator are assumed to be @agent-decorated functions defined like explorer.
start_agents(explorer, analyzer, curator,
             initial_data={"type": "research_request", "topic": "market trends"})

# Wait for all agents to complete
get_reef().wait_for_completion()
get_reef().shutdown()

praval.composition.run_agents(*agent_funcs, backend_config=None, channel_queue_map=None)[source]

Run distributed agents with proper async lifecycle management.

This is the recommended way to run agents with the RabbitMQ backend. It:
  1. Creates and manages an asyncio event loop

  2. Initializes the RabbitMQ backend

  3. Ensures agents consume messages from the broker

  4. Handles graceful shutdown on signals (SIGTERM, SIGINT)
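
The lifecycle described above (event loop, consumer, signal-driven shutdown) follows a common asyncio pattern. The sketch below shows that general pattern using only the standard library; it is not Praval's code, and `run_until_stopped`, `serve`, and `fake_consume` are hypothetical names. `loop.add_signal_handler` is available on Unix event loops only.

```python
import asyncio
import signal
from typing import Awaitable, Callable, List


async def run_until_stopped(consume: Callable[[], Awaitable[None]],
                            stop: asyncio.Event) -> None:
    """Run the consumer until `stop` is set, then cancel it and wait for cleanup."""
    task = asyncio.create_task(consume())
    await stop.wait()
    task.cancel()
    try:
        await task
    except asyncio.CancelledError:
        pass  # expected: the consumer was cancelled on purpose


def serve(consume: Callable[[], Awaitable[None]]) -> None:
    """Own the event loop and translate SIGTERM/SIGINT into a graceful stop (Unix only)."""
    async def _main() -> None:
        loop = asyncio.get_running_loop()
        stop = asyncio.Event()
        for sig in (signal.SIGTERM, signal.SIGINT):
            loop.add_signal_handler(sig, stop.set)
        await run_until_stopped(consume, stop)
    asyncio.run(_main())


# Demonstration with a fake consumer and a timer in place of a real signal.
events: List[str] = []


async def fake_consume() -> None:
    events.append("started")
    try:
        while True:
            await asyncio.sleep(0.01)  # stands in for waiting on broker messages
    finally:
        events.append("stopped")  # cleanup runs when the task is cancelled


async def _demo() -> None:
    stop = asyncio.Event()
    asyncio.get_running_loop().call_later(0.05, stop.set)
    await run_until_stopped(fake_consume, stop)


asyncio.run(_demo())
```

Cancelling the consumer task and awaiting it gives the consumer's `finally` block a chance to release resources before the loop closes.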

Parameters:
  • *agent_funcs (Callable) – Functions decorated with @agent

  • backend_config (Optional[Dict[str, Any]]) – RabbitMQ configuration dict:

    {
        'url': 'amqp://user:pass@host:5672/',
        'exchange_name': 'praval.agents',
        'verify_tls': True,  # or False
    }

  • channel_queue_map (Optional[Dict[str, str]]) – Optional mapping of Praval channels to RabbitMQ queues. Use this when agents should consume from pre-configured queues instead of topic-based subscriptions. Example: {"data_received": "agent.data_analyzer"}

Return type:

None

Example (Topic-based, Praval-managed routing):

run_agents(
    processor, analyzer,
    backend_config={
        'url': 'amqp://localhost:5672/',
        'exchange_name': 'praval.agents'
    }
)

Example (Queue-based, pre-configured RabbitMQ):

run_agents(
    processor, analyzer,
    backend_config={
        'url': 'amqp://localhost:5672/',
        'exchange_name': 'praval.agents'
    },
    channel_queue_map={
        "data_received": "agent.data_analyzer",
        "qc_inspection_received": "agent.vision_inspector"
    }
)
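
In deployments, the backend_config dict documented for run_agents() is often assembled from the environment rather than hard-coded. The helper below is one possible sketch. `build_backend_config` and the `MYAPP_*` variable names are hypothetical; only the three dict keys ('url', 'exchange_name', 'verify_tls') come from the documentation above.

```python
import os
from typing import Any, Dict, Mapping


def build_backend_config(env: Mapping[str, str] = os.environ) -> Dict[str, Any]:
    """Build a backend_config dict from an environment-style mapping.

    The MYAPP_* variable names are hypothetical; adapt them to your deployment.
    """
    return {
        "url": env.get("MYAPP_AMQP_URL", "amqp://localhost:5672/"),
        "exchange_name": env.get("MYAPP_EXCHANGE", "praval.agents"),
        # Treat common truthy spellings as True; anything else disables verification.
        "verify_tls": env.get("MYAPP_VERIFY_TLS", "true").lower() in ("1", "true", "yes"),
    }


# Passing an explicit mapping keeps the example independent of the real environment.
config = build_backend_config({
    "MYAPP_AMQP_URL": "amqps://user:pass@broker.example:5671/",
    "MYAPP_VERIFY_TLS": "false",
})
```

The resulting dict could then be passed as `backend_config=config` to run_agents().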