|
11 | 11 |
|
12 | 12 |
|
13 | 13 | # Retry configuration for database operations |
14 | | -SQLITE_RETRY_MAX_TRIES = 5 |
15 | | -SQLITE_RETRY_MAX_TIME = 30 # seconds |
| 14 | +SQLITE_RETRY_MAX_TRIES = 10 |
| 15 | +SQLITE_RETRY_MAX_TIME = 60 # seconds |
16 | 16 |
|
17 | 17 |
|
18 | 18 | def _is_database_locked_error(e: Exception) -> bool: |
@@ -55,6 +55,49 @@ def _execute() -> T: |
55 | 55 | return _execute() |
56 | 56 |
|
57 | 57 |
|
| 58 | +def connect_with_retry(db: SqliteDatabase) -> None: |
| 59 | + """ |
| 60 | + Connect to the database with retry logic, ensuring pragmas are always applied. |
| 61 | +
|
| 62 | + Peewee's connect() method sets the connection state *before* executing pragmas |
| 63 | + (in _initialize_connection). If pragma execution fails with "database is locked", |
| 64 | + the connection is marked as open but pragmas are not applied. Subsequent calls |
| 65 | + to connect(reuse_if_open=True) would see the connection as already open and |
| 66 | + skip pragma execution entirely. |
| 67 | +
|
| 68 | + This function handles this edge case by: |
| 69 | + 1. Closing the connection if a lock error occurs during connect |
| 70 | + 2. Retrying with exponential backoff until pragmas are successfully applied |
| 71 | +
|
| 72 | + Args: |
| 73 | + db: The SqliteDatabase instance to connect |
| 74 | + """ |
| 75 | + |
| 76 | + @backoff.on_exception( |
| 77 | + backoff.expo, |
| 78 | + OperationalError, |
| 79 | + max_tries=SQLITE_RETRY_MAX_TRIES, |
| 80 | + max_time=SQLITE_RETRY_MAX_TIME, |
| 81 | + giveup=lambda e: not _is_database_locked_error(e), |
| 82 | + jitter=backoff.full_jitter, |
| 83 | + ) |
| 84 | + def _connect() -> None: |
| 85 | + try: |
| 86 | + # Close any partially-open connection before retrying to ensure |
| 87 | + # a fresh connection is opened and pragmas are executed |
| 88 | + if not db.is_closed(): |
| 89 | + db.close() |
| 90 | + db.connect() |
| 91 | + except OperationalError: |
| 92 | + # If connect fails (e.g., during pragma execution), ensure the |
| 93 | + # connection is closed so the next retry starts fresh |
| 94 | + if not db.is_closed(): |
| 95 | + db.close() |
| 96 | + raise |
| 97 | + |
| 98 | + _connect() |
| 99 | + |
| 100 | + |
58 | 101 | # SQLite pragmas for hardened concurrency safety |
59 | 102 | SQLITE_HARDENED_PRAGMAS = { |
60 | 103 | "journal_mode": "wal", # Write-Ahead Logging for concurrent reads/writes |
@@ -181,9 +224,10 @@ class Event(BaseModel): # type: ignore |
181 | 224 | processed = BooleanField(default=False) # Track if event has been processed |
182 | 225 |
|
183 | 226 | self._Event = Event |
184 | | - self._db.connect() |
| 227 | + # Connect with retry logic that properly handles pragma execution failures |
| 228 | + connect_with_retry(self._db) |
185 | 229 | # Use safe=True to avoid errors when tables already exist |
186 | | - self._db.create_tables([Event], safe=True) |
| 230 | + execute_with_sqlite_retry(lambda: self._db.create_tables([Event], safe=True)) |
187 | 231 |
|
188 | 232 | def publish_event(self, event_type: str, data: Any, process_id: str) -> None: |
189 | 233 | """Publish an event to the database.""" |
|
0 commit comments