Source code for praval.storage.providers.filesystem

"""
File system storage provider for Praval framework

Provides local file system storage capabilities with path management,
directory operations, and file metadata support.
"""

import json
import logging
import os
import shutil
from pathlib import Path
from typing import Any, Dict, List, Optional, Union
from datetime import datetime

from ..base_provider import BaseStorageProvider, StorageMetadata, StorageResult, StorageType, DataReference
from ..exceptions import StorageConnectionError, StorageConfigurationError

logger = logging.getLogger(__name__)


[docs] class FileSystemProvider(BaseStorageProvider): """ Local file system storage provider. Features: - File and directory operations - Path management and validation - File metadata and permissions - Recursive operations - Pattern-based file listing - Atomic file operations """ def _create_metadata(self) -> StorageMetadata: return StorageMetadata( name=self.name, description="Local file system storage provider", storage_type=StorageType.FILE_SYSTEM, supports_async=True, supports_transactions=False, supports_schemas=False, supports_indexing=False, supports_search=True, # Basic pattern matching supports_streaming=True, default_timeout=30.0, required_config=["base_path"], optional_config=["create_directories", "permissions", "max_file_size"], connection_string_template="file://{base_path}" ) def _initialize(self): """Initialize file system-specific settings.""" self.base_path = Path(self.config["base_path"]).resolve() self.config.setdefault("create_directories", True) self.config.setdefault("permissions", 0o644) self.config.setdefault("max_file_size", 100 * 1024 * 1024) # 100MB default # Validate base path if not self.base_path.exists(): if self.config["create_directories"]: self.base_path.mkdir(parents=True, exist_ok=True) logger.info(f"Created base directory: {self.base_path}") else: raise StorageConfigurationError( self.name, f"Base path does not exist: {self.base_path}" ) if not self.base_path.is_dir(): raise StorageConfigurationError( self.name, f"Base path is not a directory: {self.base_path}" )
[docs] async def connect(self) -> bool: """Verify file system access.""" try: # Test write access test_file = self.base_path / ".praval_test" test_file.write_text("test") test_file.unlink() self.is_connected = True logger.info(f"Connected to file system: {self.base_path}") return True except Exception as e: logger.error(f"Failed to access file system: {e}") raise StorageConnectionError(self.name, str(e))
[docs] async def disconnect(self): """No explicit disconnection needed for file system.""" self.is_connected = False logger.info(f"Disconnected from file system: {self.name}")
def _resolve_path(self, resource: str) -> Path: """Resolve resource path relative to base path.""" # Normalize path separators resource = resource.replace('\\', '/') # Remove leading slash if resource.startswith('/'): resource = resource[1:] resolved = (self.base_path / resource).resolve() # Security check: ensure path is within base directory try: resolved.relative_to(self.base_path) except ValueError: raise ValueError(f"Path '{resource}' is outside base directory") return resolved
[docs] async def store(self, resource: str, data: Any, **kwargs) -> StorageResult: """ Store data to file system. Args: resource: File path relative to base_path data: Data to store (string, bytes, dict/list for JSON, or file-like object) **kwargs: File parameters (encoding, mode, etc.) Returns: StorageResult with operation outcome """ if not self.is_connected: await self.connect() try: file_path = self._resolve_path(resource) # Create parent directories if needed if self.config["create_directories"]: file_path.parent.mkdir(parents=True, exist_ok=True) # Determine how to write the data if isinstance(data, (dict, list)): # JSON data content = json.dumps(data, indent=2) encoding = kwargs.get("encoding", "utf-8") file_path.write_text(content, encoding=encoding) content_type = "application/json" elif isinstance(data, str): # Text data encoding = kwargs.get("encoding", "utf-8") file_path.write_text(data, encoding=encoding) content_type = "text/plain" elif isinstance(data, bytes): # Binary data file_path.write_bytes(data) content_type = "application/octet-stream" elif hasattr(data, 'read'): # File-like object content_type = kwargs.get("content_type", "application/octet-stream") if hasattr(data, 'mode') and 'b' in data.mode: # Binary file with open(file_path, 'wb') as f: shutil.copyfileobj(data, f) else: # Text file encoding = kwargs.get("encoding", "utf-8") with open(file_path, 'w', encoding=encoding) as f: shutil.copyfileobj(data, f) else: # Convert to string content = str(data) encoding = kwargs.get("encoding", "utf-8") file_path.write_text(content, encoding=encoding) content_type = "text/plain" # Set file permissions if specified if "permissions" in kwargs: file_path.chmod(kwargs["permissions"]) # Get file stats stat = file_path.stat() return StorageResult( success=True, data={ "path": str(file_path.relative_to(self.base_path)), "size": stat.st_size, "created": True }, metadata={ "operation": "write_file", "content_type": content_type, "size": stat.st_size, "modified": datetime.fromtimestamp(stat.st_mtime).isoformat(), "permissions": oct(stat.st_mode)[-3:] }, data_reference=DataReference( provider=self.name, storage_type=StorageType.FILE_SYSTEM, resource_id=resource, metadata={ "size": stat.st_size, "content_type": content_type, "full_path": str(file_path) } ) ) except Exception as e: logger.error(f"Store operation failed: {e}") return StorageResult( success=False, error=f"Store operation failed: {str(e)}" )
[docs] async def retrieve(self, resource: str, **kwargs) -> StorageResult: """ Retrieve data from file system. Args: resource: File path relative to base_path **kwargs: Read parameters (encoding, decode_json, etc.) Returns: StorageResult with retrieved data """ if not self.is_connected: await self.connect() try: file_path = self._resolve_path(resource) if not file_path.exists(): return StorageResult( success=False, error=f"File not found: {resource}" ) if not file_path.is_file(): return StorageResult( success=False, error=f"Path is not a file: {resource}" ) # Determine content type from extension suffix = file_path.suffix.lower() if suffix in ['.json']: content_type = "application/json" elif suffix in ['.txt', '.md', '.csv']: content_type = "text/plain" elif suffix in ['.jpg', '.jpeg', '.png', '.gif']: content_type = f"image/{suffix[1:]}" else: content_type = "application/octet-stream" # Read data based on content type and parameters decode_json = kwargs.get("decode_json", content_type == "application/json") binary_mode = kwargs.get("binary", content_type.startswith("image/") or content_type == "application/octet-stream") if binary_mode: data = file_path.read_bytes() else: encoding = kwargs.get("encoding", "utf-8") data = file_path.read_text(encoding=encoding) if decode_json and (suffix == '.json' or content_type == "application/json"): try: data = json.loads(data) except json.JSONDecodeError: pass # Keep as string if JSON parsing fails # Get file stats stat = file_path.stat() return StorageResult( success=True, data=data, metadata={ "operation": "read_file", "path": resource, "content_type": content_type, "size": stat.st_size, "modified": datetime.fromtimestamp(stat.st_mtime).isoformat(), "permissions": oct(stat.st_mode)[-3:] } ) except Exception as e: logger.error(f"Retrieve operation failed: {e}") return StorageResult( success=False, error=f"Retrieve operation failed: {str(e)}" )
[docs] async def query(self, resource: str, query: Union[str, Dict], **kwargs) -> StorageResult: """ Execute file system operations. Args: resource: Path or pattern query: Query type ("list", "find", "metadata", etc.) **kwargs: Query parameters Returns: StorageResult with query results """ if not self.is_connected: await self.connect() try: if isinstance(query, str): if query == "list": # List directory contents dir_path = self._resolve_path(resource) if not dir_path.exists(): return StorageResult( success=False, error=f"Directory not found: {resource}" ) if not dir_path.is_dir(): return StorageResult( success=False, error=f"Path is not a directory: {resource}" ) items = [] for item in dir_path.iterdir(): stat = item.stat() items.append({ "name": item.name, "path": str(item.relative_to(self.base_path)), "type": "directory" if item.is_dir() else "file", "size": stat.st_size, "modified": datetime.fromtimestamp(stat.st_mtime).isoformat(), "permissions": oct(stat.st_mode)[-3:] }) return StorageResult( success=True, data=items, metadata={"operation": "list_directory", "count": len(items)} ) elif query == "find": # Find files matching pattern pattern = kwargs.get("pattern", "*") recursive = kwargs.get("recursive", False) base_dir = self._resolve_path(resource) if resource else self.base_path if recursive: matches = list(base_dir.rglob(pattern)) else: matches = list(base_dir.glob(pattern)) items = [] for match in matches: if match.is_file(): stat = match.stat() items.append({ "name": match.name, "path": str(match.relative_to(self.base_path)), "size": stat.st_size, "modified": datetime.fromtimestamp(stat.st_mtime).isoformat(), "permissions": oct(stat.st_mode)[-3:] }) return StorageResult( success=True, data=items, metadata={"operation": "find_files", "pattern": pattern, "count": len(items)} ) elif query == "metadata": # Get file/directory metadata path = self._resolve_path(resource) if not path.exists(): return StorageResult( success=False, error=f"Path not found: {resource}" ) stat = path.stat() return StorageResult( success=True, data={ "path": resource, "type": "directory" if path.is_dir() else "file", "size": stat.st_size, "created": datetime.fromtimestamp(stat.st_ctime).isoformat(), "modified": datetime.fromtimestamp(stat.st_mtime).isoformat(), "accessed": datetime.fromtimestamp(stat.st_atime).isoformat(), "permissions": oct(stat.st_mode)[-3:], "owner": stat.st_uid, "group": stat.st_gid }, metadata={"operation": "file_metadata"} ) elif query == "exists": # Check if path exists path = self._resolve_path(resource) return StorageResult( success=True, data={"exists": path.exists()}, metadata={"operation": "path_exists"} ) else: raise ValueError(f"Unsupported query type: {query}") else: raise ValueError(f"Unsupported query format: {type(query)}") except Exception as e: logger.error(f"Query operation failed: {e}") return StorageResult( success=False, error=f"Query operation failed: {str(e)}" )
[docs] async def delete(self, resource: str, **kwargs) -> StorageResult: """ Delete file or directory from file system. Args: resource: Path to delete **kwargs: Delete parameters (recursive, etc.) Returns: StorageResult with operation outcome """ if not self.is_connected: await self.connect() try: path = self._resolve_path(resource) if not path.exists(): return StorageResult( success=False, error=f"Path not found: {resource}" ) if path.is_file(): path.unlink() deleted_count = 1 operation = "delete_file" elif path.is_dir(): recursive = kwargs.get("recursive", False) if recursive: shutil.rmtree(path) deleted_count = 1 # Count as one directory operation = "delete_directory_recursive" else: # Only delete if empty try: path.rmdir() deleted_count = 1 operation = "delete_directory" except OSError as e: return StorageResult( success=False, error=f"Directory not empty (use recursive=True): {e}" ) else: return StorageResult( success=False, error=f"Unknown path type: {resource}" ) return StorageResult( success=True, data={"deleted": deleted_count}, metadata={"operation": operation, "path": resource} ) except Exception as e: logger.error(f"Delete operation failed: {e}") return StorageResult( success=False, error=f"Delete operation failed: {str(e)}" )
[docs] async def list_resources(self, prefix: str = "", **kwargs) -> StorageResult: """List files and directories.""" return await self.query(prefix or "", "list", **kwargs)