praval.core.reef๏
Reef Communication System for Praval Framework.
Like coral reefs facilitate communication between polyps through chemical and biological signals, this system enables knowledge-first communication between agents through structured JSON message queues.
Components: - Spores: JSON messages containing knowledge, data, or requests - ReefChannel: Named message channels within the reef - Reef: The message queue network connecting all agents
Functions
|
Get the global reef instance. |
Reset the global reef instance to a clean state. |
Classes
|
The Reef manages all communication channels and facilitates agent communication. |
|
A message channel within the reef. |
|
A spore is a knowledge-carrying message that flows through the reef. |
|
Types of spores that can flow through the reef. |
- class praval.core.reef.SporeType(*values)[source]๏
Bases:
EnumTypes of spores that can flow through the reef.
- KNOWLEDGE = 'knowledge'๏
- REQUEST = 'request'๏
- RESPONSE = 'response'๏
- BROADCAST = 'broadcast'๏
- NOTIFICATION = 'notification'๏
- class praval.core.reef.Spore(id, spore_type, from_agent, to_agent, knowledge, created_at, expires_at=None, priority=5, reply_to=None, metadata=None, knowledge_references=None, data_references=None)[source]๏
Bases:
objectA spore is a knowledge-carrying message that flows through the reef.
Like biological spores, each carries: - Genetic material (knowledge/data) - Identification markers (metadata) - Survival instructions (processing hints)
Spores can carry either direct knowledge or lightweight references to knowledge stored in vector memory, following the principle that โlight spores travel far.โ
- Parameters:
- add_knowledge_reference(reference_id)[source]๏
Add a reference to stored knowledge
- Parameters:
reference_id (str)
- add_data_reference(reference_uri)[source]๏
Add a reference to storage system data
- Parameters:
reference_uri (str)
- to_amqp_message()[source]๏
Convert Spore to AMQP message with metadata in headers.
Design: - Body: Knowledge payload (JSON serialized) - Headers: Spore metadata (spore_id, type, from_agent, etc.) - Properties: AMQP message properties (priority, TTL, etc.)
This makes Spore the native AMQP format, eliminating intermediate conversions.
- Returns:
AMQP message ready for publication
- Return type:
aio_pika.Message
- Raises:
ImportError โ If aio-pika is not installed
- classmethod from_amqp_message(amqp_msg)[source]๏
Create Spore directly from AMQP message.
Reconstructs a Spore object from AMQP message headers and body, with zero intermediate conversions (AMQP message directly becomes Spore).
- Parameters:
amqp_msg (aio_pika.Message) โ aio_pika.Message from AMQP broker
- Returns:
Reconstructed spore object with all metadata
- Return type:
- Raises:
ImportError โ If aio-pika is not installed
ValueError โ If required spore headers are missing
- __init__(id, spore_type, from_agent, to_agent, knowledge, created_at, expires_at=None, priority=5, reply_to=None, metadata=None, knowledge_references=None, data_references=None)๏
- Parameters:
- Return type:
None
- class praval.core.reef.ReefChannel(name, max_capacity=1000, max_workers=4)[source]๏
Bases:
objectA message channel within the reef.
Like channels in a coral reef, they: - Have directional flow patterns - Can carry multiple spores simultaneously - Have capacity limits (to prevent overwhelming) - Can experience turbulence (message loss/delays)
- subscribe(agent_name, handler, replace=True)[source]๏
Subscribe an agent to receive spores from this channel.
- Parameters:
- Return type:
Note
Default behavior (replace=True) ensures that re-registering an agent in interactive environments (like Jupyter notebooks) doesnโt create duplicate subscriptions. Set replace=False if you intentionally want multiple handlers for the same agent.
- get_spores_for_agent(agent_name, limit=10)[source]๏
Get recent spores for a specific agent (polling interface).
- class praval.core.reef.Reef(default_max_workers=4, backend=None)[source]๏
Bases:
objectThe Reef manages all communication channels and facilitates agent communication.
Like a coral reef ecosystem, it: - Maintains multiple communication channels - Enables knowledge flow between polyps (agents) - Supports both direct and broadcast communication - Provides network health monitoring
The Reef uses pluggable backends for transport: - InMemoryBackend: Local agent communication (default) - RabbitMQBackend: Distributed agent communication - Future: HTTP, gRPC, Kafka, etc.
Agents work unchanged regardless of backend choice.
Message Routing: - When using InMemoryBackend: Messages routed through local ReefChannel - When using RabbitMQBackend (or other distributed): Messages routed through backend
- Parameters:
default_max_workers (int)
- __init__(default_max_workers=4, backend=None)[source]๏
Initialize Reef with optional backend.
- Parameters:
default_max_workers (
int) โ Thread workers per channel (InMemory only)backend โ ReefBackend instance (defaults to InMemoryBackend)
- create_channel(name, max_capacity=1000, max_workers=None)[source]๏
Create a new reef channel.
- Return type:
- Parameters:
- async initialize_backend(config=None)[source]๏
Initialize the Reef backend (async operation for distributed backends).
Call this method to set up distributed backends like RabbitMQ. InMemoryBackend initializes immediately, so this is optional for local usage.
- async close_backend()[source]๏
Shutdown the backend (async operation for distributed backends).
- Return type:
- send(from_agent, to_agent, knowledge, spore_type=SporeType.KNOWLEDGE, channel=None, priority=5, expires_in_seconds=None, reply_to=None, knowledge_references=None, auto_reference_large_knowledge=True)[source]๏
Send a spore through the reef.
- broadcast(from_agent, knowledge, channel=None)[source]๏
Broadcast knowledge to all agents in the reef.
- system_broadcast(knowledge, channel=None)[source]๏
Broadcast system-level messages to all agents in a channel.
- request(from_agent, to_agent, request, channel=None, expires_in_seconds=300)[source]๏
Send a knowledge request to another agent.
- reply(from_agent, to_agent, response, reply_to_spore_id, channel=None)[source]๏
Reply to a knowledge request.
- subscribe(agent_name, handler, channel=None, replace=True)[source]๏
Subscribe an agent to receive spores from a channel.
- Parameters:
agent_name (
str) โ Name of the agent subscribinghandler (
Callable[[Spore],None]) โ Callback function to handle received sporeschannel (
str) โ Channel name (uses default if None)replace (
bool) โ If True (default), replaces existing handlers for this agent. If False, adds handler to the list.
- Return type:
Note
For distributed backends (RabbitMQ, etc.), this also subscribes to the backendโs message broker, enabling cross-process communication.
- wait_for_completion(timeout=None)[source]๏
Wait for all active agent handlers to complete across all channels.
This method blocks until all currently running handlers finish, including cascading handlers triggered by broadcast() calls within agents.
- Parameters:
timeout (
Optional[float]) โ Maximum time to wait in seconds. None means wait indefinitely.- Return type:
- Returns:
True if all handlers completed, False if timeout occurred.
Example
start_agents(researcher, summarizer, initial_data={โฆ}) get_reef().wait_for_completion() # Block until all agents done get_reef().shutdown()
- create_knowledge_reference_spore(from_agent, to_agent, knowledge_summary, knowledge_references, spore_type=SporeType.KNOWLEDGE, channel=None)[source]๏
Create a lightweight spore with knowledge references
This follows the reef principle: โlight spores travel farโ