"""
S3 object storage provider for the Praval framework.

Provides object storage capabilities via S3-compatible backends,
including AWS S3, MinIO, and other S3-compatible services.
"""
import json
import logging
from typing import Any, Dict, List, Optional, Union, BinaryIO
from datetime import datetime, timedelta
from urllib.parse import urlparse
import io
try:
    import boto3
    from botocore.config import Config
    from botocore.exceptions import ClientError, NoCredentialsError
    BOTO3_AVAILABLE = True
except ImportError:
    BOTO3_AVAILABLE = False
    boto3 = None
    Config = None
    ClientError = Exception
    NoCredentialsError = Exception
from ..base_provider import BaseStorageProvider, StorageMetadata, StorageResult, StorageType, DataReference
from ..exceptions import StorageConnectionError, StorageConfigurationError
logger = logging.getLogger(__name__)
class S3Provider(BaseStorageProvider):
"""
S3-compatible object storage provider.
Features:
- Object upload, download, and deletion
- Bucket management
- Presigned URLs for secure access
- Metadata and tagging support
- Multipart uploads for large files
- Lifecycle management
- Cross-region replication support
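
    Example (a minimal usage sketch; the constructor signature is assumed
    from ``BaseStorageProvider``, and the endpoint and credentials shown are
    placeholders for a local MinIO instance)::

        import asyncio

        async def demo():
            provider = S3Provider(
                name="artifacts",
                config={
                    "bucket_name": "praval-artifacts",
                    "endpoint_url": "http://localhost:9000",
                    "aws_access_key_id": "minioadmin",
                    "aws_secret_access_key": "minioadmin",
                    "create_bucket": True,
                },
            )
            await provider.connect()
            result = await provider.store("reports/run.json", {"ok": True})
            fetched = await provider.retrieve("reports/run.json")
            print(result.success, fetched.data)
            await provider.disconnect()

        asyncio.run(demo())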
"""
def _create_metadata(self) -> StorageMetadata:
return StorageMetadata(
name=self.name,
description="S3-compatible object storage provider",
storage_type=StorageType.OBJECT,
supports_async=False, # boto3 doesn't support async natively
supports_transactions=False,
supports_schemas=False,
supports_indexing=False,
supports_search=False,
supports_streaming=True,
default_timeout=60.0,
required_config=["bucket_name"],
optional_config=[
"aws_access_key_id", "aws_secret_access_key", "region_name",
"endpoint_url", "use_ssl", "signature_version"
],
connection_string_template="s3://{bucket_name}"
)
def _initialize(self):
"""Initialize S3-specific settings."""
if not BOTO3_AVAILABLE:
raise ImportError("boto3 is required for S3 provider. Install with: pip install boto3")
# Set default values
self.config.setdefault("region_name", "us-east-1")
self.config.setdefault("use_ssl", True)
self.config.setdefault("signature_version", "s3v4")
self.s3_client = None
self.bucket_name = self.config["bucket_name"]
self._client_kwargs = self._build_client_kwargs()
def _build_client_kwargs(self) -> Dict[str, Any]:
"""Build S3 client parameters from config."""
kwargs = {
"service_name": "s3",
"region_name": self.config["region_name"],
"use_ssl": self.config["use_ssl"],
"config": boto3.session.Config(
signature_version=self.config["signature_version"]
)
}
# Add credentials if provided
if "aws_access_key_id" in self.config:
kwargs["aws_access_key_id"] = self.config["aws_access_key_id"]
if "aws_secret_access_key" in self.config:
kwargs["aws_secret_access_key"] = self.config["aws_secret_access_key"]
# Add endpoint URL for S3-compatible services (like MinIO)
if "endpoint_url" in self.config:
kwargs["endpoint_url"] = self.config["endpoint_url"]
return kwargs
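
    # For example (values hypothetical), a MinIO-style config such as
    #     {"bucket_name": "praval", "endpoint_url": "http://localhost:9000",
    #      "use_ssl": False, "aws_access_key_id": "minioadmin",
    #      "aws_secret_access_key": "minioadmin"}
    # produces kwargs for boto3.client() whose endpoint_url points the client
    # at MinIO instead of AWS; omit the credentials to fall back to boto3's
    # normal credential chain (env vars, shared config, IAM role).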
async def connect(self) -> bool:
"""Establish connection to S3."""
try:
self.s3_client = boto3.client(**self._client_kwargs)
# Test connection by checking if bucket exists
try:
self.s3_client.head_bucket(Bucket=self.bucket_name)
            except ClientError as e:
                error_code = e.response['Error']['Code']
                if error_code in ('404', 'NoSuchBucket'):
                    # Bucket doesn't exist; create it if allowed. Regions other
                    # than us-east-1 require an explicit LocationConstraint.
                    if self.config.get("create_bucket", False):
                        create_kwargs = {"Bucket": self.bucket_name}
                        if self.config["region_name"] != "us-east-1":
                            create_kwargs["CreateBucketConfiguration"] = {
                                "LocationConstraint": self.config["region_name"]
                            }
                        self.s3_client.create_bucket(**create_kwargs)
                        logger.info(f"Created S3 bucket: {self.bucket_name}")
                    else:
                        raise StorageConnectionError(
                            self.name,
                            f"Bucket '{self.bucket_name}' does not exist and create_bucket is disabled"
                        )
                else:
                    raise
self.is_connected = True
logger.info(f"Connected to S3 bucket: {self.bucket_name}")
return True
except NoCredentialsError:
logger.error("AWS credentials not found")
raise StorageConnectionError(self.name, "AWS credentials not found")
except Exception as e:
logger.error(f"Failed to connect to S3: {e}")
raise StorageConnectionError(self.name, str(e))
async def disconnect(self):
"""Close S3 connection."""
if self.s3_client:
# boto3 client doesn't need explicit closing
self.s3_client = None
self.is_connected = False
logger.info(f"Disconnected from S3: {self.name}")
async def store(self, resource: str, data: Any, **kwargs) -> StorageResult:
"""
Store object in S3.
Args:
resource: S3 object key
data: Data to store (bytes, string, file-like object, or dict/list for JSON)
**kwargs: S3 parameters (content_type, metadata, acl, etc.)
Returns:
StorageResult with operation outcome
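
        Example (illustrative keys and payloads, run inside an async context)::

            await provider.store("cfg/app.json", {"debug": True})   # dict -> JSON
            await provider.store("notes/hello.txt", "hello")        # str -> text/plain
            await provider.store("blobs/raw.bin", b"raw-bytes")     # bytes, stored as-is
            with open("report.pdf", "rb") as fh:                    # file-like object
                await provider.store(
                    "docs/report.pdf", fh,
                    content_type="application/pdf",
                    metadata={"source": "demo"},
                )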
"""
if not self.is_connected:
await self.connect()
try:
# Prepare data for upload
if isinstance(data, (dict, list)):
# JSON data
body = json.dumps(data).encode('utf-8')
content_type = kwargs.get("content_type", "application/json")
elif isinstance(data, str):
# String data
body = data.encode('utf-8')
content_type = kwargs.get("content_type", "text/plain")
elif isinstance(data, bytes):
# Binary data
body = data
content_type = kwargs.get("content_type", "application/octet-stream")
elif hasattr(data, 'read'):
# File-like object
body = data
content_type = kwargs.get("content_type", "application/octet-stream")
else:
# Convert to string
body = str(data).encode('utf-8')
content_type = kwargs.get("content_type", "text/plain")
# Prepare S3 parameters
put_kwargs = {
"Bucket": self.bucket_name,
"Key": resource,
"Body": body,
"ContentType": content_type
}
# Add optional parameters
if "metadata" in kwargs:
put_kwargs["Metadata"] = kwargs["metadata"]
if "acl" in kwargs:
put_kwargs["ACL"] = kwargs["acl"]
if "server_side_encryption" in kwargs:
put_kwargs["ServerSideEncryption"] = kwargs["server_side_encryption"]
if "cache_control" in kwargs:
put_kwargs["CacheControl"] = kwargs["cache_control"]
# Upload object
response = self.s3_client.put_object(**put_kwargs)
# Get object info for metadata
head_response = self.s3_client.head_object(Bucket=self.bucket_name, Key=resource)
return StorageResult(
success=True,
data={
"bucket": self.bucket_name,
"key": resource,
"etag": response["ETag"],
"size": head_response["ContentLength"]
},
metadata={
"operation": "put_object",
"content_type": content_type,
"last_modified": head_response["LastModified"].isoformat(),
"etag": response["ETag"]
},
data_reference=DataReference(
provider=self.name,
storage_type=StorageType.OBJECT,
resource_id=resource,
metadata={
"bucket": self.bucket_name,
"size": head_response["ContentLength"],
"content_type": content_type
}
)
)
except Exception as e:
logger.error(f"Store operation failed: {e}")
return StorageResult(
success=False,
error=f"Store operation failed: {str(e)}"
)
async def retrieve(self, resource: str, **kwargs) -> StorageResult:
"""
Retrieve object from S3.
Args:
resource: S3 object key
**kwargs: Retrieval parameters (range, decode_json, etc.)
Returns:
StorageResult with retrieved data
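
        Example (illustrative keys, run inside an async context)::

            result = await provider.retrieve("cfg/app.json")   # JSON auto-decoded
            head = await provider.retrieve("blobs/raw.bin", range="bytes=0-1023")
            raw = await provider.retrieve("cfg/app.json", decode_json=False)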
"""
if not self.is_connected:
await self.connect()
try:
get_kwargs = {
"Bucket": self.bucket_name,
"Key": resource
}
# Add range if specified
if "range" in kwargs:
get_kwargs["Range"] = kwargs["range"]
# Get object
response = self.s3_client.get_object(**get_kwargs)
# Read data
body = response["Body"].read()
# Decode based on content type or request
content_type = response.get("ContentType", "")
decode_json = kwargs.get("decode_json", content_type == "application/json")
            if decode_json:
try:
data = json.loads(body.decode('utf-8'))
except (json.JSONDecodeError, UnicodeDecodeError):
data = body
elif kwargs.get("decode_text", content_type.startswith("text/")):
try:
data = body.decode('utf-8')
except UnicodeDecodeError:
data = body
else:
data = body
return StorageResult(
success=True,
data=data,
metadata={
"operation": "get_object",
"key": resource,
"content_type": content_type,
"size": response["ContentLength"],
"last_modified": response["LastModified"].isoformat(),
"etag": response["ETag"],
"metadata": response.get("Metadata", {})
}
)
except ClientError as e:
error_code = e.response['Error']['Code']
if error_code == 'NoSuchKey':
return StorageResult(
success=False,
error=f"Object '{resource}' not found"
)
else:
logger.error(f"Retrieve operation failed: {e}")
return StorageResult(
success=False,
error=f"Retrieve operation failed: {str(e)}"
)
except Exception as e:
logger.error(f"Retrieve operation failed: {e}")
return StorageResult(
success=False,
error=f"Retrieve operation failed: {str(e)}"
)
async def query(self, resource: str, query: Union[str, Dict], **kwargs) -> StorageResult:
"""
Execute S3 operations or list objects.
Args:
resource: Prefix or specific key
query: Query type ("list", "search", "metadata", etc.)
**kwargs: Query parameters
Returns:
StorageResult with query results
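
        Example (illustrative keys and prefixes, run inside an async context)::

            listing = await provider.query("reports/", "list", max_keys=100)
            meta = await provider.query("reports/run.json", "metadata")
            exists = await provider.query("reports/run.json", "exists")
            link = await provider.query("reports/run.json", "presigned_url",
                                        expiration=900, method="GET")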
"""
if not self.is_connected:
await self.connect()
try:
if isinstance(query, str):
if query == "list":
# List objects with prefix
list_kwargs = {
"Bucket": self.bucket_name,
"Prefix": resource
}
if "max_keys" in kwargs:
list_kwargs["MaxKeys"] = kwargs["max_keys"]
if "continuation_token" in kwargs:
list_kwargs["ContinuationToken"] = kwargs["continuation_token"]
response = self.s3_client.list_objects_v2(**list_kwargs)
objects = []
if "Contents" in response:
for obj in response["Contents"]:
objects.append({
"key": obj["Key"],
"size": obj["Size"],
"last_modified": obj["LastModified"].isoformat(),
"etag": obj["ETag"]
})
result_data = {
"objects": objects,
"count": len(objects),
"is_truncated": response.get("IsTruncated", False)
}
if "NextContinuationToken" in response:
result_data["next_token"] = response["NextContinuationToken"]
return StorageResult(
success=True,
data=result_data,
metadata={"operation": "list_objects", "prefix": resource}
)
elif query == "metadata":
# Get object metadata
response = self.s3_client.head_object(Bucket=self.bucket_name, Key=resource)
return StorageResult(
success=True,
data={
"key": resource,
"size": response["ContentLength"],
"last_modified": response["LastModified"].isoformat(),
"etag": response["ETag"],
"content_type": response.get("ContentType"),
"metadata": response.get("Metadata", {})
},
metadata={"operation": "head_object"}
)
elif query == "exists":
# Check if object exists
try:
self.s3_client.head_object(Bucket=self.bucket_name, Key=resource)
return StorageResult(
success=True,
data={"exists": True},
metadata={"operation": "object_exists"}
)
                    except ClientError as e:
                        # head_object reports a missing key as '404', not 'NoSuchKey'
                        if e.response['Error']['Code'] in ('404', 'NoSuchKey'):
return StorageResult(
success=True,
data={"exists": False},
metadata={"operation": "object_exists"}
)
else:
raise
elif query == "presigned_url":
# Generate presigned URL
expiration = kwargs.get("expiration", 3600) # 1 hour default
http_method = kwargs.get("method", "GET")
url = self.s3_client.generate_presigned_url(
f"{http_method.lower()}_object",
Params={"Bucket": self.bucket_name, "Key": resource},
ExpiresIn=expiration
)
return StorageResult(
success=True,
data={"url": url, "expires_in": expiration},
metadata={"operation": "presigned_url", "method": http_method}
)
else:
raise ValueError(f"Unsupported query type: {query}")
else:
raise ValueError(f"Unsupported query format: {type(query)}")
except Exception as e:
logger.error(f"Query operation failed: {e}")
return StorageResult(
success=False,
error=f"Query operation failed: {str(e)}"
)
async def delete(self, resource: str, **kwargs) -> StorageResult:
"""
Delete object(s) from S3.
Args:
resource: Object key or prefix
**kwargs: Delete parameters (recursive, etc.)
Returns:
StorageResult with operation outcome
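
        Example (illustrative keys, run inside an async context)::

            await provider.delete("reports/run.json")           # single object
            await provider.delete("reports/", recursive=True)   # all objects under prefix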
"""
if not self.is_connected:
await self.connect()
try:
if kwargs.get("recursive", False):
# Delete all objects with prefix
list_response = self.s3_client.list_objects_v2(
Bucket=self.bucket_name,
Prefix=resource
)
if "Contents" not in list_response:
return StorageResult(
success=True,
data={"deleted": 0},
metadata={"operation": "delete_objects", "prefix": resource}
)
# Prepare objects for deletion
objects_to_delete = [{"Key": obj["Key"]} for obj in list_response["Contents"]]
# Delete objects in batches
deleted_count = 0
batch_size = kwargs.get("batch_size", 1000)
for i in range(0, len(objects_to_delete), batch_size):
batch = objects_to_delete[i:i + batch_size]
delete_response = self.s3_client.delete_objects(
Bucket=self.bucket_name,
Delete={"Objects": batch}
)
deleted_count += len(delete_response.get("Deleted", []))
return StorageResult(
success=True,
data={"deleted": deleted_count},
metadata={"operation": "delete_objects", "prefix": resource}
)
else:
# Delete single object
self.s3_client.delete_object(Bucket=self.bucket_name, Key=resource)
return StorageResult(
success=True,
data={"deleted": 1},
metadata={"operation": "delete_object", "key": resource}
)
except Exception as e:
logger.error(f"Delete operation failed: {e}")
return StorageResult(
success=False,
error=f"Delete operation failed: {str(e)}"
)
async def list_resources(self, prefix: str = "", **kwargs) -> StorageResult:
"""List objects in S3 bucket."""
return await self.query(prefix, "list", **kwargs)