Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ dependencies = [
"semver<4.0.0",
# Used by authorization resolvers
"jsonpath-ng>=1.6.1",
"psycopg2-binary>=2.9.10",
]


Expand Down
215 changes: 198 additions & 17 deletions src/cache/postgres_cache.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
"""PostgreSQL cache implementation."""

import psycopg2

from cache.cache import Cache
from cache.cache_error import CacheError
from models.cache_entry import CacheEntry
from models.config import PostgreSQLDatabaseConfiguration
from log import get_logger
Expand All @@ -10,22 +13,143 @@


class PostgresCache(Cache):
    """Cache that uses PostgreSQL to store cached values.

    Conversation history is stored in the following table (one row per
    cache entry, so a single conversation owns many rows ordered by
    ``created_at``):

    ```
     Column          | Type      | Nullable |
    -----------------+-----------+----------+
     user_id         | text      | not null |
     conversation_id | text      | not null |
     created_at      | timestamp | not null |
     query           | text      |          |
     response        | text      |          |
     provider        | text      |          |
     model           | text      |          |
    Indexes:
        "cache_pkey" PRIMARY KEY, btree (user_id, conversation_id, created_at)
        "timestamps" btree (created_at)
    Access method: heap
    ```
    """

    # DDL for the cache table.  The composite primary key permits multiple
    # entries per (user_id, conversation_id) pair, distinguished by their
    # creation timestamp.
    CREATE_CACHE_TABLE = """
        CREATE TABLE IF NOT EXISTS cache (
            user_id text NOT NULL,
            conversation_id text NOT NULL,
            created_at timestamp NOT NULL,
            query text,
            response text,
            provider text,
            model text,
            PRIMARY KEY(user_id, conversation_id, created_at)
        );
        """

    # Secondary index used to retrieve/order conversations by timestamp.
    CREATE_INDEX = """
        CREATE INDEX IF NOT EXISTS timestamps
            ON cache (created_at)
        """

    # All entries of one conversation, oldest first.
    SELECT_CONVERSATION_HISTORY_STATEMENT = """
        SELECT query, response, provider, model
          FROM cache
         WHERE user_id=%s AND conversation_id=%s
         ORDER BY created_at
        """

    # Insert one entry; created_at is assigned server-side.
    INSERT_CONVERSATION_HISTORY_STATEMENT = """
        INSERT INTO cache(user_id, conversation_id, created_at, query, response, provider, model)
        VALUES (%s, %s, CURRENT_TIMESTAMP, %s, %s, %s, %s)
        """

    # Total number of cached entries (rows), not conversations.
    QUERY_CACHE_SIZE = """
        SELECT count(*) FROM cache;
        """

    # Remove every entry belonging to one conversation.
    DELETE_SINGLE_CONVERSATION_STATEMENT = """
        DELETE FROM cache
         WHERE user_id=%s AND conversation_id=%s
        """

    # Conversation ids for a user, most recently updated first.
    LIST_CONVERSATIONS_STATEMENT = """
        SELECT conversation_id, max(created_at) AS created_at
          FROM cache
         WHERE user_id=%s
         GROUP BY conversation_id
         ORDER BY created_at DESC
        """

def __init__(self, config: PostgreSQLDatabaseConfiguration) -> None:
"""Create a new instance of PostgreSQL cache."""
self.postgres_config = config

# initialize connection to DB
self.connect()
# self.capacity = config.max_entries

Comment on lines 15 to +92
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major

Avoid eager connection in __init__—it crashes when Postgres isn’t running

Constructing PostgresCache currently calls self.connect() immediately. In environments where no PostgreSQL service is up (our CI run shows psycopg2.OperationalError: connection refused), merely instantiating the cache raises and aborts startup/tests. Given we decorate the public methods with @connection, we can initialize the handle lazily and let the decorator establish the connection on first use. Please remove the eager connect() invocation and only seed the attribute.

-        # initialize connection to DB
-        self.connect()
-        # self.capacity = config.max_entries
+        # connection is established lazily by the @connection decorator
+        self.connection = None
+        # self.capacity = config.max_entries
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
def __init__(self, config: PostgreSQLDatabaseConfiguration) -> None:
"""Create a new instance of PostgreSQL cache."""
self.postgres_config = config
# initialize connection to DB
self.connect()
# self.capacity = config.max_entries
def __init__(self, config: PostgreSQLDatabaseConfiguration) -> None:
"""Create a new instance of PostgreSQL cache."""
self.postgres_config = config
# connection is established lazily by the @connection decorator
self.connection = None
# self.capacity = config.max_entries
🤖 Prompt for AI Agents
In src/cache/postgres_cache.py around lines 85 to 92, remove the eager
self.connect() call from __init__ and instead seed the connection handle to None
(or the attribute name the @connection decorator expects) so construction never
attempts to open the DB; rely on the existing @connection decorator to establish
the connection on first use. Ensure any existing commented capacity line remains
unchanged and do not perform any network/DB operations in __init__.

    # pylint: disable=W0201
    def connect(self) -> None:
        """Initialize connection to database.

        Opens a psycopg2 connection using the stored configuration,
        creates the cache table/index if missing, and enables autocommit.
        On any failure the half-open connection (if any) is closed and the
        original exception is re-raised.
        """
        logger.info("Connecting to storage")
        # make sure the connection will have known state
        # even if PostgreSQL is not alive
        self.connection = None
        config = self.postgres_config
        try:
            self.connection = psycopg2.connect(
                host=config.host,
                port=config.port,
                user=config.user,
                # password is a pydantic-style secret; unwrap only here
                password=config.password.get_secret_value(),
                dbname=config.db,
                sslmode=config.ssl_mode,
                sslrootcert=config.ca_cert_path,
                gssencmode=config.gss_encmode,
            )
            # create table + index; uses explicit commit internally
            self.initialize_cache()
        except Exception as e:
            # close a partially-established connection before propagating
            if self.connection is not None:
                self.connection.close()
            logger.exception("Error initializing Postgres cache:\n%s", e)
            raise
        # autocommit is enabled only after initialization succeeded;
        # NOTE(review): initialize_cache commits explicitly, so ordering
        # here matters — autocommit must not be toggled mid-transaction
        self.connection.autocommit = True

def connected(self) -> bool:
"""Check if connection to cache is alive."""
return True
if self.connection is None:
logger.warning("Not connected, need to reconnect later")
return False
try:
with self.connection.cursor() as cursor:
cursor.execute("SELECT 1")
logger.info("Connection to storage is ok")
return True
except (psycopg2.OperationalError, psycopg2.InterfaceError) as e:
logger.error("Disconnected from storage: %s", e)
return False

def initialize_cache(self) -> None:
"""Initialize cache."""
"""Initialize cache - clean it up etc."""
if self.connection is None:
logger.error("Cache is disconnected")
raise CacheError("Initialize_cache: cache is disconnected")

# cursor as context manager is not used there on purpose
# any CREATE statement can raise it's own exception
# and it should not interfere with other statements
cursor = self.connection.cursor()

logger.info("Initializing table for cache")
cursor.execute(PostgresCache.CREATE_CACHE_TABLE)

logger.info("Initializing index for cache")
cursor.execute(PostgresCache.CREATE_INDEX)

cursor.close()
self.connection.commit()

@connection
def get(
Expand All @@ -39,11 +163,29 @@ def get(
skip_user_id_check: Skip user_id suid check.

Returns:
Empty list.
The value associated with the key, or None if not found.
"""
# just check if user_id and conversation_id are UUIDs
super().construct_key(user_id, conversation_id, skip_user_id_check)
return []
if self.connection is None:
logger.error("Cache is disconnected")
raise CacheError("get: cache is disconnected")

with self.connection.cursor() as cursor:
cursor.execute(
self.SELECT_CONVERSATION_HISTORY_STATEMENT, (user_id, conversation_id)
)
conversation_entries = cursor.fetchall()

result = []
for conversation_entry in conversation_entries:
cache_entry = CacheEntry(
query=conversation_entry[0],
response=conversation_entry[1],
provider=conversation_entry[2],
model=conversation_entry[3],
)
result.append(cache_entry)

return result

@connection
def insert_or_append(
Expand All @@ -62,8 +204,28 @@ def insert_or_append(
skip_user_id_check: Skip user_id suid check.

"""
# just check if user_id and conversation_id are UUIDs
super().construct_key(user_id, conversation_id, skip_user_id_check)
if self.connection is None:
logger.error("Cache is disconnected")
raise CacheError("insert_or_append: cache is disconnected")

try:
# the whole operation is run in one transaction
with self.connection.cursor() as cursor:
cursor.execute(
PostgresCache.INSERT_CONVERSATION_HISTORY_STATEMENT,
(
user_id,
conversation_id,
cache_entry.query,
cache_entry.response,
cache_entry.provider,
cache_entry.model,
),
)
# commit is implicit at this point
except psycopg2.DatabaseError as e:
logger.error("PostgresCache.insert_or_append: %s", e)
raise CacheError("PostgresCache.insert_or_append", e) from e

@connection
def delete(
Expand All @@ -77,12 +239,24 @@ def delete(
skip_user_id_check: Skip user_id suid check.

Returns:
bool: True in all cases.
bool: True if the conversation was deleted, False if not found.

"""
# just check if user_id and conversation_id are UUIDs
super().construct_key(user_id, conversation_id, skip_user_id_check)
return True
if self.connection is None:
logger.error("Cache is disconnected")
raise CacheError("delete: cache is disconnected")

try:
with self.connection.cursor() as cursor:
cursor.execute(
PostgresCache.DELETE_SINGLE_CONVERSATION_STATEMENT,
(user_id, conversation_id),
)
deleted = cursor.rowcount
return deleted > 0
except psycopg2.DatabaseError as e:
logger.error("PostgresCache.delete: %s", e)
raise CacheError("PostgresCache.delete", e) from e

@connection
def list(self, user_id: str, skip_user_id_check: bool = False) -> list[str]:
Expand All @@ -93,16 +267,23 @@ def list(self, user_id: str, skip_user_id_check: bool = False) -> list[str]:
skip_user_id_check: Skip user_id suid check.

Returns:
An empty list.
A list of conversation ids from the cache

"""
super()._check_user_id(user_id, skip_user_id_check)
return []
if self.connection is None:
logger.error("Cache is disconnected")
raise CacheError("list: cache is disconnected")

with self.connection.cursor() as cursor:
cursor.execute(self.LIST_CONVERSATIONS_STATEMENT, (user_id,))
conversations = cursor.fetchall()

return [conversation[0] for conversation in conversations]

    def ready(self) -> bool:
        """Check if the cache is ready.

        The PostgreSQL cache reports ready unconditionally once it is
        constructed; actual connectivity is reported by ``connected()``.

        Returns:
            True in all cases.
        """
        return True
3 changes: 2 additions & 1 deletion tests/unit/cache/test_cache_factory.py
Original file line number Diff line number Diff line change
Expand Up @@ -112,8 +112,9 @@ def test_conversation_cache_sqlite_improper_config(tmpdir):
_ = CacheFactory.conversation_cache(cc)


def test_conversation_cache_postgres(postgres_cache_config_fixture):
def test_conversation_cache_postgres(postgres_cache_config_fixture, mocker):
"""Check if PostgreSQL is returned by factory with proper configuration."""
mocker.patch("psycopg2.connect")
cache = CacheFactory.conversation_cache(postgres_cache_config_fixture)
assert cache is not None
# check if the object has the right type
Expand Down
Loading
Loading