praval.composition๏ƒ

Composition utilities for decorator-based agents.

This module provides utilities for composing and orchestrating agents decorated with the @agent decorator.

Key Functions: - start_agents(): Local agents with initial data (InMemoryBackend) - run_agents(): Distributed agents with RabbitMQ (use AgentRunner) - agent_pipeline(): Sequential agent processing - AgentSession: Grouped agent communication

Functions

agent_pipeline(*agents[,ย channel])

Compose agents into a pipeline that processes data sequentially.

conditional_agent(condition_func)

Decorator for conditional agent execution.

run_agents(*agent_funcs[,ย backend_config,ย ...])

Run distributed agents with proper async lifecycle management.

start_agents(*agent_funcs[,ย initial_data,ย ...])

Convenience function to start multiple agents with initial data.

throttled_agent(delay_seconds)

Decorator to throttle agent execution.

Classes

AgentSession(session_name)

Context manager for coordinated agent sessions.

praval.composition.agent_pipeline(*agents, channel='pipeline')[source]๏ƒ

Compose agents into a pipeline that processes data sequentially.

Parameters:
  • *agents (Callable) โ€“ Functions decorated with @agent

  • channel (str) โ€“ Channel name for pipeline communication

Return type:

Callable

Returns:

Function that triggers the pipeline with initial data

Example:

pipeline = agent_pipeline(explorer, analyzer, reporter)
pipeline({"task": "analyze sentiment"})
praval.composition.conditional_agent(condition_func)[source]๏ƒ

Decorator for conditional agent execution.

Parameters:

condition_func (Callable[[Any], bool]) โ€“ Function that takes a spore and returns bool

Example:

@conditional_agent(lambda spore: spore.knowledge.get("priority") == "high")
@agent("urgent_processor")
def process_urgent(spore):
    return {"processed": True}
praval.composition.throttled_agent(delay_seconds)[source]๏ƒ

Decorator to throttle agent execution.

Parameters:

delay_seconds (float) โ€“ Minimum seconds between executions

Example:

@throttled_agent(2.0)  # Max once every 2 seconds
@agent("slow_processor")
def process_slowly(spore):
    return {"processed": True}
class praval.composition.AgentSession(session_name)[source]๏ƒ

Bases: object

Context manager for coordinated agent sessions.

Example

with AgentSession(โ€œknowledge_miningโ€) as session:

session.add_agents(explorer, analyzer, curator) session.broadcast({โ€œtaskโ€: โ€œmine concepts about AIโ€})

Parameters:

session_name (str)

__init__(session_name)[source]๏ƒ
Parameters:

session_name (str)

add_agent(agent_func)[source]๏ƒ

Add an agent to this session.

Return type:

AgentSession

Parameters:

agent_func (Callable)

add_agents(*agent_funcs)[source]๏ƒ

Add multiple agents to this session.

Return type:

AgentSession

Parameters:

agent_funcs (Callable)

broadcast(data)[source]๏ƒ

Broadcast data to all agents in this session.

Return type:

str

Parameters:

data (Dict[str, Any])

get_stats()[source]๏ƒ

Get session statistics.

Return type:

Dict[str, Any]

praval.composition.start_agents(*agent_funcs, initial_data=None, channel='startup')[source]๏ƒ

Convenience function to start multiple agents with initial data.

Use this for LOCAL agents (InMemoryBackend). For DISTRIBUTED agents with RabbitMQ, use run_agents() instead.

Parameters:
  • *agent_funcs (Callable) โ€“ Functions decorated with @agent

  • initial_data (Optional[Dict[str, Any]]) โ€“ Initial data to broadcast (optional)

  • channel (str) โ€“ Channel to use for agent communication (default: โ€œstartupโ€). All agents will be subscribed to this channel and broadcast() will default to this channel.

Return type:

str

Returns:

Spore ID of startup broadcast

Example:

from praval import agent, chat, start_agents, get_reef

@agent("explorer", responds_to=["research_request"])
def explorer(spore):
    return {"findings": chat(spore.knowledge.get("topic"))}

# Start agents with initial data
start_agents(explorer, analyzer, curator,
            initial_data={"type": "research_request", "topic": "market trends"})

# Wait for all agents to complete
get_reef().wait_for_completion()
get_reef().shutdown()
praval.composition.run_agents(*agent_funcs, backend_config=None, channel_queue_map=None)[source]๏ƒ

Run distributed agents with proper async lifecycle management.

This is the recommended way to run agents with RabbitMQ backend. It:

  1. Creates and manages an asyncio event loop

  2. Initializes the RabbitMQ backend

  3. Ensures agents consume messages from the broker

  4. Handles graceful shutdown on signals (SIGTERM, SIGINT)

Parameters:
  • *agent_funcs (Callable) โ€“ Functions decorated with @agent

  • backend_config (Optional[Dict[str, Any]]) โ€“

    RabbitMQ configuration dict. Example:

    {
        "url": "amqp://user:pass@host:5672/",
        "exchange_name": "praval.agents",
        "verify_tls": True
    }
    

  • channel_queue_map (Optional[Dict[str, str]]) โ€“ Optional mapping of Praval channels to RabbitMQ queues. Use this when agents should consume from pre-configured queues instead of topic-based subscriptions. Example: {"data_received": "agent.data_analyzer"}

Return type:

None

Example (Topic-based, Praval-managed routing):

run_agents(
    processor,
    analyzer,
    backend_config={
        "url": "amqp://localhost:5672/",
        "exchange_name": "praval.agents"
    }
)

Example (Queue-based, pre-configured RabbitMQ):

run_agents(
    processor,
    analyzer,
    backend_config={
        "url": "amqp://localhost:5672/",
        "exchange_name": "praval.agents"
    },
    channel_queue_map={
        "data_received": "agent.data_analyzer",
        "qc_inspection_received": "agent.vision_inspector"
    }
)