"""
PostgreSQL storage provider for Praval framework
Provides relational database capabilities with SQL query support,
transactions, and schema management.
"""
import json
import logging
from typing import Any, Dict, List, Optional, Union
from datetime import datetime
try:
import asyncpg
import asyncpg.pool
ASYNCPG_AVAILABLE = True
except ImportError:
ASYNCPG_AVAILABLE = False
asyncpg = None
from ..base_provider import BaseStorageProvider, StorageMetadata, StorageResult, StorageType, DataReference
from ..exceptions import StorageConnectionError, StorageConfigurationError
logger = logging.getLogger(__name__)
[docs]
class PostgreSQLProvider(BaseStorageProvider):
"""
PostgreSQL storage provider with async connection pooling.
Features:
- Async connection pooling
- SQL query execution
- Transaction support
- Schema management
- JSON column support
- Full-text search capabilities
"""
def _create_metadata(self) -> StorageMetadata:
return StorageMetadata(
name=self.name,
description="PostgreSQL relational database provider",
storage_type=StorageType.RELATIONAL,
supports_async=True,
supports_transactions=True,
supports_schemas=True,
supports_indexing=True,
supports_search=True,
max_connection_pool=20,
default_timeout=30.0,
required_config=["host", "database", "user", "password"],
optional_config=["port", "ssl", "pool_min_size", "pool_max_size"],
connection_string_template="postgresql://{user}:{password}@{host}:{port}/{database}"
)
def _initialize(self):
"""Initialize PostgreSQL-specific settings."""
if not ASYNCPG_AVAILABLE:
raise ImportError("asyncpg is required for PostgreSQL provider. Install with: pip install asyncpg")
# Set default values
self.config.setdefault("port", 5432)
self.config.setdefault("pool_min_size", 1)
self.config.setdefault("pool_max_size", 10)
self.config.setdefault("ssl", False)
self.connection_pool: Optional[asyncpg.pool.Pool] = None
self._connection_string = self._build_connection_string()
def _build_connection_string(self) -> str:
"""Build PostgreSQL connection string from config."""
return (
f"postgresql://{self.config['user']}:{self.config['password']}"
f"@{self.config['host']}:{self.config['port']}/{self.config['database']}"
)
[docs]
async def connect(self) -> bool:
"""Establish connection pool to PostgreSQL."""
try:
self.connection_pool = await asyncpg.create_pool(
self._connection_string,
min_size=self.config["pool_min_size"],
max_size=self.config["pool_max_size"],
command_timeout=self.metadata.default_timeout
)
# Test connection
async with self.connection_pool.acquire() as conn:
await conn.execute("SELECT 1")
self.is_connected = True
logger.info(f"Connected to PostgreSQL: {self.config['host']}:{self.config['port']}/{self.config['database']}")
return True
except Exception as e:
logger.error(f"Failed to connect to PostgreSQL: {e}")
raise StorageConnectionError(self.name, str(e))
[docs]
async def disconnect(self):
"""Close connection pool."""
if self.connection_pool:
await self.connection_pool.close()
self.connection_pool = None
self.is_connected = False
logger.info(f"Disconnected from PostgreSQL: {self.name}")
[docs]
async def store(self, resource: str, data: Any, **kwargs) -> StorageResult:
"""
Store data in PostgreSQL table.
Args:
resource: Table name
data: Data to store (dict or list of dicts)
**kwargs: Additional parameters (upsert, returning, etc.)
Returns:
StorageResult with operation outcome
"""
if not self.is_connected:
await self.connect()
try:
async with self.connection_pool.acquire() as conn:
if isinstance(data, dict):
# Single record insert
columns = list(data.keys())
values = list(data.values())
placeholders = [f"${i+1}" for i in range(len(values))]
query = f"INSERT INTO {resource} ({', '.join(columns)}) VALUES ({', '.join(placeholders)})"
if kwargs.get("returning"):
query += f" RETURNING {kwargs['returning']}"
if kwargs.get("returning"):
result = await conn.fetchrow(query, *values)
return StorageResult(
success=True,
data=dict(result) if result else None,
metadata={"operation": "insert", "table": resource}
)
else:
await conn.execute(query, *values)
return StorageResult(
success=True,
data={"inserted": 1},
metadata={"operation": "insert", "table": resource}
)
elif isinstance(data, list):
# Bulk insert
if not data:
return StorageResult(success=True, data={"inserted": 0})
columns = list(data[0].keys())
placeholders = [f"${i+1}" for i in range(len(columns))]
query = f"INSERT INTO {resource} ({', '.join(columns)}) VALUES ({', '.join(placeholders)})"
values_list = [[row[col] for col in columns] for row in data]
await conn.executemany(query, values_list)
return StorageResult(
success=True,
data={"inserted": len(data)},
metadata={"operation": "bulk_insert", "table": resource}
)
else:
raise ValueError(f"Unsupported data type for storage: {type(data)}")
except Exception as e:
logger.error(f"Store operation failed: {e}")
return StorageResult(
success=False,
error=f"Store operation failed: {str(e)}"
)
[docs]
async def retrieve(self, resource: str, **kwargs) -> StorageResult:
"""
Retrieve data from PostgreSQL table.
Args:
resource: Table name
**kwargs: Query parameters (where, limit, offset, order_by, etc.)
Returns:
StorageResult with retrieved data
"""
if not self.is_connected:
await self.connect()
try:
async with self.connection_pool.acquire() as conn:
query = f"SELECT * FROM {resource}"
params = []
# Build WHERE clause
if "where" in kwargs:
where_clause, where_params = self._build_where_clause(kwargs["where"])
query += f" WHERE {where_clause}"
params.extend(where_params)
# Add ORDER BY
if "order_by" in kwargs:
query += f" ORDER BY {kwargs['order_by']}"
# Add LIMIT and OFFSET
if "limit" in kwargs:
query += f" LIMIT {kwargs['limit']}"
if "offset" in kwargs:
query += f" OFFSET {kwargs['offset']}"
rows = await conn.fetch(query, *params)
data = [dict(row) for row in rows]
return StorageResult(
success=True,
data=data,
metadata={
"operation": "select",
"table": resource,
"count": len(data)
}
)
except Exception as e:
logger.error(f"Retrieve operation failed: {e}")
return StorageResult(
success=False,
error=f"Retrieve operation failed: {str(e)}"
)
[docs]
async def query(self, resource: str, query: Union[str, Dict], **kwargs) -> StorageResult:
"""
Execute SQL query against PostgreSQL.
Args:
resource: Table name (ignored for raw SQL)
query: SQL query string or structured query dict
**kwargs: Query parameters
Returns:
StorageResult with query results
"""
if not self.is_connected:
await self.connect()
try:
async with self.connection_pool.acquire() as conn:
if isinstance(query, str):
# Raw SQL query
params = kwargs.get("params", [])
if query.strip().upper().startswith("SELECT"):
rows = await conn.fetch(query, *params)
data = [dict(row) for row in rows]
else:
# Non-SELECT query
result = await conn.execute(query, *params)
data = {"result": result, "status": "executed"}
return StorageResult(
success=True,
data=data,
metadata={"operation": "raw_query", "query_type": "sql"}
)
elif isinstance(query, dict):
# Structured query
return await self._execute_structured_query(conn, resource, query, **kwargs)
else:
raise ValueError(f"Unsupported query type: {type(query)}")
except Exception as e:
logger.error(f"Query operation failed: {e}")
return StorageResult(
success=False,
error=f"Query operation failed: {str(e)}"
)
[docs]
async def delete(self, resource: str, **kwargs) -> StorageResult:
"""
Delete data from PostgreSQL table.
Args:
resource: Table name
**kwargs: Delete parameters (where clause required)
Returns:
StorageResult with operation outcome
"""
if not self.is_connected:
await self.connect()
try:
async with self.connection_pool.acquire() as conn:
if "where" not in kwargs:
raise ValueError("WHERE clause is required for delete operations")
where_clause, params = self._build_where_clause(kwargs["where"])
query = f"DELETE FROM {resource} WHERE {where_clause}"
result = await conn.execute(query, *params)
deleted_count = int(result.split()[-1]) # Extract count from "DELETE n"
return StorageResult(
success=True,
data={"deleted": deleted_count},
metadata={"operation": "delete", "table": resource}
)
except Exception as e:
logger.error(f"Delete operation failed: {e}")
return StorageResult(
success=False,
error=f"Delete operation failed: {str(e)}"
)
def _build_where_clause(self, where_dict: Dict[str, Any]) -> tuple[str, List[Any]]:
"""Build WHERE clause from dictionary."""
conditions = []
params = []
param_index = 1
for key, value in where_dict.items():
if isinstance(value, dict):
# Handle operators like {"age": {"$gt": 25}}
for op, val in value.items():
if op == "$gt":
conditions.append(f"{key} > ${param_index}")
elif op == "$lt":
conditions.append(f"{key} < ${param_index}")
elif op == "$gte":
conditions.append(f"{key} >= ${param_index}")
elif op == "$lte":
conditions.append(f"{key} <= ${param_index}")
elif op == "$ne":
conditions.append(f"{key} != ${param_index}")
elif op == "$in":
placeholders = [f"${param_index + i}" for i in range(len(val))]
conditions.append(f"{key} IN ({', '.join(placeholders)})")
params.extend(val)
param_index += len(val) - 1
continue
else:
raise ValueError(f"Unsupported operator: {op}")
params.append(val)
param_index += 1
else:
# Simple equality
conditions.append(f"{key} = ${param_index}")
params.append(value)
param_index += 1
return " AND ".join(conditions), params
async def _execute_structured_query(self, conn, resource: str, query_dict: Dict, **kwargs) -> StorageResult:
"""Execute structured query dictionary."""
operation = query_dict.get("operation", "select")
if operation == "select":
fields = query_dict.get("fields", "*")
if isinstance(fields, list):
fields = ", ".join(fields)
sql = f"SELECT {fields} FROM {resource}"
params = []
if "where" in query_dict:
where_clause, where_params = self._build_where_clause(query_dict["where"])
sql += f" WHERE {where_clause}"
params.extend(where_params)
if "order_by" in query_dict:
sql += f" ORDER BY {query_dict['order_by']}"
if "limit" in query_dict:
sql += f" LIMIT {query_dict['limit']}"
rows = await conn.fetch(sql, *params)
data = [dict(row) for row in rows]
return StorageResult(
success=True,
data=data,
metadata={"operation": "structured_select", "count": len(data)}
)
else:
raise ValueError(f"Unsupported structured operation: {operation}")
[docs]
async def list_resources(self, prefix: str = "", **kwargs) -> StorageResult:
"""List tables in the database."""
if not self.is_connected:
await self.connect()
try:
async with self.connection_pool.acquire() as conn:
query = """
SELECT table_name
FROM information_schema.tables
WHERE table_schema = 'public'
"""
if prefix:
query += f" AND table_name LIKE '{prefix}%'"
query += " ORDER BY table_name"
rows = await conn.fetch(query)
tables = [row["table_name"] for row in rows]
return StorageResult(
success=True,
data=tables,
metadata={"operation": "list_tables", "count": len(tables)}
)
except Exception as e:
logger.error(f"List resources failed: {e}")
return StorageResult(
success=False,
error=f"List resources failed: {str(e)}"
)