Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 4 additions & 1 deletion amber/requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,10 @@ bidict==0.22.0
cached_property==1.5.2
psutil==5.9.0
tzlocal==2.1
pyiceberg==0.8.1
pyiceberg==0.9.0
s3fs==2025.9.0
aiobotocore==2.25.1
botocore==1.40.53
readerwriterlock==1.0.9
tenacity==8.5.0
SQLAlchemy==2.0.37
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,14 +18,17 @@
from pyiceberg.catalog import Catalog
from typing import Optional

from core.storage.iceberg.iceberg_utils import create_postgres_catalog
from core.storage.iceberg.iceberg_utils import (
create_postgres_catalog,
create_rest_catalog,
)
from core.storage.storage_config import StorageConfig


class IcebergCatalogInstance:
"""
IcebergCatalogInstance is a singleton that manages the Iceberg catalog instance.
Currently only postgres SQL catalog is supported.
Supports postgres SQL catalog and REST catalog.
- Provides a single shared catalog for all Iceberg table-related operations.
- Lazily initializes the catalog on first access.
- Supports replacing the catalog instance for testing or reconfiguration.
Expand All @@ -39,16 +42,31 @@ def get_instance(cls):
Retrieves the singleton Iceberg catalog instance.
- If the catalog is not initialized, it is lazily created using the configured
properties.
- Supports "postgres" and "rest" catalog types.
:return: the Iceberg catalog instance.
"""
if cls._instance is None:
cls._instance = create_postgres_catalog(
"texera_iceberg",
StorageConfig.ICEBERG_FILE_STORAGE_DIRECTORY_PATH,
StorageConfig.ICEBERG_POSTGRES_CATALOG_URI_WITHOUT_SCHEME,
StorageConfig.ICEBERG_POSTGRES_CATALOG_USERNAME,
StorageConfig.ICEBERG_POSTGRES_CATALOG_PASSWORD,
)
catalog_type = StorageConfig.ICEBERG_CATALOG_TYPE
if catalog_type == "postgres":
cls._instance = create_postgres_catalog(
"texera_iceberg",
StorageConfig.ICEBERG_FILE_STORAGE_DIRECTORY_PATH,
StorageConfig.ICEBERG_POSTGRES_CATALOG_URI_WITHOUT_SCHEME,
StorageConfig.ICEBERG_POSTGRES_CATALOG_USERNAME,
StorageConfig.ICEBERG_POSTGRES_CATALOG_PASSWORD,
)
elif catalog_type == "rest":
cls._instance = create_rest_catalog(
"texera_iceberg",
StorageConfig.ICEBERG_REST_CATALOG_WAREHOUSE_NAME,
StorageConfig.ICEBERG_REST_CATALOG_URI,
StorageConfig.S3_ENDPOINT,
StorageConfig.S3_REGION,
StorageConfig.S3_AUTH_USERNAME,
StorageConfig.S3_AUTH_PASSWORD,
)
else:
raise ValueError(f"Unsupported catalog type: {catalog_type}")
return cls._instance

@classmethod
Expand Down
40 changes: 39 additions & 1 deletion amber/src/main/python/core/storage/iceberg/iceberg_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@

import pyarrow as pa
import pyiceberg.table
from pyiceberg.catalog import Catalog
from pyiceberg.catalog import Catalog, load_catalog
from pyiceberg.catalog.sql import SqlCatalog
from pyiceberg.expressions import AlwaysTrue
from pyiceberg.io.pyarrow import ArrowScan
Expand Down Expand Up @@ -153,6 +153,44 @@ def create_postgres_catalog(
)


def create_rest_catalog(
    catalog_name: str,
    warehouse_name: str,
    rest_uri: str,
    s3_endpoint: str,
    s3_region: str,
    s3_username: str,
    s3_password: str,
) -> Catalog:
    """
    Builds an Iceberg REST catalog backed by an S3-compatible object store.

    The returned catalog talks to the given REST endpoint and is configured
    to use S3FileIO with path-style addressing for the MinIO/S3 storage
    backend.
    :param catalog_name: name under which the catalog is registered.
    :param warehouse_name: warehouse identifier (the warehouse name for Lakekeeper).
    :param rest_uri: URI of the REST catalog endpoint.
    :param s3_endpoint: endpoint URL of the S3-compatible store.
    :param s3_region: region of the S3-compatible store.
    :param s3_username: S3 access key ID.
    :param s3_password: S3 secret access key.
    :return: a Catalog instance (REST catalog).
    """
    catalog_properties = {
        "type": "rest",
        "uri": rest_uri,
        "warehouse": warehouse_name,
        "s3.endpoint": s3_endpoint,
        "s3.access-key-id": s3_username,
        "s3.secret-access-key": s3_password,
        "s3.region": s3_region,
        # path-style access is required by MinIO-style endpoints
        "s3.path-style-access": "true",
    }
    return load_catalog(catalog_name, **catalog_properties)




def create_table(
catalog: Catalog,
table_namespace: str,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,16 +37,19 @@

# Hardcoded storage config only for test purposes.
StorageConfig.initialize(
catalog_type="postgres",
postgres_uri_without_scheme="localhost:5432/texera_iceberg_catalog",
postgres_username="texera",
postgres_password="password",
rest_catalog_uri="http://localhost:8181/catalog/",
rest_catalog_warehouse_name="texera",
table_result_namespace="operator-port-result",
directory_path="../../../../../../amber/user-resources/workflow-results",
commit_batch_size=4096,
s3_endpoint="http://localhost:9000",
s3_region="us-east-1",
s3_auth_username="minioadmin",
s3_auth_password="minioadmin",
s3_region="us-west-2",
s3_auth_username="texera_minio",
s3_auth_password="password",
)


Expand Down
12 changes: 11 additions & 1 deletion amber/src/main/python/core/storage/storage_config.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,14 +25,17 @@ class StorageConfig:

_initialized = False

ICEBERG_CATALOG_TYPE = None
ICEBERG_POSTGRES_CATALOG_URI_WITHOUT_SCHEME = None
ICEBERG_POSTGRES_CATALOG_USERNAME = None
ICEBERG_POSTGRES_CATALOG_PASSWORD = None
ICEBERG_REST_CATALOG_URI = None
ICEBERG_REST_CATALOG_WAREHOUSE_NAME = None
ICEBERG_TABLE_RESULT_NAMESPACE = None
ICEBERG_FILE_STORAGE_DIRECTORY_PATH = None
ICEBERG_TABLE_COMMIT_BATCH_SIZE = None

# S3 configs (for large_binary_manager module)
# S3 configs
S3_ENDPOINT = None
S3_REGION = None
S3_AUTH_USERNAME = None
Expand All @@ -41,9 +44,12 @@ class StorageConfig:
@classmethod
def initialize(
cls,
catalog_type,
postgres_uri_without_scheme,
postgres_username,
postgres_password,
rest_catalog_uri,
rest_catalog_warehouse_name,
table_result_namespace,
directory_path,
commit_batch_size,
Expand All @@ -57,9 +63,13 @@ def initialize(
"Storage config has already been initialized and cannot be modified."
)

cls.ICEBERG_CATALOG_TYPE = catalog_type
cls.ICEBERG_POSTGRES_CATALOG_URI_WITHOUT_SCHEME = postgres_uri_without_scheme
cls.ICEBERG_POSTGRES_CATALOG_USERNAME = postgres_username
cls.ICEBERG_POSTGRES_CATALOG_PASSWORD = postgres_password
cls.ICEBERG_REST_CATALOG_URI = rest_catalog_uri
cls.ICEBERG_REST_CATALOG_WAREHOUSE_NAME = rest_catalog_warehouse_name

cls.ICEBERG_TABLE_RESULT_NAMESPACE = table_result_namespace
cls.ICEBERG_FILE_STORAGE_DIRECTORY_PATH = directory_path
cls.ICEBERG_TABLE_COMMIT_BATCH_SIZE = int(commit_batch_size)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,16 +27,19 @@ def setup_storage_config(self):
"""Initialize StorageConfig for tests."""
if not StorageConfig._initialized:
StorageConfig.initialize(
catalog_type="postgres",
postgres_uri_without_scheme="localhost:5432/test",
postgres_username="test",
postgres_password="test",
rest_catalog_uri="http://localhost:8181/catalog/",
rest_catalog_warehouse_name="texera",
table_result_namespace="test",
directory_path="/tmp/test",
commit_batch_size=1000,
s3_endpoint="http://localhost:9000",
s3_region="us-east-1",
s3_auth_username="minioadmin",
s3_auth_password="minioadmin",
s3_region="us-west-2",
s3_auth_username="texera_minio",
s3_auth_password="password",
)

def test_get_s3_client_initializes_once(self):
Expand Down
6 changes: 6 additions & 0 deletions amber/src/main/python/texera_run_python_worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -45,9 +45,12 @@ def init_loguru_logger(stream_log_level) -> None:
output_port,
logger_level,
r_path,
iceberg_catalog_type,
iceberg_postgres_catalog_uri_without_scheme,
iceberg_postgres_catalog_username,
iceberg_postgres_catalog_password,
iceberg_rest_catalog_uri,
iceberg_rest_catalog_warehouse_name,
iceberg_table_namespace,
iceberg_file_storage_directory_path,
iceberg_table_commit_batch_size,
Expand All @@ -58,9 +61,12 @@ def init_loguru_logger(stream_log_level) -> None:
) = sys.argv
init_loguru_logger(logger_level)
StorageConfig.initialize(
iceberg_catalog_type,
iceberg_postgres_catalog_uri_without_scheme,
iceberg_postgres_catalog_username,
iceberg_postgres_catalog_password,
iceberg_rest_catalog_uri,
iceberg_rest_catalog_warehouse_name,
iceberg_table_namespace,
iceberg_file_storage_directory_path,
iceberg_table_commit_batch_size,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -178,9 +178,12 @@ class PythonWorkflowWorker(
Integer.toString(pythonProxyServer.getPortNumber.get()),
UdfConfig.pythonLogStreamHandlerLevel,
RENVPath,
StorageConfig.icebergCatalogType,
StorageConfig.icebergPostgresCatalogUriWithoutScheme,
StorageConfig.icebergPostgresCatalogUsername,
StorageConfig.icebergPostgresCatalogPassword,
StorageConfig.icebergRESTCatalogUri,
StorageConfig.icebergRESTCatalogWarehouseName,
StorageConfig.icebergTableResultNamespace,
StorageConfig.fileStorageDirectoryPath.toString,
StorageConfig.icebergTableCommitBatchSize.toString,
Expand Down
11 changes: 11 additions & 0 deletions common/config/src/main/resources/storage.conf
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,17 @@ storage {
rest-uri = ""
rest-uri = ${?STORAGE_ICEBERG_CATALOG_REST_URI} # the uri of the rest catalog, not needed unless using REST catalog

rest {
uri = "http://localhost:8181/catalog/"
uri = ${?STORAGE_ICEBERG_CATALOG_REST_URI}
warehouse-name = "texera"
warehouse-name = ${?STORAGE_ICEBERG_CATALOG_REST_WAREHOUSE_NAME}
region = "us-west-2"
region = ${?STORAGE_ICEBERG_CATALOG_REST_REGION}
s3-bucket = "texera-iceberg"
s3-bucket = ${?STORAGE_ICEBERG_CATALOG_REST_S3_BUCKET}
}

postgres {
# do not include scheme in the uri as Python and Java use different schemes
uri-without-scheme = "localhost:5432/texera_iceberg_catalog"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ object EnvironmentalVariable {
// Iceberg Catalog
val ENV_ICEBERG_CATALOG_TYPE = "STORAGE_ICEBERG_CATALOG_TYPE"
val ENV_ICEBERG_CATALOG_REST_URI = "STORAGE_ICEBERG_CATALOG_REST_URI"
val ENV_ICEBERG_CATALOG_REST_WAREHOUSE_NAME = "STORAGE_ICEBERG_CATALOG_REST_WAREHOUSE_NAME"

// Iceberg Postgres Catalog
val ENV_ICEBERG_CATALOG_POSTGRES_URI_WITHOUT_SCHEME =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,9 @@ object StorageConfig {

// Iceberg specifics
val icebergCatalogType: String = conf.getString("storage.iceberg.catalog.type")
val icebergRESTCatalogUri: String = conf.getString("storage.iceberg.catalog.rest-uri")
val icebergRESTCatalogUri: String = conf.getString("storage.iceberg.catalog.rest.uri")
val icebergRESTCatalogWarehouseName: String =
conf.getString("storage.iceberg.catalog.rest.warehouse-name")

// Iceberg Postgres specifics
val icebergPostgresCatalogUriWithoutScheme: String =
Expand Down
19 changes: 17 additions & 2 deletions common/workflow-core/build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -134,10 +134,14 @@ dependencyOverrides ++= Seq(
"io.netty" % "netty-codec" % nettyVersion,
"io.netty" % "netty-codec-http" % nettyVersion,
"io.netty" % "netty-codec-http2" % nettyVersion,
"io.netty" % "netty-codec-socks" % nettyVersion,
"io.netty" % "netty-common" % nettyVersion,
"io.netty" % "netty-handler" % nettyVersion,
"io.netty" % "netty-handler-proxy" % nettyVersion,
"io.netty" % "netty-resolver" % nettyVersion,
"io.netty" % "netty-transport" % nettyVersion,
"io.netty" % "netty-transport-classes-epoll" % nettyVersion,
"io.netty" % "netty-transport-native-epoll" % nettyVersion,
"io.netty" % "netty-transport-native-unix-common" % nettyVersion
)

Expand Down Expand Up @@ -167,6 +171,10 @@ libraryDependencies ++= Seq(
excludeJackson,
excludeJacksonModule
),
"org.apache.iceberg" % "iceberg-aws" % "1.7.1" excludeAll(
excludeJackson,
excludeJacksonModule
),
"org.apache.hadoop" % "hadoop-common" % "3.3.1" excludeAll(
excludeXmlBind,
excludeGlassfishJersey,
Expand Down Expand Up @@ -208,6 +216,13 @@ libraryDependencies ++= Seq(
"software.amazon.awssdk" % "s3" % "2.29.51" excludeAll(
ExclusionRule(organization = "io.netty")
),
"software.amazon.awssdk" % "auth" % "2.29.51",
"software.amazon.awssdk" % "regions" % "2.29.51",
"software.amazon.awssdk" % "auth" % "2.29.51" excludeAll(
ExclusionRule(organization = "io.netty")
),
"software.amazon.awssdk" % "regions" % "2.29.51" excludeAll(
ExclusionRule(organization = "io.netty")
),
"software.amazon.awssdk" % "sts" % "2.29.51" excludeAll(
ExclusionRule(organization = "io.netty")
),
)
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ object IcebergCatalogInstance {
case "rest" =>
IcebergUtil.createRestCatalog(
"texera_iceberg",
StorageConfig.fileStorageDirectoryPath
StorageConfig.icebergRESTCatalogWarehouseName
)
case "postgres" =>
IcebergUtil.createPostgresCatalog(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -107,10 +107,19 @@ private[storage] class IcebergTableWriter[T](
private def flushBuffer(): Unit = {
if (buffer.nonEmpty) {
// Create a unique file path using the writer's identifier and the filename index
val filepath = Paths.get(table.location()).resolve(s"${writerIdentifier}_${filenameIdx}")
// Handle S3 URIs (s3://) differently from local file paths to preserve URI format
val location = table.location()
val filepathString = if (location.startsWith("s3://")) {
// For S3 URIs, append path component directly as string to preserve s3:// format
val basePath = if (location.endsWith("/")) location else s"$location/"
s"$basePath${writerIdentifier}_${filenameIdx}"
} else {
// For local file paths, use Paths.get() for proper path resolution
Paths.get(location).resolve(s"${writerIdentifier}_${filenameIdx}").toString
}
// Increment the filename index by 1
filenameIdx += 1
val outputFile: OutputFile = table.io().newOutputFile(filepath.toString)
val outputFile: OutputFile = table.io().newOutputFile(filepathString)
// Create a Parquet data writer to write a new file
val dataWriter: DataWriter[Record] = Parquet
.writeData(outputFile)
Expand Down
Loading
Loading