# Source code for praval.storage.providers.qdrant_provider

"""
Qdrant vector storage provider for Praval framework

Integrates with existing Praval memory system to provide vector storage
capabilities through the storage framework.
"""

import json
import logging
from typing import Any, Dict, List, Optional, Union
from datetime import datetime
import uuid

try:
    from qdrant_client import QdrantClient
    from qdrant_client.http import models
    from qdrant_client.http.models import Distance, VectorParams, PointStruct
    QDRANT_AVAILABLE = True
except ImportError:
    QDRANT_AVAILABLE = False
    QdrantClient = None
    models = None

from ..base_provider import BaseStorageProvider, StorageMetadata, StorageResult, StorageType, DataReference
from ..exceptions import StorageConnectionError, StorageConfigurationError

logger = logging.getLogger(__name__)


class QdrantProvider(BaseStorageProvider):
    """
    Storage provider backed by a Qdrant vector database.

    Capabilities:
    - Vector similarity search
    - Collection management (collections play the role of schemas)
    - Point insertion and retrieval
    - Filtering and metadata (payload) search
    - Batch operations
    - Integration with the Praval memory system
    """

    def _create_metadata(self) -> StorageMetadata:
        """Describe this provider's capabilities and its configuration keys."""
        optional_keys = [
            "collection_name",
            "vector_size",
            "distance_metric",
            "api_key",
            "timeout",
            "prefer_grpc",
        ]
        return StorageMetadata(
            name=self.name,
            description="Qdrant vector database storage provider",
            storage_type=StorageType.VECTOR,
            supports_async=True,
            supports_transactions=False,
            supports_schemas=True,  # collections act as schemas
            supports_indexing=True,
            supports_search=True,
            supports_streaming=False,
            default_timeout=30.0,
            required_config=["url"],
            optional_config=optional_keys,
            connection_string_template="qdrant://{url}/{collection_name}",
        )

    def _initialize(self):
        """
        Check that qdrant-client is importable and fill in config defaults.

        Raises:
            ImportError: If the qdrant-client package could not be imported.
        """
        if not QDRANT_AVAILABLE:
            raise ImportError(
                "qdrant-client is required for Qdrant provider. "
                "Install with: pip install qdrant-client"
            )

        # Applied only for keys the caller did not set explicitly.
        defaults = {
            "collection_name": "praval_storage",
            "vector_size": 1536,  # OpenAI embedding size
            "distance_metric": "cosine",
            "timeout": 30.0,
            "prefer_grpc": False,
        }
        for key, value in defaults.items():
            self.config.setdefault(key, value)

        # Populated by connect().
        self.qdrant_client: Optional[QdrantClient] = None
        self.default_collection = self.config["collection_name"]
[docs] async def connect(self) -> bool: """Establish connection to Qdrant.""" try: client_kwargs = { "url": self.config["url"], "timeout": self.config["timeout"], "prefer_grpc": self.config["prefer_grpc"] } if "api_key" in self.config: client_kwargs["api_key"] = self.config["api_key"] self.qdrant_client = QdrantClient(**client_kwargs) # Test connection collections = self.qdrant_client.get_collections() # Ensure default collection exists await self._ensure_collection_exists(self.default_collection) self.is_connected = True logger.info(f"Connected to Qdrant: {self.config['url']}") return True except Exception as e: logger.error(f"Failed to connect to Qdrant: {e}") raise StorageConnectionError(self.name, str(e))
[docs] async def disconnect(self): """Close Qdrant connection.""" if self.qdrant_client: self.qdrant_client.close() self.qdrant_client = None self.is_connected = False logger.info(f"Disconnected from Qdrant: {self.name}")
async def _ensure_collection_exists(self, collection_name: str): """Ensure collection exists, create if not.""" try: collection_info = self.qdrant_client.get_collection(collection_name) logger.debug(f"Collection '{collection_name}' exists") except Exception: # Collection doesn't exist, create it self.qdrant_client.create_collection( collection_name=collection_name, vectors_config=VectorParams( size=self.config["vector_size"], distance=Distance.COSINE if self.config["distance_metric"] == "cosine" else Distance.EUCLIDEAN ) ) logger.info(f"Created Qdrant collection: {collection_name}")
[docs] async def store(self, resource: str, data: Any, **kwargs) -> StorageResult: """ Store vector data in Qdrant. Args: resource: Collection name (optional, uses default if not specified) data: Data to store - can be: - Single point: {"id": "...", "vector": [...], "payload": {...}} - Multiple points: [{"id": "...", "vector": [...], "payload": {...}}, ...] - Just vector: [0.1, 0.2, ...] (will generate ID) **kwargs: Additional parameters Returns: StorageResult with operation outcome """ if not self.is_connected: await self.connect() try: collection_name = resource or self.default_collection await self._ensure_collection_exists(collection_name) points = [] if isinstance(data, dict): # Single point if "vector" in data: point_id = data.get("id", str(uuid.uuid4())) points.append(PointStruct( id=point_id, vector=data["vector"], payload=data.get("payload", {}) )) else: raise ValueError("Dictionary must contain 'vector' field") elif isinstance(data, list): if len(data) > 0 and isinstance(data[0], dict): # Multiple points for item in data: if "vector" not in item: raise ValueError("Each point must contain 'vector' field") point_id = item.get("id", str(uuid.uuid4())) points.append(PointStruct( id=point_id, vector=item["vector"], payload=item.get("payload", {}) )) elif len(data) > 0 and isinstance(data[0], (int, float)): # Single vector array point_id = kwargs.get("id", str(uuid.uuid4())) payload = kwargs.get("payload", {}) points.append(PointStruct( id=point_id, vector=data, payload=payload )) else: raise ValueError("Invalid data format for vector storage") else: raise ValueError(f"Unsupported data type: {type(data)}") # Store points in Qdrant operation_info = self.qdrant_client.upsert( collection_name=collection_name, points=points ) return StorageResult( success=True, data={ "collection": collection_name, "points_stored": len(points), "operation_id": operation_info.operation_id if hasattr(operation_info, 'operation_id') else None }, metadata={ "operation": 
"upsert_points", "collection": collection_name, "point_count": len(points) }, data_reference=DataReference( provider=self.name, storage_type=StorageType.VECTOR, resource_id=f"{collection_name}:{points[0].id}" if len(points) == 1 else collection_name, metadata={ "collection": collection_name, "point_count": len(points) } ) ) except Exception as e: logger.error(f"Store operation failed: {e}") return StorageResult( success=False, error=f"Store operation failed: {str(e)}" )
[docs] async def retrieve(self, resource: str, **kwargs) -> StorageResult: """ Retrieve vectors from Qdrant. Args: resource: Collection name or "collection:point_id" **kwargs: Retrieval parameters (point_ids, with_vectors, with_payload) Returns: StorageResult with retrieved data """ if not self.is_connected: await self.connect() try: # Parse resource if ":" in resource: collection_name, point_id = resource.split(":", 1) point_ids = [point_id] else: collection_name = resource point_ids = kwargs.get("point_ids", kwargs.get("ids", [])) if not point_ids: raise ValueError("Point IDs required for retrieval") # Retrieve points points = self.qdrant_client.retrieve( collection_name=collection_name, ids=point_ids, with_vectors=kwargs.get("with_vectors", True), with_payload=kwargs.get("with_payload", True) ) # Format results results = [] for point in points: result = { "id": point.id, "payload": point.payload or {} } if point.vector is not None: result["vector"] = point.vector results.append(result) return StorageResult( success=True, data=results if len(results) > 1 else (results[0] if results else None), metadata={ "operation": "retrieve_points", "collection": collection_name, "point_count": len(results) } ) except Exception as e: logger.error(f"Retrieve operation failed: {e}") return StorageResult( success=False, error=f"Retrieve operation failed: {str(e)}" )
[docs] async def query(self, resource: str, query: Union[str, Dict], **kwargs) -> StorageResult: """ Execute vector search or other operations. Args: resource: Collection name query: Query type or search vector **kwargs: Query parameters Returns: StorageResult with query results """ if not self.is_connected: await self.connect() try: collection_name = resource or self.default_collection if isinstance(query, str): if query == "search": # Vector similarity search query_vector = kwargs.get("vector") if not query_vector: raise ValueError("Search vector required") limit = kwargs.get("limit", 10) score_threshold = kwargs.get("score_threshold") query_filter = kwargs.get("filter") search_params = { "collection_name": collection_name, "query_vector": query_vector, "limit": limit, "with_vectors": kwargs.get("with_vectors", False), "with_payload": kwargs.get("with_payload", True) } if score_threshold is not None: search_params["score_threshold"] = score_threshold if query_filter: search_params["query_filter"] = query_filter search_results = self.qdrant_client.search(**search_params) results = [] for result in search_results: item = { "id": result.id, "score": result.score, "payload": result.payload or {} } if result.vector is not None: item["vector"] = result.vector results.append(item) return StorageResult( success=True, data=results, metadata={ "operation": "vector_search", "collection": collection_name, "result_count": len(results), "limit": limit } ) elif query == "count": # Count points in collection count_result = self.qdrant_client.count( collection_name=collection_name, count_filter=kwargs.get("filter") ) return StorageResult( success=True, data={"count": count_result.count}, metadata={"operation": "count_points", "collection": collection_name} ) elif query == "scroll": # Scroll through points scroll_result = self.qdrant_client.scroll( collection_name=collection_name, scroll_filter=kwargs.get("filter"), limit=kwargs.get("limit", 10), offset=kwargs.get("offset"), 
with_vectors=kwargs.get("with_vectors", False), with_payload=kwargs.get("with_payload", True) ) points, next_offset = scroll_result results = [] for point in points: item = { "id": point.id, "payload": point.payload or {} } if point.vector is not None: item["vector"] = point.vector results.append(item) return StorageResult( success=True, data={ "points": results, "next_offset": next_offset }, metadata={ "operation": "scroll_points", "collection": collection_name, "point_count": len(results) } ) else: raise ValueError(f"Unsupported query type: {query}") elif isinstance(query, list) and all(isinstance(x, (int, float)) for x in query): # Direct vector search search_results = self.qdrant_client.search( collection_name=collection_name, query_vector=query, limit=kwargs.get("limit", 10), with_vectors=kwargs.get("with_vectors", False), with_payload=kwargs.get("with_payload", True) ) results = [] for result in search_results: item = { "id": result.id, "score": result.score, "payload": result.payload or {} } if result.vector is not None: item["vector"] = result.vector results.append(item) return StorageResult( success=True, data=results, metadata={ "operation": "vector_search", "collection": collection_name, "result_count": len(results) } ) else: raise ValueError(f"Unsupported query format: {type(query)}") except Exception as e: logger.error(f"Query operation failed: {e}") return StorageResult( success=False, error=f"Query operation failed: {str(e)}" )
[docs] async def delete(self, resource: str, **kwargs) -> StorageResult: """ Delete points from Qdrant. Args: resource: Collection name or "collection:point_id" **kwargs: Delete parameters (point_ids, filter) Returns: StorageResult with operation outcome """ if not self.is_connected: await self.connect() try: # Parse resource if ":" in resource: collection_name, point_id = resource.split(":", 1) point_ids = [point_id] else: collection_name = resource point_ids = kwargs.get("point_ids", kwargs.get("ids", [])) if point_ids: # Delete specific points operation_info = self.qdrant_client.delete( collection_name=collection_name, points_selector=models.PointIdsList(points=point_ids) ) return StorageResult( success=True, data={"deleted": len(point_ids)}, metadata={ "operation": "delete_points", "collection": collection_name, "point_ids": point_ids } ) elif "filter" in kwargs: # Delete points matching filter operation_info = self.qdrant_client.delete( collection_name=collection_name, points_selector=models.FilterSelector(filter=kwargs["filter"]) ) return StorageResult( success=True, data={"deleted": "filtered"}, metadata={ "operation": "delete_filtered", "collection": collection_name } ) else: raise ValueError("Either point_ids or filter must be specified for deletion") except Exception as e: logger.error(f"Delete operation failed: {e}") return StorageResult( success=False, error=f"Delete operation failed: {str(e)}" )
[docs] async def list_resources(self, prefix: str = "", **kwargs) -> StorageResult: """List collections in Qdrant.""" if not self.is_connected: await self.connect() try: collections_response = self.qdrant_client.get_collections() collections = [] for collection in collections_response.collections: if not prefix or collection.name.startswith(prefix): collections.append({ "name": collection.name, "points_count": collection.points_count, "segments_count": collection.segments_count, "status": collection.status }) return StorageResult( success=True, data=collections, metadata={"operation": "list_collections", "count": len(collections)} ) except Exception as e: logger.error(f"List resources failed: {e}") return StorageResult( success=False, error=f"List resources failed: {str(e)}" )