Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 24 additions & 0 deletions crates/catalog/rest/public-api.txt
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,29 @@ impl serde_core::ser::Serialize for iceberg_catalog_rest::CreateTableRequest
pub fn iceberg_catalog_rest::CreateTableRequest::serialize<__S>(&self, __serializer: __S) -> core::result::Result<<__S as serde_core::ser::Serializer>::Ok, <__S as serde_core::ser::Serializer>::Error> where __S: serde_core::ser::Serializer
impl<'de> serde_core::de::Deserialize<'de> for iceberg_catalog_rest::CreateTableRequest
pub fn iceberg_catalog_rest::CreateTableRequest::deserialize<__D>(__deserializer: __D) -> core::result::Result<Self, <__D as serde_core::de::Deserializer>::Error> where __D: serde_core::de::Deserializer<'de>
pub struct iceberg_catalog_rest::Endpoint
impl iceberg_catalog_rest::Endpoint
pub fn iceberg_catalog_rest::Endpoint::method(&self) -> &str
pub fn iceberg_catalog_rest::Endpoint::path(&self) -> &str
impl core::clone::Clone for iceberg_catalog_rest::Endpoint
pub fn iceberg_catalog_rest::Endpoint::clone(&self) -> iceberg_catalog_rest::Endpoint
impl core::cmp::Eq for iceberg_catalog_rest::Endpoint
impl core::cmp::PartialEq for iceberg_catalog_rest::Endpoint
pub fn iceberg_catalog_rest::Endpoint::eq(&self, other: &iceberg_catalog_rest::Endpoint) -> bool
impl core::fmt::Debug for iceberg_catalog_rest::Endpoint
pub fn iceberg_catalog_rest::Endpoint::fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result
impl core::fmt::Display for iceberg_catalog_rest::Endpoint
pub fn iceberg_catalog_rest::Endpoint::fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result
impl core::hash::Hash for iceberg_catalog_rest::Endpoint
pub fn iceberg_catalog_rest::Endpoint::hash<__H: core::hash::Hasher>(&self, state: &mut __H)
impl core::marker::StructuralPartialEq for iceberg_catalog_rest::Endpoint
impl core::str::traits::FromStr for iceberg_catalog_rest::Endpoint
pub type iceberg_catalog_rest::Endpoint::Err = iceberg::error::Error
pub fn iceberg_catalog_rest::Endpoint::from_str(s: &str) -> core::result::Result<Self, Self::Err>
impl serde_core::ser::Serialize for iceberg_catalog_rest::Endpoint
pub fn iceberg_catalog_rest::Endpoint::serialize<S>(&self, serializer: S) -> core::result::Result<<S as serde_core::ser::Serializer>::Ok, <S as serde_core::ser::Serializer>::Error> where S: serde_core::ser::Serializer
impl<'de> serde_core::de::Deserialize<'de> for iceberg_catalog_rest::Endpoint
pub fn iceberg_catalog_rest::Endpoint::deserialize<D>(deserializer: D) -> core::result::Result<Self, <D as serde_core::de::Deserializer>::Error> where D: serde_core::de::Deserializer<'de>
pub struct iceberg_catalog_rest::ErrorModel
pub iceberg_catalog_rest::ErrorModel::code: u16
pub iceberg_catalog_rest::ErrorModel::message: alloc::string::String
Expand Down Expand Up @@ -187,6 +210,7 @@ pub struct iceberg_catalog_rest::RestCatalog
impl iceberg_catalog_rest::RestCatalog
pub async fn iceberg_catalog_rest::RestCatalog::invalidate_token(&self) -> iceberg::error::Result<()>
pub async fn iceberg_catalog_rest::RestCatalog::regenerate_token(&self) -> iceberg::error::Result<()>
pub async fn iceberg_catalog_rest::RestCatalog::supports_endpoint(&self, endpoint: &iceberg_catalog_rest::Endpoint) -> iceberg::error::Result<bool>
impl core::fmt::Debug for iceberg_catalog_rest::RestCatalog
pub fn iceberg_catalog_rest::RestCatalog::fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result
impl iceberg::catalog::Catalog for iceberg_catalog_rest::RestCatalog
Expand Down
135 changes: 133 additions & 2 deletions crates/catalog/rest/src/catalog.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@

//! This module contains the iceberg REST catalog implementation.

use std::collections::HashMap;
use std::collections::{HashMap, HashSet};
use std::future::Future;
use std::str::FromStr;
use std::sync::Arc;
Expand All @@ -40,6 +40,7 @@ use typed_builder::TypedBuilder;
use crate::client::{
HttpClient, deserialize_catalog_response, deserialize_unexpected_catalog_error,
};
use crate::endpoint::Endpoint;
use crate::types::{
CatalogConfig, CommitTableRequest, CommitTableResponse, CreateNamespaceRequest,
CreateTableRequest, ListNamespaceResponse, ListTablesResponse, LoadTableResult,
Expand Down Expand Up @@ -347,6 +348,10 @@ struct RestContext {
///
/// It's could be different from the user config.
config: RestCatalogConfig,
/// The negotiated set of endpoints used for capability negotiation: the
/// server's advertised list, or the default base set when its
/// `GET /v1/config` response omits `endpoints` or sends an empty list.
endpoints: HashSet<Endpoint>,
}

/// Rest catalog implementation.
Expand Down Expand Up @@ -412,14 +417,42 @@ impl RestCatalog {
.get_or_try_init(|| async {
let client = HttpClient::new(&self.user_config)?;
let catalog_config = RestCatalog::load_config(&client, &self.user_config).await?;
// The `endpoints` field is optional. A non-empty advertised list
// is used as-is; an absent field or an empty list falls back to
// the standard base set every server is expected to support.
let endpoints = match &catalog_config.endpoints {
Some(advertised) if !advertised.is_empty() => {
advertised.iter().cloned().collect()
}
_ => crate::endpoint::DEFAULT_ENDPOINTS.clone(),
};
Comment on lines +423 to +428

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In case the server returns an explicit empty list , I think the right thing to do is still just use the default endpoints, those were established to be a minimum to be supported by a server so maybe smth like:

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

let endpoints =  match &catalog_config.endpoints {
      Some(advertised) if !advertised.is_empty() => advertised.iter().cloned().collect(),
      _ => crate::endpoint::DEFAULT_ENDPOINTS.clone(),
}

@huan233usc huan233usc Jun 29, 2026

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done -- thanks for suggestion

let config = self.user_config.clone().merge_with_config(catalog_config);
let client = client.update_with(&config)?;

Ok(RestContext { config, client })
Ok(RestContext {
config,
client,
endpoints,
})
})
.await
}

/// Returns whether the connected server supports `endpoint`.
///
/// Servers advertise the routes they support in the `endpoints` field of
/// the `GET /v1/config` response. A non-empty advertised list is used as-is.
/// An absent field or an empty list falls back to a standard base set of
/// namespace and table operations, so a server that predates the field still
/// resolves its core operations as supported; any optional endpoint outside
/// that base set is reported unsupported unless it is explicitly advertised.
///
/// The server config is fetched once and cached, so the first call may
/// incur a round-trip to `GET /v1/config`.
pub async fn supports_endpoint(&self, endpoint: &Endpoint) -> Result<bool> {
Ok(self.context().await?.endpoints.contains(endpoint))
}

/// Load the runtime config from the server by `user_config`.
///
/// It's required for a REST catalog to update its config after creation.
Expand Down Expand Up @@ -1135,6 +1168,104 @@ mod tests {
config_mock.assert_async().await;
}

#[tokio::test]
async fn test_config_advertised_endpoints() {
let mut server = Server::new_async().await;

let config_mock = server
.mock("GET", "/v1/config")
.with_status(200)
.with_body(
r#"{
"overrides": {},
"defaults": {},
"endpoints": [
"GET /v1/{prefix}/namespaces",
"POST /v1/{prefix}/namespaces/{namespace}/tables/{table}/plan"
]
}"#,
)
.create_async()
.await;

let catalog = RestCatalog::new(
RestCatalogConfig::builder().uri(server.url()).build(),
Some(Arc::new(LocalFsStorageFactory)),
Runtime::current(),
);

let plan = "POST /v1/{prefix}/namespaces/{namespace}/tables/{table}/plan"
.parse::<Endpoint>()
.unwrap();
assert!(catalog.supports_endpoint(&plan).await.unwrap());
// Advertised list is present but does not include this route.
let delete_ns = "DELETE /v1/{prefix}/namespaces/{namespace}"
.parse::<Endpoint>()
.unwrap();
assert!(!catalog.supports_endpoint(&delete_ns).await.unwrap());

config_mock.assert_async().await;
}

#[tokio::test]
async fn test_config_without_endpoints_falls_back_to_default_set() {
let mut server = Server::new_async().await;

let config_mock = server
.mock("GET", "/v1/config")
.with_status(200)
.with_body(r#"{ "overrides": {}, "defaults": {} }"#)
.create_async()
.await;

let catalog = RestCatalog::new(
RestCatalogConfig::builder().uri(server.url()).build(),
Some(Arc::new(LocalFsStorageFactory)),
Runtime::current(),
);

// A server that omits the `endpoints` field is assumed to support the
// standard base operations.
let load_table = "GET /v1/{prefix}/namespaces/{namespace}/tables/{table}"
.parse::<Endpoint>()
.unwrap();
assert!(catalog.supports_endpoint(&load_table).await.unwrap());
// But not an optional endpoint that must be advertised.
let plan = "POST /v1/{prefix}/namespaces/{namespace}/tables/{table}/plan"
.parse::<Endpoint>()
.unwrap();
assert!(!catalog.supports_endpoint(&plan).await.unwrap());

config_mock.assert_async().await;
}

#[tokio::test]
async fn test_config_with_empty_endpoints_falls_back_to_default_set() {
let mut server = Server::new_async().await;

// An explicit empty list is treated the same as an absent field: fall
// back to the standard base set.
let config_mock = server
.mock("GET", "/v1/config")
.with_status(200)
.with_body(r#"{ "overrides": {}, "defaults": {}, "endpoints": [] }"#)
.create_async()
.await;

let catalog = RestCatalog::new(
RestCatalogConfig::builder().uri(server.url()).build(),
Some(Arc::new(LocalFsStorageFactory)),
Runtime::current(),
);

let load_table = "GET /v1/{prefix}/namespaces/{namespace}/tables/{table}"
.parse::<Endpoint>()
.unwrap();
assert!(catalog.supports_endpoint(&load_table).await.unwrap());

config_mock.assert_async().await;
}

async fn create_config_mock(server: &mut ServerGuard) -> Mock {
server
.mock("GET", "/v1/config")
Expand Down
Loading
Loading