Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,7 @@
## [1.4.15]

* **fix(ibm-watsonx-s3): fail fast on precheck and guard bearer token JSON parsing**

## [1.4.14]

* **fix(teradata): enable Unicode Pass Through on session to prevent Error 6705 on non-BMP characters**
Expand Down
50 changes: 50 additions & 0 deletions test/unit/connectors/ibm_watsonx/test_ibm_watsonx_s3.py
Original file line number Diff line number Diff line change
Expand Up @@ -547,3 +547,53 @@ def test_ibm_watsonx_uploader_delete_cannot_delete(
"Table doesn't contain expected record id column test_record_id_key, skipping delete"
in caplog.text
)


def test_ibm_watsonx_generate_bearer_token_non_json_response(
mocker: MockerFixture,
connection_config: IbmWatsonxConnectionConfig,
):
"""response.json() failures are caught and re-raised via wrap_error."""
mock_response = mocker.MagicMock()
mock_response.raise_for_status.return_value = None
mock_response.json.side_effect = ValueError("No JSON object could be decoded")
mocker.patch("httpx.post", return_value=mock_response)
spy_wrap_error = mocker.spy(IbmWatsonxConnectionConfig, "wrap_error")

with pytest.raises(ValueError):
connection_config.generate_bearer_token()

spy_wrap_error.assert_called_once()


def test_ibm_watsonx_connection_config_get_catalog_max_retries_override(
mocker: MockerFixture,
connection_config: IbmWatsonxConnectionConfig,
):
"""max_retries param overrides max_retries_connection; max_retries=1 means a single attempt."""
mock_load_catalog = mocker.patch(
"pyiceberg.catalog.load_catalog",
side_effect=RESTError("Connection error"),
)
mocker.patch.object(
IbmWatsonxConnectionConfig,
"bearer_token",
new="test_bearer_token",
)

with pytest.raises(ProviderError), connection_config.get_catalog(max_retries=1):
pass

# max_retries_connection fixture is 2, but max_retries=1 overrides to a single attempt
assert mock_load_catalog.call_count == 1


def test_ibm_watsonx_uploader_precheck_calls_get_catalog_with_max_retries_1(
mock_get_catalog: MagicMock,
mock_catalog: MagicMock,
uploader: IbmWatsonxUploader,
):
"""precheck() passes max_retries=1 so connection errors surface immediately."""
uploader.precheck()

mock_get_catalog.assert_called_once_with(uploader.connection_config, max_retries=1)
2 changes: 1 addition & 1 deletion unstructured_ingest/__version__.py
Original file line number Diff line number Diff line change
@@ -1 +1 @@
__version__ = "1.4.14" # pragma: no cover
__version__ = "1.4.15" # pragma: no cover
Original file line number Diff line number Diff line change
Expand Up @@ -132,9 +132,9 @@ def generate_bearer_token(self) -> dict[str, Any]:
try:
response = httpx.post(DEFAULT_IBM_CLOUD_AUTH_URL, headers=headers, data=data)
response.raise_for_status()
return response.json()
except Exception as e:
raise self.wrap_error(e)
return response.json()

def get_catalog_config(self) -> dict[str, Any]:
return {
Expand All @@ -155,7 +155,9 @@ def get_catalog_config(self) -> dict[str, Any]:

@requires_dependencies(["pyiceberg"], extras="ibm-watsonx-s3")
@contextmanager
def get_catalog(self) -> Generator["RestCatalog", None, None]:
def get_catalog(
self, max_retries: Optional[int] = None
) -> Generator["RestCatalog", None, None]:
from pyiceberg.catalog import load_catalog
from pyiceberg.exceptions import RESTError
from tenacity import (
Expand All @@ -166,9 +168,11 @@ def get_catalog(self) -> Generator["RestCatalog", None, None]:
wait_exponential,
)

retries = max_retries if max_retries is not None else self.max_retries_connection

# Retry connection in case of a connection error
@retry(
stop=stop_after_attempt(self.max_retries_connection),
stop=stop_after_attempt(retries),
wait=wait_exponential(exp_base=2, multiplier=1, min=2, max=10),
retry=retry_if_exception_type(RESTError),
before=before_log(logger, logging.DEBUG),
Expand Down Expand Up @@ -225,7 +229,9 @@ class IbmWatsonxUploader(SQLUploader):
connector_type: str = CONNECTOR_TYPE

def precheck(self) -> None:
with self.connection_config.get_catalog() as catalog:
# Use max_retries=1 (no retries) during precheck so a transient provider error
# surfaces quickly rather than blocking for the full exponential-backoff window.
with self.connection_config.get_catalog(max_retries=1) as catalog:
if not catalog.namespace_exists(self.upload_config.namespace):
raise UserError(f"Namespace '{self.upload_config.namespace}' does not exist")
if not catalog.table_exists(self.upload_config.table_identifier):
Expand Down
Loading