Source code for praval.core.reef

"""
Reef Communication System for Praval Framework.

Just as coral reefs enable communication between polyps through chemical and biological signals,
this system enables knowledge-first communication between agents through structured JSON message queues.

Components:
- Spores: JSON messages containing knowledge, data, or requests
- ReefChannel: Named message channels within the reef
- Reef: The message queue network connecting all agents
"""

import json
import time
import uuid
import threading
import logging
from datetime import datetime, timedelta
from typing import Dict, List, Any, Optional, Callable, Union
from dataclasses import dataclass, asdict
from collections import defaultdict, deque
from enum import Enum
from concurrent.futures import ThreadPoolExecutor, Future
import asyncio
import inspect


logger = logging.getLogger(__name__)


class SporeType(Enum):
    """Types of spores that can flow through the reef."""

    KNOWLEDGE = "knowledge"        # Pure knowledge/data sharing
    REQUEST = "request"            # Request for information or action
    RESPONSE = "response"          # Response to a request
    BROADCAST = "broadcast"        # Message to all agents
    NOTIFICATION = "notification"  # Event notification


@dataclass
class Spore:
    """
    A spore is a knowledge-carrying message that flows through the reef.

    Like biological spores, each carries:
    - Genetic material (knowledge/data)
    - Identification markers (metadata)
    - Survival instructions (processing hints)

    Spores can carry either direct knowledge or lightweight references to
    knowledge stored in vector memory, following the principle that
    "light spores travel far."
    """

    id: str
    spore_type: SporeType
    from_agent: str
    to_agent: Optional[str]                 # None for broadcasts
    knowledge: Dict[str, Any]               # The actual data payload
    created_at: datetime
    expires_at: Optional[datetime] = None
    priority: int = 5                       # 1-10, higher = more urgent
    reply_to: Optional[str] = None          # For request-response patterns
    metadata: Dict[str, Any] = None
    knowledge_references: List[str] = None  # References to stored knowledge
    data_references: List[str] = None       # References to storage system data

    def __post_init__(self):
        if self.metadata is None:
            self.metadata = {}
        if self.knowledge_references is None:
            self.knowledge_references = []
        if self.data_references is None:
            self.data_references = []

    def to_json(self) -> str:
        """Serialize spore to JSON for transmission."""
        data = asdict(self)
        # Handle datetime serialization
        data['created_at'] = self.created_at.isoformat()
        if self.expires_at:
            data['expires_at'] = self.expires_at.isoformat()
        data['spore_type'] = self.spore_type.value
        return json.dumps(data, indent=2)

    @classmethod
    def from_json(cls, json_str: str) -> 'Spore':
        """Deserialize spore from JSON."""
        data = json.loads(json_str)
        # Handle datetime deserialization
        data['created_at'] = datetime.fromisoformat(data['created_at'])
        if data.get('expires_at'):
            data['expires_at'] = datetime.fromisoformat(data['expires_at'])
        data['spore_type'] = SporeType(data['spore_type'])
        return cls(**data)

    def is_expired(self) -> bool:
        """Check if the spore has expired."""
        if not self.expires_at:
            return False
        return datetime.now() > self.expires_at

    def add_knowledge_reference(self, reference_id: str):
        """Add a reference to stored knowledge."""
        if reference_id not in self.knowledge_references:
            self.knowledge_references.append(reference_id)

    def add_data_reference(self, reference_uri: str):
        """Add a reference to storage system data."""
        if reference_uri not in self.data_references:
            self.data_references.append(reference_uri)

    def has_knowledge_references(self) -> bool:
        """Check if the spore has knowledge references."""
        return len(self.knowledge_references) > 0

    def has_data_references(self) -> bool:
        """Check if the spore has data references."""
        return len(self.data_references) > 0

    def has_any_references(self) -> bool:
        """Check if the spore has any kind of references."""
        return self.has_knowledge_references() or self.has_data_references()

    def get_spore_size_estimate(self) -> int:
        """Estimate spore size for lightweight transmission."""
        try:
            # Estimate JSON size without actually serializing (for performance)
            knowledge_size = len(str(self.knowledge)) if self.knowledge else 0
            metadata_size = len(str(self.metadata)) if self.metadata else 0
            refs_size = len(str(self.knowledge_references)) if self.knowledge_references else 0
            # Add approximate overhead for other fields
            overhead = 500  # Estimated JSON overhead
            return knowledge_size + metadata_size + refs_size + overhead
        except Exception:
            # Fall back to actual serialization if estimation fails
            return len(self.to_json())

    def to_amqp_message(self) -> 'aio_pika.Message':
        """
        Convert Spore to an AMQP message with metadata in headers.

        Design:
        - Body: Knowledge payload (JSON serialized)
        - Headers: Spore metadata (spore_id, type, from_agent, etc.)
        - Properties: AMQP message properties (priority, TTL, etc.)

        This makes Spore the native AMQP format, eliminating intermediate
        conversions.

        Returns:
            aio_pika.Message: AMQP message ready for publication

        Raises:
            ImportError: If aio-pika is not installed
        """
        try:
            import aio_pika
        except ImportError:
            raise ImportError("aio-pika package required for AMQP serialization")

        # Serialize knowledge to JSON bytes
        knowledge_bytes = json.dumps(self.knowledge).encode('utf-8')

        # Build AMQP headers with spore metadata
        headers = {
            'spore_id': self.id,
            'spore_type': self.spore_type.value,
            'from_agent': self.from_agent,
            'to_agent': self.to_agent or '',
            'created_at': self.created_at.isoformat(),
            'expires_at': (self.expires_at.isoformat() if self.expires_at else ''),
            'priority': str(self.priority),
            'reply_to': (self.reply_to or ''),
            'version': '1.0',  # Protocol versioning for future compatibility
        }

        # Calculate TTL in milliseconds (if expires_at is set)
        expiration_ms = None
        if self.expires_at:
            ttl_seconds = (self.expires_at - datetime.now()).total_seconds()
            if ttl_seconds > 0:
                expiration_ms = int(ttl_seconds * 1000)

        # Create the AMQP message with properties
        return aio_pika.Message(
            body=knowledge_bytes,
            headers=headers,
            message_id=self.id,
            timestamp=self.created_at,
            priority=min(max(self.priority, 0), 255),  # AMQP priority range 0-255
            expiration=expiration_ms,  # TTL in milliseconds
            delivery_mode=aio_pika.DeliveryMode.PERSISTENT,
            content_type='application/json',
        )

    @classmethod
    def from_amqp_message(cls, amqp_msg: 'aio_pika.Message') -> 'Spore':
        """
        Create a Spore directly from an AMQP message.

        Reconstructs a Spore object from AMQP message headers and body, with
        zero intermediate conversions (the AMQP message directly becomes a
        Spore).

        Args:
            amqp_msg: aio_pika.Message from the AMQP broker

        Returns:
            Spore: Reconstructed spore object with all metadata

        Raises:
            ImportError: If aio-pika is not installed
            ValueError: If required spore headers are missing
        """
        try:
            import aio_pika
        except ImportError:
            raise ImportError("aio-pika package required for AMQP deserialization")

        headers = amqp_msg.headers or {}

        # Parse knowledge from the message body
        try:
            knowledge = json.loads(amqp_msg.body.decode('utf-8'))
        except (json.JSONDecodeError, UnicodeDecodeError):
            # Fallback if body is not valid JSON
            knowledge = {"raw_content": amqp_msg.body.decode('utf-8', errors='replace')}

        # Parse expires_at timestamp
        expires_at = None
        if headers.get('expires_at'):
            try:
                expires_at = datetime.fromisoformat(headers['expires_at'])
            except (ValueError, TypeError):
                expires_at = None

        # Parse created_at timestamp
        created_at = datetime.now()
        if headers.get('created_at'):
            try:
                created_at = datetime.fromisoformat(headers['created_at'])
            except (ValueError, TypeError):
                created_at = datetime.now()
        elif amqp_msg.timestamp:
            created_at = amqp_msg.timestamp

        # Reconstruct the Spore from headers
        return cls(
            id=headers.get('spore_id', amqp_msg.message_id or str(uuid.uuid4())),
            spore_type=SporeType(headers.get('spore_type', 'knowledge')),
            from_agent=headers.get('from_agent', 'unknown'),
            to_agent=(headers.get('to_agent') or None),
            knowledge=knowledge,
            created_at=created_at,
            expires_at=expires_at,
            priority=int(headers.get('priority', 5)),
            reply_to=(headers.get('reply_to') or None),
            metadata={},  # Metadata not serialized in AMQP headers (keeps headers small)
            knowledge_references=[],
            data_references=[],
        )


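# Illustrative sketch (not part of the original module): a minimal JSON
# round-trip for a Spore. The agent names and payload are made up for
# demonstration; only Spore.to_json() / Spore.from_json() defined above
# are exercised.
def _example_spore_json_roundtrip() -> None:
    spore = Spore(
        id=str(uuid.uuid4()),
        spore_type=SporeType.KNOWLEDGE,
        from_agent="researcher",   # hypothetical agent name
        to_agent="summarizer",     # hypothetical agent name
        knowledge={"topic": "coral reefs", "confidence": 0.9},
        created_at=datetime.now(),
        expires_at=datetime.now() + timedelta(minutes=5),
    )
    # Serialize to JSON and rebuild; datetimes and the enum survive the trip
    restored = Spore.from_json(spore.to_json())
    assert restored.id == spore.id
    assert restored.spore_type is SporeType.KNOWLEDGE
    assert not restored.is_expired()

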
class ReefChannel:
    """
    A message channel within the reef.

    Like channels in a coral reef, they:
    - Have directional flow patterns
    - Can carry multiple spores simultaneously
    - Have capacity limits (to prevent overwhelming)
    - Can experience turbulence (message loss/delays)
    """

    def __init__(self, name: str, max_capacity: int = 1000, max_workers: int = 4):
        self.name = name
        self.max_capacity = max_capacity
        self.spores: deque = deque(maxlen=max_capacity)
        self.subscribers: Dict[str, List[Callable]] = defaultdict(list)
        self.lock = threading.RLock()
        self.executor = ThreadPoolExecutor(
            max_workers=max_workers, thread_name_prefix=f"reef-{name}"
        )
        self._shutdown = False
        self._active_futures: List[Future] = []  # Track active handler executions
        self._futures_lock = threading.Lock()
        self.stats = {
            'spores_carried': 0,
            'spores_delivered': 0,
            'spores_expired': 0,
            'created_at': datetime.now(),
        }

    def send_spore(self, spore: Spore) -> bool:
        """Send a spore through this channel."""
        with self.lock:
            if len(self.spores) >= self.max_capacity:
                # Channel at capacity - oldest spores drift away
                self.spores.popleft()
            self.spores.append(spore)
            self.stats['spores_carried'] += 1
            # Immediately try to deliver to subscribers
            self._deliver_spore(spore)
            return True

    def _deliver_spore(self, spore: Spore) -> List[Future]:
        """Deliver spore to subscribed agents asynchronously."""
        if spore.is_expired():
            self.stats['spores_expired'] += 1
            return []
        if self._shutdown:
            return []

        futures = []
        # Deliver to a specific agent if targeted
        if spore.to_agent and spore.to_agent in self.subscribers:
            for handler in self.subscribers[spore.to_agent]:
                future = self._execute_handler_async(handler, spore)
                if future:
                    futures.append(future)
        # Deliver broadcasts to all subscribers except the sender
        elif spore.spore_type == SporeType.BROADCAST:
            for agent_name, handlers in self.subscribers.items():
                if agent_name != spore.from_agent:  # Don't deliver to sender
                    for handler in handlers:
                        future = self._execute_handler_async(handler, spore)
                        if future:
                            futures.append(future)
        return futures

    def _execute_handler_async(self, handler: Callable, spore: Spore) -> Optional[Future]:
        """Execute a handler asynchronously, supporting both sync and async handlers."""
        if self._shutdown:
            return None

        def safe_handler_wrapper():
            try:
                # Check if the handler is async
                if inspect.iscoroutinefunction(handler):
                    # Run async handler in a new event loop
                    loop = asyncio.new_event_loop()
                    asyncio.set_event_loop(loop)
                    try:
                        result = loop.run_until_complete(handler(spore))
                        self.stats['spores_delivered'] += 1
                        return result
                    finally:
                        loop.close()
                else:
                    # Run sync handler directly
                    result = handler(spore)
                    self.stats['spores_delivered'] += 1
                    return result
            except Exception as e:
                # Log errors but don't break the system
                logger.warning(f"Agent handler error in channel {self.name}: {e}")
                return None

        future = self.executor.submit(safe_handler_wrapper)
        # Track the future for wait_for_completion()
        with self._futures_lock:
            self._active_futures.append(future)
        return future

    def subscribe(self, agent_name: str, handler: Callable[[Spore], None],
                  replace: bool = True) -> None:
        """
        Subscribe an agent to receive spores from this channel.

        Args:
            agent_name: Name of the agent subscribing
            handler: Callback function to handle received spores
            replace: If True (default), replaces existing handlers for this agent.
                If False, adds the handler to the list (useful for multiple
                handlers).

        Note:
            The default behavior (replace=True) ensures that re-registering an
            agent in interactive environments (like Jupyter notebooks) doesn't
            create duplicate subscriptions. Set replace=False if you
            intentionally want multiple handlers for the same agent.
        """
        with self.lock:
            if replace:
                # Replace all existing handlers with this new one
                self.subscribers[agent_name] = [handler]
            else:
                # Append to existing handlers
                self.subscribers[agent_name].append(handler)

    def unsubscribe(self, agent_name: str) -> None:
        """Unsubscribe an agent from this channel."""
        with self.lock:
            if agent_name in self.subscribers:
                del self.subscribers[agent_name]

    def get_spores_for_agent(self, agent_name: str, limit: int = 10) -> List[Spore]:
        """Get recent spores for a specific agent (polling interface)."""
        with self.lock:
            relevant_spores = []
            for spore in reversed(self.spores):  # Most recent first
                if len(relevant_spores) >= limit:
                    break
                if spore.is_expired():
                    continue
                # Include if targeted to this agent or is a broadcast
                if (spore.to_agent == agent_name or
                        (spore.spore_type == SporeType.BROADCAST and
                         spore.from_agent != agent_name)):
                    relevant_spores.append(spore)
            return relevant_spores

    def cleanup_expired(self) -> int:
        """Remove expired spores from the channel."""
        with self.lock:
            initial_count = len(self.spores)
            self.spores = deque(
                [s for s in self.spores if not s.is_expired()],
                maxlen=self.max_capacity,
            )
            expired_count = initial_count - len(self.spores)
            self.stats['spores_expired'] += expired_count
            return expired_count

    def get_stats(self) -> Dict[str, Any]:
        """Get channel statistics."""
        with self.lock:
            return {
                'name': self.name,
                'spores_in_channel': len(self.spores),
                'max_capacity': self.max_capacity,
                'subscriber_count': sum(len(handlers) for handlers in self.subscribers.values()),
                'active_threads': len(self.executor._threads) if hasattr(self.executor, '_threads') else 0,
                'shutdown': self._shutdown,
                **self.stats,
            }

    def wait_for_completion(self, timeout: Optional[float] = None) -> bool:
        """
        Wait for all active handler executions to complete.

        This method blocks until all currently running and pending handlers
        finish, including handlers that spawn new handlers (cascading messages).

        Args:
            timeout: Maximum time to wait in seconds. None means wait indefinitely.

        Returns:
            True if all handlers completed, False if a timeout occurred.
        """
        from concurrent.futures import wait as futures_wait, ALL_COMPLETED

        start_time = time.time()
        while True:
            # Get the current active futures
            with self._futures_lock:
                # Clean up completed futures
                self._active_futures = [f for f in self._active_futures if not f.done()]
                pending = list(self._active_futures)

            if not pending:
                return True

            # Calculate the remaining timeout
            remaining_timeout = None
            if timeout is not None:
                elapsed = time.time() - start_time
                remaining_timeout = max(0, timeout - elapsed)
                if remaining_timeout <= 0:
                    return False

            # Wait for the pending futures to complete
            try:
                futures_wait(pending, timeout=remaining_timeout, return_when=ALL_COMPLETED)
            except Exception:
                pass

            # Check if we've timed out
            if timeout is not None and (time.time() - start_time) >= timeout:
                return False

    def shutdown(self, wait: bool = True) -> None:
        """Shutdown the channel's thread pool."""
        self._shutdown = True
        self.executor.shutdown(wait=wait)


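# Illustrative sketch (not part of the original module): direct ReefChannel
# usage with a synchronous handler. The channel name, agent names, and payload
# are made up for demonstration; only the ReefChannel API defined above is used.
def _example_channel_usage() -> None:
    channel = ReefChannel("demo", max_capacity=100, max_workers=2)
    received = []

    def on_spore(spore: Spore) -> None:
        # Handlers run on the channel's thread pool
        received.append(spore.knowledge)

    channel.subscribe("collector", on_spore)
    channel.send_spore(Spore(
        id=str(uuid.uuid4()),
        spore_type=SporeType.KNOWLEDGE,
        from_agent="producer",
        to_agent="collector",
        knowledge={"reading": 42},
        created_at=datetime.now(),
    ))

    # Block until the handler has run, then release the thread pool
    channel.wait_for_completion(timeout=5.0)
    channel.shutdown()
    assert received == [{"reading": 42}]

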
class Reef:
    """
    The Reef manages all communication channels and facilitates agent
    communication.

    Like a coral reef ecosystem, it:
    - Maintains multiple communication channels
    - Enables knowledge flow between polyps (agents)
    - Supports both direct and broadcast communication
    - Provides network health monitoring

    The Reef uses pluggable backends for transport:
    - InMemoryBackend: Local agent communication (default)
    - RabbitMQBackend: Distributed agent communication
    - Future: HTTP, gRPC, Kafka, etc.

    Agents work unchanged regardless of backend choice.

    Message routing:
    - InMemoryBackend: messages are routed through a local ReefChannel
    - RabbitMQBackend (or another distributed backend): messages are routed
      through the backend
    """

    def __init__(self, default_max_workers: int = 4, backend=None):
        """
        Initialize Reef with an optional backend.

        Args:
            default_max_workers: Thread workers per channel (InMemory only)
            backend: ReefBackend instance (defaults to InMemoryBackend)
        """
        self.channels: Dict[str, ReefChannel] = {}
        self.default_channel = "main"
        self.default_max_workers = default_max_workers
        self.lock = threading.RLock()
        self._shutdown = False

        # Set backend (default to InMemory for backward compatibility)
        if backend is None:
            from .reef_backend import InMemoryBackend
            backend = InMemoryBackend()
        self.backend = backend
        self._backend_initialized = False

        # Create the default channel
        self.create_channel(self.default_channel)

        # Start background cleanup
        self.cleanup_thread = threading.Thread(target=self._cleanup_loop, daemon=True)
        self.cleanup_thread.start()

    def _is_distributed_backend(self) -> bool:
        """
        Check if the current backend is distributed (requires async message routing).

        Returns:
            True if using RabbitMQ or another distributed backend,
            False for InMemory.
        """
        from .reef_backend import InMemoryBackend
        return not isinstance(self.backend, InMemoryBackend) and self._backend_initialized

    def _run_async(self, coro):
        """
        Run an async coroutine from a sync context.

        Handles the async/sync boundary for backends that require async
        operations.
        """
        try:
            loop = asyncio.get_running_loop()
            # We're already in an async context, create a task
            return asyncio.ensure_future(coro)
        except RuntimeError:
            # No running loop, create a new one
            loop = asyncio.new_event_loop()
            try:
                return loop.run_until_complete(coro)
            finally:
                loop.close()

    def create_channel(self, name: str, max_capacity: int = 1000,
                       max_workers: Optional[int] = None) -> ReefChannel:
        """Create a new reef channel."""
        with self.lock:
            if name in self.channels:
                return self.channels[name]
            workers = max_workers or self.default_max_workers
            channel = ReefChannel(name, max_capacity, workers)
            self.channels[name] = channel
            return channel

    def get_channel(self, name: str) -> Optional[ReefChannel]:
        """Get a reef channel by name."""
        return self.channels.get(name)

    async def initialize_backend(self, config: Optional[Dict[str, Any]] = None) -> None:
        """
        Initialize the Reef backend (an async operation for distributed backends).

        Call this method to set up distributed backends like RabbitMQ.
        InMemoryBackend initializes immediately, so this is optional for
        local usage.

        Args:
            config: Backend-specific configuration (passed to backend.initialize())
        """
        if not self._backend_initialized:
            await self.backend.initialize(config or {})
            self._backend_initialized = True
            logger.info(f"Reef backend initialized: {self.backend.__class__.__name__}")

    async def close_backend(self) -> None:
        """Shutdown the backend (an async operation for distributed backends)."""
        if self._backend_initialized:
            await self.backend.shutdown()
            self._backend_initialized = False
            logger.info(f"Reef backend shutdown: {self.backend.__class__.__name__}")

    def send(self,
             from_agent: str,
             to_agent: Optional[str],
             knowledge: Dict[str, Any],
             spore_type: SporeType = SporeType.KNOWLEDGE,
             channel: str = None,
             priority: int = 5,
             expires_in_seconds: Optional[int] = None,
             reply_to: Optional[str] = None,
             knowledge_references: Optional[List[str]] = None,
             auto_reference_large_knowledge: bool = True) -> str:
        """Send a spore through the reef."""
        # Use the default channel if none specified
        if channel is None:
            channel = self.default_channel

        reef_channel = self.get_channel(channel)
        if not reef_channel:
            raise ValueError(f"Reef channel '{channel}' not found")

        # Create an expiration time if specified
        expires_at = None
        if expires_in_seconds:
            expires_at = datetime.now() + timedelta(seconds=expires_in_seconds)

        # Handle knowledge references for lightweight spores
        final_knowledge = knowledge
        final_references = knowledge_references or []

        # Auto-reference large knowledge if enabled
        if auto_reference_large_knowledge and knowledge:
            knowledge_size = len(str(knowledge))
            if knowledge_size > 1000:  # Threshold for large knowledge
                # TODO: Store knowledge and replace with reference
                # This would require access to a memory manager
                logger.debug(
                    f"Large knowledge detected ({knowledge_size} chars) - "
                    f"consider using references"
                )

        # Create the spore
        spore = Spore(
            id=str(uuid.uuid4()),
            spore_type=spore_type,
            from_agent=from_agent,
            to_agent=to_agent,
            knowledge=final_knowledge,
            created_at=datetime.now(),
            expires_at=expires_at,
            priority=priority,
            reply_to=reply_to,
            knowledge_references=final_references,
        )

        # Route through the backend for distributed systems,
        # or the local channel for in-memory
        if self._is_distributed_backend():
            # Use the distributed backend (RabbitMQ, etc.)
            logger.debug(
                f"Routing spore {spore.id} through distributed backend "
                f"to channel: {channel}"
            )
            self._run_async(self.backend.send(spore, channel))
        else:
            # Use the local in-memory channel
            reef_channel.send_spore(spore)

        return spore.id

    def broadcast(self, from_agent: str, knowledge: Dict[str, Any],
                  channel: str = None) -> str:
        """Broadcast knowledge to all agents in the reef."""
        return self.send(
            from_agent=from_agent,
            to_agent=None,
            knowledge=knowledge,
            spore_type=SporeType.BROADCAST,
            channel=channel,
        )

    def system_broadcast(self, knowledge: Dict[str, Any], channel: str = None) -> str:
        """Broadcast system-level messages to all agents in a channel."""
        return self.broadcast(
            from_agent="system",
            knowledge=knowledge,
            channel=channel,
        )

    def request(self, from_agent: str, to_agent: str, request: Dict[str, Any],
                channel: str = None, expires_in_seconds: int = 300) -> str:
        """Send a knowledge request to another agent."""
        return self.send(
            from_agent=from_agent,
            to_agent=to_agent,
            knowledge=request,
            spore_type=SporeType.REQUEST,
            channel=channel,
            expires_in_seconds=expires_in_seconds,
        )

    def reply(self, from_agent: str, to_agent: str, response: Dict[str, Any],
              reply_to_spore_id: str, channel: str = None) -> str:
        """Reply to a knowledge request."""
        return self.send(
            from_agent=from_agent,
            to_agent=to_agent,
            knowledge=response,
            spore_type=SporeType.RESPONSE,
            channel=channel,
            reply_to=reply_to_spore_id,
        )

    def subscribe(self, agent_name: str, handler: Callable[[Spore], None],
                  channel: str = None, replace: bool = True) -> None:
        """
        Subscribe an agent to receive spores from a channel.

        Args:
            agent_name: Name of the agent subscribing
            handler: Callback function to handle received spores
            channel: Channel name (uses default if None)
            replace: If True (default), replaces existing handlers for this
                agent. If False, adds the handler to the list.

        Note:
            For distributed backends (RabbitMQ, etc.), this also subscribes to
            the backend's message broker, enabling cross-process communication.
        """
        if channel is None:
            channel = self.default_channel

        reef_channel = self.get_channel(channel)
        if reef_channel:
            # Always register locally (needed for local message tracking)
            reef_channel.subscribe(agent_name, handler, replace=replace)

            # Also subscribe through the distributed backend if active
            if self._is_distributed_backend():
                logger.debug(
                    f"Subscribing agent '{agent_name}' to distributed backend "
                    f"channel: {channel}"
                )
                self._run_async(self.backend.subscribe(channel, handler))

    def get_network_stats(self) -> Dict[str, Any]:
        """Get statistics about the reef network."""
        with self.lock:
            stats = {
                'total_channels': len(self.channels),
                'backend': self.backend.__class__.__name__,
                'backend_stats': self.backend.get_stats(),
                'channel_stats': {},
            }
            for name, channel in self.channels.items():
                stats['channel_stats'][name] = {
                    'active_spores': len(channel.spores),
                    'subscribers': len(channel.subscribers),
                    **channel.stats,
                }
            return stats

    def wait_for_completion(self, timeout: Optional[float] = None) -> bool:
        """
        Wait for all active agent handlers to complete across all channels.

        This method blocks until all currently running handlers finish,
        including cascading handlers triggered by broadcast() calls within
        agents.

        Args:
            timeout: Maximum time to wait in seconds. None means wait indefinitely.

        Returns:
            True if all handlers completed, False if a timeout occurred.

        Example:
            start_agents(researcher, summarizer, initial_data={...})
            get_reef().wait_for_completion()  # Block until all agents done
            get_reef().shutdown()
        """
        start_time = time.time()
        for channel in self.channels.values():
            # Calculate the remaining timeout for this channel
            remaining_timeout = None
            if timeout is not None:
                elapsed = time.time() - start_time
                remaining_timeout = max(0, timeout - elapsed)
                if remaining_timeout <= 0:
                    return False
            if not channel.wait_for_completion(timeout=remaining_timeout):
                return False
        return True

    def shutdown(self, wait: bool = True) -> None:
        """Shutdown the reef and all of its channels."""
        self._shutdown = True

        # Shutdown all channels
        for channel in self.channels.values():
            channel.shutdown(wait=wait)

        # Wait for the cleanup thread to finish if requested
        if wait and self.cleanup_thread.is_alive():
            self.cleanup_thread.join(timeout=5.0)

    def create_knowledge_reference_spore(self,
                                         from_agent: str,
                                         to_agent: Optional[str],
                                         knowledge_summary: str,
                                         knowledge_references: List[str],
                                         spore_type: SporeType = SporeType.KNOWLEDGE,
                                         channel: str = None) -> str:
        """
        Create a lightweight spore carrying knowledge references.

        This follows the reef principle: "light spores travel far."
        """
        return self.send(
            from_agent=from_agent,
            to_agent=to_agent,
            knowledge={
                "type": "knowledge_reference",
                "summary": knowledge_summary,
                "reference_count": len(knowledge_references),
            },
            spore_type=spore_type,
            channel=channel,
            knowledge_references=knowledge_references,
            auto_reference_large_knowledge=False,  # Already handled
        )

    def resolve_knowledge_references(self, spore: Spore, memory_manager) -> Dict[str, Any]:
        """
        Resolve knowledge references in a spore to actual knowledge.

        Args:
            spore: The spore with knowledge references
            memory_manager: Agent's memory manager used to resolve references

        Returns:
            Combined knowledge from references
        """
        if not spore.has_knowledge_references():
            return spore.knowledge

        resolved_knowledge = dict(spore.knowledge) if spore.knowledge else {}
        resolved_knowledge["referenced_knowledge"] = []

        for ref_id in spore.knowledge_references:
            try:
                memories = memory_manager.recall_by_id(ref_id)
                if memories:
                    resolved_knowledge["referenced_knowledge"].append({
                        "reference_id": ref_id,
                        "content": memories[0].content,
                        "metadata": memories[0].metadata,
                    })
            except Exception as e:
                logger.warning(f"Failed to resolve knowledge reference {ref_id}: {e}")

        return resolved_knowledge

    def _cleanup_loop(self) -> None:
        """Background thread to clean up expired spores."""
        while not self._shutdown:
            try:
                time.sleep(60)  # Cleanup every minute
                if not self._shutdown:  # Double-check before cleanup
                    for channel in self.channels.values():
                        channel.cleanup_expired()
            except Exception:
                # Swallow cleanup failures to prevent thread death
                pass


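# Illustrative sketch (not part of the original module): resolving knowledge
# references with a stub memory manager. _StubMemory and its recall_by_id()
# shape are hypothetical; they simply mirror how resolve_knowledge_references()
# above calls the manager. A real memory manager is assumed to expose the
# same method.
def _example_resolve_references() -> None:
    class _StubRecord:
        def __init__(self, content, metadata):
            self.content = content
            self.metadata = metadata

    class _StubMemory:
        def recall_by_id(self, ref_id):
            # Return a single matching record for any reference id
            return [_StubRecord(content=f"stored knowledge for {ref_id}",
                                metadata={"source": "stub"})]

    reef = Reef()  # standalone reef for the sketch (not the global instance)
    spore_id = reef.create_knowledge_reference_spore(
        from_agent="librarian",
        to_agent="reader",
        knowledge_summary="Two stored findings about reef health",
        knowledge_references=["ref-001", "ref-002"],
    )

    # Pull the lightweight spore back off the default channel and hydrate it
    channel = reef.get_channel(reef.default_channel)
    spore = next(s for s in channel.spores if s.id == spore_id)
    resolved = reef.resolve_knowledge_references(spore, _StubMemory())
    assert len(resolved["referenced_knowledge"]) == 2
    reef.shutdown()

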
# Global reef instance
_global_reef = Reef()


def get_reef() -> Reef:
    """Get the global reef instance."""
    return _global_reef


def reset_reef() -> None:
    """
    Reset the global reef instance to a clean state.

    This is primarily used for testing to ensure test isolation.
    Clears all channels and reinitializes with just the default channel.
    """
    global _global_reef
    with _global_reef.lock:
        # Clear all channels
        _global_reef.channels.clear()
        # Recreate the default channel
        _global_reef.create_channel(_global_reef.default_channel)
        _global_reef._shutdown = False
        _global_reef._backend_initialized = False

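
# Illustrative sketch (not part of the original module): an end-to-end
# request/reply exchange on the global reef. The agent names ("oracle",
# "asker") and payloads are made up; only the public Reef API defined above
# is used. Note that wait_for_completion() also covers the cascading reply
# sent from inside the oracle's handler.
def _example_request_reply() -> None:
    reef = get_reef()
    answers = []

    def oracle_handler(spore: Spore) -> None:
        # Answer incoming requests with a RESPONSE spore
        if spore.spore_type == SporeType.REQUEST:
            reef.reply(
                from_agent="oracle",
                to_agent=spore.from_agent,
                response={"answer": 42},
                reply_to_spore_id=spore.id,
            )

    def asker_handler(spore: Spore) -> None:
        if spore.spore_type == SporeType.RESPONSE:
            answers.append(spore.knowledge["answer"])

    reef.subscribe("oracle", oracle_handler)
    reef.subscribe("asker", asker_handler)

    reef.request(from_agent="asker", to_agent="oracle",
                 request={"question": "meaning of life"})
    reef.wait_for_completion(timeout=5.0)
    assert answers == [42]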