Skip to content

Commit 9f0b0eb

Browse files
authored
fix: share duckdb adapter is overlapping data files (#2039)
1 parent 6f1b126 commit 9f0b0eb

3 files changed

Lines changed: 178 additions & 0 deletions

File tree

sqlmesh/core/config/connection.py

Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22

33
import abc
44
import base64
5+
import logging
56
import os
67
import pathlib
78
import sys
@@ -33,6 +34,9 @@
3334
from typing_extensions import Literal
3435

3536

37+
logger = logging.getLogger(__name__)
38+
39+
3640
class ConnectionConfig(abc.ABC, BaseConfig):
3741
concurrent_tasks: int
3842
register_comments: bool
@@ -122,6 +126,8 @@ class DuckDBConnectionConfig(ConnectionConfig):
122126

123127
type_: Literal["duckdb"] = Field(alias="type", default="duckdb")
124128

129+
_data_file_to_adapter: t.ClassVar[t.Dict[str, EngineAdapter]] = {}
130+
125131
@model_validator(mode="before")
126132
@model_validator_v1_args
127133
def _validate_database_catalogs(
@@ -182,6 +188,29 @@ def init(cursor: duckdb.DuckDBPyConnection) -> None:
182188

183189
return init
184190

191+
def create_engine_adapter(self, register_comments_override: bool = False) -> EngineAdapter:
192+
"""Checks if another engine adapter has already been created that shares a catalog that points to the same data
193+
file. If so, it uses that same adapter instead of creating a new one. As a result, any additional configuration
194+
associated with the new adapter will be ignored."""
195+
data_files = set((self.catalogs or {}).values())
196+
if self.database:
197+
data_files.add(self.database)
198+
data_files.discard(":memory:")
199+
for data_file in data_files:
200+
if adapter := DuckDBConnectionConfig._data_file_to_adapter.get(data_file):
201+
logger.info(
202+
f"Using existing DuckDB adapter due to overlapping data file: {data_file}"
203+
)
204+
return adapter
205+
if data_files:
206+
logger.info(f"Creating new DuckDB adapter for data files: {data_files}")
207+
else:
208+
logger.info("Creating new DuckDB adapter for in-memory database")
209+
adapter = super().create_engine_adapter(register_comments_override)
210+
for data_file in data_files:
211+
DuckDBConnectionConfig._data_file_to_adapter[data_file] = adapter
212+
return adapter
213+
185214
def get_catalog(self) -> t.Optional[str]:
186215
if self.database:
187216
# Remove `:` from the database name in order to handle if `:memory:` is passed in

tests/conftest.py

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616
from sqlglot import exp, maybe_parse, parse_one
1717
from sqlglot.helper import ensure_list
1818

19+
from sqlmesh.core.config import DuckDBConnectionConfig
1920
from sqlmesh.core.context import Context
2021
from sqlmesh.core.engine_adapter.base import EngineAdapter
2122
from sqlmesh.core.macros import macro
@@ -176,6 +177,12 @@ def rescope_global_models(request):
176177
model.set_registry(existing_registry)
177178

178179

180+
@pytest.fixture(scope="function", autouse=True)
181+
def rescope_duckdb_classvar(request):
182+
DuckDBConnectionConfig._data_file_to_adapter = {}
183+
yield
184+
185+
179186
@pytest.fixture
180187
def duck_conn() -> duckdb.DuckDBPyConnection:
181188
return duckdb.connect()

tests/core/test_connection_config.py

Lines changed: 142 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -366,3 +366,145 @@ def test_duckdb(make_config):
366366
)
367367
assert isinstance(config, DuckDBConnectionConfig)
368368
assert config._static_connection_kwargs == {"config": {"foo": "bar"}}
369+
370+
371+
@pytest.mark.parametrize(
372+
"kwargs1, kwargs2, shared_adapter",
373+
[
374+
(
375+
{
376+
"database": "test.duckdb",
377+
},
378+
{
379+
"database": "test.duckdb",
380+
},
381+
True,
382+
),
383+
(
384+
{},
385+
{},
386+
False,
387+
),
388+
(
389+
{
390+
"database": "test1.duckdb",
391+
},
392+
{
393+
"database": "test2.duckdb",
394+
},
395+
False,
396+
),
397+
(
398+
{
399+
"database": ":memory:",
400+
},
401+
{
402+
"database": ":memory:",
403+
},
404+
False,
405+
),
406+
(
407+
{
408+
"database": ":memory:",
409+
},
410+
{
411+
"database": "test.duckdb",
412+
},
413+
False,
414+
),
415+
(
416+
{
417+
"catalogs": {
418+
"test": "test.duckdb",
419+
}
420+
},
421+
{
422+
"catalogs": {
423+
"test": "test.duckdb",
424+
}
425+
},
426+
True,
427+
),
428+
(
429+
{
430+
"catalogs": {
431+
"test": ":memory:",
432+
}
433+
},
434+
{
435+
"catalogs": {
436+
"test": ":memory:",
437+
}
438+
},
439+
False,
440+
),
441+
(
442+
{
443+
"catalogs": {
444+
"test1": ":memory:",
445+
"test2": "test2.duckdb",
446+
}
447+
},
448+
{
449+
"catalogs": {
450+
"test1": ":memory:",
451+
"test2": "test2.duckdb",
452+
}
453+
},
454+
True,
455+
),
456+
(
457+
{
458+
"catalogs": {
459+
"test1": ":memory:",
460+
"test2": "test2.duckdb",
461+
}
462+
},
463+
{
464+
"catalogs": {
465+
"test1": "test2.duckdb",
466+
"test2": ":memory:",
467+
}
468+
},
469+
True,
470+
),
471+
(
472+
{
473+
"catalogs": {
474+
"test1": "test1.duckdb",
475+
"test2": "test2.duckdb",
476+
"test3": "test3.duckdb",
477+
}
478+
},
479+
{
480+
"catalogs": {
481+
"test1": "test1_miss.duckdb",
482+
"test2": "test2_miss.duckdb",
483+
"test3": "test3.duckdb",
484+
}
485+
},
486+
True,
487+
),
488+
],
489+
)
490+
def test_duckdb_shared(make_config, caplog, kwargs1, kwargs2, shared_adapter):
491+
config1 = make_config(
492+
type="duckdb",
493+
**kwargs1,
494+
)
495+
config2 = make_config(
496+
type="duckdb",
497+
**kwargs2,
498+
)
499+
assert isinstance(config1, DuckDBConnectionConfig)
500+
assert isinstance(config2, DuckDBConnectionConfig)
501+
adapter1 = config1.create_engine_adapter()
502+
adapter2 = config2.create_engine_adapter()
503+
if shared_adapter:
504+
assert id(adapter1) == id(adapter2)
505+
assert "Creating new DuckDB adapter" in caplog.messages[0]
506+
assert "Using existing DuckDB adapter" in caplog.messages[1]
507+
else:
508+
assert id(adapter1) != id(adapter2)
509+
assert "Creating new DuckDB adapter" in caplog.messages[0]
510+
assert "Creating new DuckDB adapter" in caplog.messages[1]

0 commit comments

Comments
 (0)