Source code for praval.storage.data_manager

"""
Data Manager - Unified interface for storage operations

Provides a high-level interface for agents to interact with multiple
storage providers through a single, consistent API.
"""

import asyncio
import logging
from typing import Any, Dict, List, Optional, Union
import threading

from .storage_registry import get_storage_registry, StorageRegistry
from .base_provider import BaseStorageProvider, StorageQuery, StorageResult, DataReference
from .exceptions import StorageNotFoundError, StorageConfigurationError

logger = logging.getLogger(__name__)


[docs] class DataManager: """ Unified data management interface for agents. Provides high-level methods for storing, retrieving, and querying data across multiple storage backends through a single interface. Features: - Automatic provider selection based on data type - Cross-provider queries and operations - Data reference management for spore communication - Storage optimization and caching - Transaction-like operations across providers """
[docs] def __init__(self, registry: Optional[StorageRegistry] = None): """ Initialize data manager. Args: registry: Storage registry to use (defaults to global registry) """ self.registry = registry or get_storage_registry() self._agent_context = threading.local()
[docs] def set_agent_context(self, agent_name: str): """Set the current agent context for permission checking.""" self._agent_context.agent_name = agent_name
[docs] def get_agent_context(self) -> Optional[str]: """Get the current agent context.""" return getattr(self._agent_context, 'agent_name', None)
# High-level storage operations
[docs] async def store(self, provider: str, resource: str, data: Any, **kwargs) -> StorageResult: """ Store data in a specific provider. Args: provider: Storage provider name resource: Resource identifier (table, key, path, etc.) data: Data to store **kwargs: Provider-specific parameters Returns: StorageResult with operation outcome """ storage_provider = self._get_provider(provider) return await storage_provider.store(resource, data, **kwargs)
[docs] async def get(self, provider: str, resource: str, **kwargs) -> StorageResult: """ Retrieve data from a specific provider. Args: provider: Storage provider name resource: Resource identifier **kwargs: Provider-specific parameters Returns: StorageResult with retrieved data """ storage_provider = self._get_provider(provider) return await storage_provider.retrieve(resource, **kwargs)
[docs] async def query(self, provider: str, resource: str, query: Union[str, Dict], **kwargs) -> StorageResult: """ Execute a query on a specific provider. Args: provider: Storage provider name resource: Resource to query query: Query string or structured query **kwargs: Query parameters Returns: StorageResult with query results """ storage_provider = self._get_provider(provider) return await storage_provider.query(resource, query, **kwargs)
[docs] async def delete(self, provider: str, resource: str, **kwargs) -> StorageResult: """ Delete data from a specific provider. Args: provider: Storage provider name resource: Resource to delete **kwargs: Delete parameters Returns: StorageResult with operation outcome """ storage_provider = self._get_provider(provider) return await storage_provider.delete(resource, **kwargs)
# Smart storage operations
[docs] async def smart_store(self, data: Any, resource: Optional[str] = None, preferred_provider: Optional[str] = None, **kwargs) -> StorageResult: """ Intelligently store data by selecting the best provider. Args: data: Data to store resource: Optional resource identifier preferred_provider: Preferred storage provider **kwargs: Storage parameters Returns: StorageResult with operation outcome """ if preferred_provider: provider_name = preferred_provider else: provider_name = self._select_optimal_provider(data, "store") if not resource: resource = self._generate_resource_id(data, provider_name) return await self.store(provider_name, resource, data, **kwargs)
# Data reference operations
[docs] def create_data_reference(self, provider: str, resource: str, **metadata) -> DataReference: """ Create a data reference for spore communication. Args: provider: Storage provider name resource: Resource identifier **metadata: Additional metadata Returns: DataReference object """ storage_provider = self._get_provider(provider) return DataReference( provider=provider, storage_type=storage_provider.metadata.storage_type, resource_id=resource, metadata=metadata )
[docs] async def resolve_data_reference(self, data_ref: Union[DataReference, str], **kwargs) -> StorageResult: """ Resolve a data reference to actual data. Args: data_ref: DataReference object or URI string **kwargs: Retrieval parameters Returns: StorageResult with resolved data """ if isinstance(data_ref, str): data_ref = DataReference.from_uri(data_ref) # Check if reference has expired if data_ref.is_expired(): return StorageResult( success=False, error="Data reference has expired" ) return await self.get(data_ref.provider, data_ref.resource_id, **kwargs)
# Batch operations
[docs] async def batch_store(self, operations: List[Dict[str, Any]]) -> List[StorageResult]: """ Execute multiple store operations in batch. Args: operations: List of operation dictionaries with keys: - provider: Storage provider name - resource: Resource identifier - data: Data to store - kwargs: Additional parameters Returns: List of StorageResult objects """ results = [] for op in operations: try: result = await self.store( op["provider"], op["resource"], op["data"], **op.get("kwargs", {}) ) results.append(result) except Exception as e: results.append(StorageResult( success=False, error=f"Batch operation failed: {str(e)}" )) return results
[docs] async def batch_get(self, operations: List[Dict[str, Any]]) -> List[StorageResult]: """ Execute multiple get operations in batch. Args: operations: List of operation dictionaries with keys: - provider: Storage provider name - resource: Resource identifier - kwargs: Additional parameters Returns: List of StorageResult objects """ results = [] for op in operations: try: result = await self.get( op["provider"], op["resource"], **op.get("kwargs", {}) ) results.append(result) except Exception as e: results.append(StorageResult( success=False, error=f"Batch operation failed: {str(e)}" )) return results
# Provider management
[docs] def list_providers(self, storage_type: Optional[str] = None) -> List[str]: """ List available storage providers. Args: storage_type: Filter by storage type Returns: List of provider names """ from .base_provider import StorageType storage_type_enum = None if storage_type: try: storage_type_enum = StorageType(storage_type) except ValueError: logger.warning(f"Unknown storage type: {storage_type}") return self.registry.list_providers( storage_type=storage_type_enum, agent_name=self.get_agent_context() )
[docs] def get_provider_info(self, provider: str) -> Dict[str, Any]: """ Get information about a storage provider. Args: provider: Storage provider name Returns: Provider information dictionary """ storage_provider = self._get_provider(provider) return storage_provider.get_schema()
[docs] async def health_check(self, provider: Optional[str] = None) -> Dict[str, Any]: """ Perform health check on providers. Args: provider: Specific provider to check (None for all) Returns: Health check results """ if provider: storage_provider = self._get_provider(provider) return await storage_provider.health_check() else: return await self.registry.health_check_all()
# Private helper methods def _get_provider(self, provider_name: str) -> BaseStorageProvider: """Get storage provider with permission checking.""" return self.registry.get_provider(provider_name, self.get_agent_context()) def _select_optimal_provider(self, data: Any, operation: str) -> str: """Select the best storage provider for given data and operation.""" from .base_provider import StorageType # Simple heuristics for provider selection if isinstance(data, dict) and "vector" in data: # Vector data - prefer vector databases vector_providers = self.registry.get_providers_by_type(StorageType.VECTOR) if vector_providers: return vector_providers[0] if isinstance(data, (dict, list)) and len(str(data)) > 1024: # Large structured data - prefer object storage object_providers = self.registry.get_providers_by_type(StorageType.OBJECT) if object_providers: return object_providers[0] if isinstance(data, dict) and any(key in data for key in ["id", "name", "email"]): # Structured record - prefer relational database relational_providers = self.registry.get_providers_by_type(StorageType.RELATIONAL) if relational_providers: return relational_providers[0] if operation in ["get", "set"] and isinstance(data, (str, int, float)): # Simple key-value - prefer cache/key-value store kv_providers = self.registry.get_providers_by_type(StorageType.KEY_VALUE) if kv_providers: return kv_providers[0] # Default to first available provider available_providers = self.list_providers() if available_providers: return available_providers[0] raise StorageConfigurationError("", "No storage providers available") def _select_search_providers(self, query: Union[str, List[float], Dict]) -> List[str]: """Select providers suitable for search based on query type.""" from .base_provider import StorageType suitable_providers = [] if isinstance(query, list) and all(isinstance(x, (int, float)) for x in query): # Vector query - use vector databases suitable_providers.extend(self.registry.get_providers_by_type(StorageType.VECTOR)) if isinstance(query, str): # Text query - use search engines, full-text capable databases suitable_providers.extend(self.registry.get_providers_by_type(StorageType.SEARCH)) suitable_providers.extend(self.registry.get_providers_by_type(StorageType.RELATIONAL)) if isinstance(query, dict): # Structured query - use databases suitable_providers.extend(self.registry.get_providers_by_type(StorageType.RELATIONAL)) suitable_providers.extend(self.registry.get_providers_by_type(StorageType.DOCUMENT)) # Remove duplicates while preserving order return list(dict.fromkeys(suitable_providers)) def _generate_resource_id(self, data: Any, provider_name: str) -> str: """Generate appropriate resource ID based on data and provider.""" import hashlib import time # Simple resource ID generation if hasattr(data, 'get') and 'id' in data: return str(data['id']) # Generate based on content hash and timestamp content_str = str(data)[:1000] # Limit to avoid huge strings hash_obj = hashlib.md5(content_str.encode()) timestamp = int(time.time()) return f"{provider_name}_{timestamp}_{hash_obj.hexdigest()[:8]}" async def _execute_search(self, provider_name: str, query: Any, **kwargs) -> StorageResult: """Execute search on a specific provider.""" storage_provider = self._get_provider(provider_name) if isinstance(query, list) and all(isinstance(x, (int, float)) for x in query): # Vector search return await storage_provider.query("", "search", vector=query, **kwargs) elif isinstance(query, str): # Text search return await storage_provider.query("", query, **kwargs) else: # Structured query return await storage_provider.query("", query, **kwargs)
# Global data manager instance _global_data_manager: Optional[DataManager] = None _data_manager_lock = threading.Lock()
[docs] def get_data_manager() -> DataManager: """Get the global data manager instance.""" global _global_data_manager if _global_data_manager is None: with _data_manager_lock: if _global_data_manager is None: _global_data_manager = DataManager() return _global_data_manager
# Convenience functions for common operations
[docs] async def store_data(provider: str, resource: str, data: Any, **kwargs) -> StorageResult: """Store data using the global data manager.""" return await get_data_manager().store(provider, resource, data, **kwargs)
[docs] async def get_data(provider: str, resource: str, **kwargs) -> StorageResult: """Retrieve data using the global data manager.""" return await get_data_manager().get(provider, resource, **kwargs)
[docs] async def query_data(provider: str, resource: str, query: Union[str, Dict], **kwargs) -> StorageResult: """Query data using the global data manager.""" return await get_data_manager().query(provider, resource, query, **kwargs)
[docs] async def delete_data(provider: str, resource: str, **kwargs) -> StorageResult: """Delete data using the global data manager.""" return await get_data_manager().delete(provider, resource, **kwargs)