Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 0 additions & 14 deletions apps/optimizer/analyzerapp/build.gradle

This file was deleted.

This file was deleted.

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,6 @@
import org.apache.iceberg.catalog.Catalog;
import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.iceberg.expressions.Expression;
import org.apache.iceberg.expressions.Expressions;
import org.apache.iceberg.io.CloseableIterable;
import org.apache.iceberg.relocated.com.google.common.annotations.VisibleForTesting;
import org.apache.iceberg.spark.actions.SparkActions;
Expand Down Expand Up @@ -340,17 +339,6 @@ private Map<String, List<String>> prepareBackupDataManifests(
TableScan scan = table.newScan().filter(filter);
try (CloseableIterable<FileScanTask> filesIterable = scan.planFiles()) {
List<FileScanTask> filesList = Lists.newArrayList(filesIterable);
filesList.stream()
.filter(task -> !Expressions.alwaysTrue().isEquivalentTo(task.residual()))
.findFirst()
.ifPresent(
task -> {
throw new IllegalStateException(
String.format(
"Retention with backup enabled requires a metadata-only delete for table %s, "
+ "but file %s has residual filter %s, which would require a row-level rewrite.",
fqtn, task.file().path(), task.residual()));
});
return filesList.stream()
.collect(
Collectors.groupingBy(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -141,8 +141,8 @@ private void verifyReplacedTable(
Table replacedTable = catalog.loadTable(TABLE_IDENT);

assertEquals(
originalLocation,
replacedTable.location(),
stripPathScheme(originalLocation),
stripPathScheme(replacedTable.location()),
"Table location should be preserved after replace");
assertEquals(
REPLACE_SCHEMA.asStruct(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -309,54 +309,6 @@ public void testRetentionDataManifestWithTimestampPartitionedTable() throws Exce
}
}

@Test
public void testRetentionWithBackupFailsWhenColumnPatternMismatchesPartition() throws Exception {
final String tableName = "db.test_retention_backup_pattern_mismatch";
try (Operations ops = Operations.withCatalog(getSparkSession(), otelEmitter)) {
// The table is partitioned on `datepartition`, but retention will filter
// on `time_col` using a pattern unrelated to the partitioning. For each
// file's per-file min/max to actually straddle the cutoff (and produce
// a non-trivial residual), both `time_col` values within a partition
// must live in the same data file — so we force a single writer task
// via the COALESCE(1) hint.
ops.spark().sql(String.format("DROP TABLE IF EXISTS %s", tableName)).show();
ops.spark()
.sql(
String.format(
"CREATE TABLE %s (data string, datepartition string, time_col string) "
+ "PARTITIONED BY (datepartition)",
tableName))
.show();
ops.spark()
.sql(
"SELECT data, datepartition, time_col FROM VALUES "
+ "('a', '2024-01', '2020-01-01-00'), "
+ "('b', '2024-01', '2030-01-01-00'), "
+ "('c', '2024-02', '2020-01-01-00'), "
+ "('d', '2024-02', '2030-01-01-00') "
+ "AS t(data, datepartition, time_col)")
.coalesce(1)
.writeTo(tableName)
.append();

// Fix `now` so the cutoff (now - 1 day, formatted yyyy-MM-dd-HH) falls
// strictly between each file's min ("2020-01-01-00") and max
// ("2030-01-01-00") — forcing a non-trivial residual on every file.
ZonedDateTime now = ZonedDateTime.of(2025, 6, 15, 10, 0, 0, 0, ZoneOffset.UTC);
IllegalStateException ex =
Assertions.assertThrows(
IllegalStateException.class,
() ->
ops.runRetention(
tableName, "time_col", "yyyy-MM-dd-HH", "day", 1, true, ".backup", now));
Assertions.assertTrue(
ex.getMessage().contains("metadata-only delete"),
"Expected metadata-only delete error, got: " + ex.getMessage());
// DELETE should not have executed: all 4 rows remain.
verifyRowCount(ops, tableName, 4);
}
}

@Test
public void testOrphanFilesDeletionJavaAPI() throws Exception {
final String tableName = "db.test_ofd_java";
Expand Down
5 changes: 1 addition & 4 deletions build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ ext {
spark_version = "3.1.1"
ok_http3_version = "4.11.0"
junit_version = "5.11.0"
iceberg_1_2_version = "1.2.0.17"
iceberg_1_2_version = "1.2.0.16"
iceberg_1_5_version = "1.5.2.11"
otel_agent_version = "2.12.0" // Bundles OTel SDK 1.47.0
otel_annotations_version = "2.12.0" // Match agent version
Expand Down Expand Up @@ -177,7 +177,6 @@ tasks.register('CopyGitHooksTask', Copy) {
// tables-service.Dockerfile -> :services:tables:bootJar
// housetables-service.Dockerfile -> :services:housetables:bootJar
// jobs-service.Dockerfile -> :services:jobs:bootJar
// optimizer-service.Dockerfile -> :services:optimizer:bootJar
// jobs-scheduler.Dockerfile -> :apps:openhouse-spark-apps_2.12:shadowJar (uber JAR)
// spark-base-hadoop2.8.dockerfile ->
// :integrations:spark:spark-3.1:openhouse-spark-runtime_2.12:shadowJar (uber JAR)
Expand All @@ -197,7 +196,6 @@ tasks.register('dockerPrereqs') {
dependsOn ':services:tables:bootJar'
dependsOn ':services:housetables:bootJar'
dependsOn ':services:jobs:bootJar'
dependsOn ':services:optimizer:bootJar'

// Spark runtime uber JARs (shadowJar)
dependsOn ':integrations:spark:spark-3.1:openhouse-spark-runtime_2.12:shadowJar'
Expand All @@ -221,7 +219,6 @@ tasks.register('dockerPrereqs') {
println ' build/tables/libs/tables.jar'
println ' build/housetables/libs/housetables.jar'
println ' build/jobs/libs/jobs.jar'
println ' build/optimizer/libs/optimizer.jar'
println ' build/openhouse-spark-runtime_2.12/libs/openhouse-spark-runtime_2.12-uber.jar'
println ' build/openhouse-spark-3.5-runtime_2.12/libs/openhouse-spark-3.5-runtime_2.12-uber.jar'
println ' build/openhouse-spark-apps_2.12/libs/openhouse-spark-apps_2.12-uber.jar'
Expand Down
2 changes: 0 additions & 2 deletions iceberg/openhouse/internalcatalog/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -11,9 +11,7 @@ plugins {

dependencies {
implementation 'com.github.spotbugs:spotbugs-annotations:4.8.1'
implementation 'com.github.ben-manes.caffeine:caffeine:2.8.8'
api 'org.springframework.retry:spring-retry:1.3.3'
implementation 'org.springframework:spring-context-support:5.3.18'
implementation "io.opentelemetry.instrumentation:opentelemetry-instrumentation-annotations:${otel_annotations_version}"
api 'io.opentelemetry:opentelemetry-api:1.47.0'
api project(':client:hts')
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,7 @@ private InternalCatalogMetricsConstant() {}
static final String METADATA_UPDATE_LATENCY = "metadata_update_latency";
static final String METADATA_RETRIEVAL_LATENCY = "metadata_retrieval_latency";

public static final String METADATA_CACHE_REMOVAL_CTR = "metadata_cache_removal";

// Tag constants for metric dimensions
static final String DATABASE_TAG = "database";
static final String TABLE_TAG = "table";
public static final String CACHE_REMOVAL_CAUSE_TAG = "cause";
}
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@
import com.linkedin.openhouse.common.exception.AlreadyExistsException;
import com.linkedin.openhouse.common.exception.NoSuchSoftDeletedUserTableException;
import com.linkedin.openhouse.common.utils.NamespaceUtil;
import com.linkedin.openhouse.internal.catalog.cache.TableMetadataCache;
import com.linkedin.openhouse.internal.catalog.fileio.FileIOManager;
import com.linkedin.openhouse.internal.catalog.mapper.HouseTableMapper;
import com.linkedin.openhouse.internal.catalog.model.HouseTable;
Expand All @@ -26,15 +25,13 @@
import java.util.stream.Collectors;
import java.util.stream.StreamSupport;
import lombok.extern.slf4j.Slf4j;
import org.apache.hadoop.fs.Path;
import org.apache.iceberg.BaseMetastoreCatalog;
import org.apache.iceberg.Table;
import org.apache.iceberg.TableOperations;
import org.apache.iceberg.Transaction;
import org.apache.iceberg.UpdateProperties;
import org.apache.iceberg.catalog.Namespace;
import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.iceberg.exceptions.NoSuchTableException;
import org.apache.iceberg.io.FileIO;
import org.apache.iceberg.io.SupportsPrefixOperations;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
Expand Down Expand Up @@ -66,8 +63,6 @@ public class OpenHouseInternalCatalog extends BaseMetastoreCatalog {

@Autowired MeterRegistry meterRegistry;

@Autowired TableMetadataCache tableMetadataCache;

@Override
protected TableOperations newTableOps(TableIdentifier tableIdentifier) {
FileIO fileIO = resolveFileIO(tableIdentifier);
Expand All @@ -79,8 +74,7 @@ protected TableOperations newTableOps(TableIdentifier tableIdentifier) {
houseTableMapper,
tableIdentifier,
metricsReporter,
fileIOManager,
tableMetadataCache);
fileIOManager);
}

@Override
Expand Down Expand Up @@ -125,49 +119,25 @@ public Page<TableIdentifier> listTables(Namespace namespace, Pageable pageable)
.map(houseTable -> TableIdentifier.of(houseTable.getDatabaseId(), houseTable.getTableId()));
}

/**
* Direct HTS lookup that returns the {@link HouseTable} row without parsing metadata.json. Use
* this when only HTS-resident columns (e.g. tableUUID, tableLocation) are needed — for example,
* to authorize a drop without loading the full Iceberg table, which is important when the
* underlying metadata is corrupted and {@link #loadTable} would throw.
*/
public Optional<HouseTable> findHouseTable(TableIdentifier identifier) {
HouseTablePrimaryKey primaryKey =
HouseTablePrimaryKey.builder()
.databaseId(identifier.namespace().toString())
.tableId(identifier.name())
.build();
try {
return houseTableRepository.findById(primaryKey);
} catch (HouseTableNotFoundException e) {
return Optional.empty();
}
}

@Override
public boolean dropTable(TableIdentifier identifier, boolean purge) {
// Look up the HouseTable row directly instead of calling loadTable(), so drop works even when
// the table's metadata.json is corrupted and cannot be parsed by TableMetadataParser.
HouseTable houseTable =
findHouseTable(identifier)
.orElseThrow(() -> new NoSuchTableException("Table does not exist: %s", identifier));

HouseTablePrimaryKey primaryKey =
HouseTablePrimaryKey.builder()
.databaseId(identifier.namespace().toString())
.tableId(identifier.name())
.build();
String tableLocation = getTableBaseLocation(houseTable, identifier);
String tableLocation = loadTable(identifier).location();
FileIO fileIO = resolveFileIO(identifier);
log.debug("Dropping table {}, purge:{}", tableLocation, purge);
try {
HouseTablePrimaryKey primaryKey =
HouseTablePrimaryKey.builder()
.databaseId(identifier.namespace().toString())
.tableId(identifier.name())
.build();
houseTableRepository.deleteById(primaryKey, purge);
} catch (HouseTableRepositoryException houseTableRepositoryException) {
throw new RuntimeException(
String.format("The table %s cannot be dropped due to the server side error:", identifier),
houseTableRepositoryException);
}
if (purge) {
// Delete data and metadata files from storage.
if (fileIO instanceof SupportsPrefixOperations) {
log.debug("Deleting files for table {}", tableLocation);
((SupportsPrefixOperations) fileIO).deletePrefix(tableLocation);
Expand All @@ -182,23 +152,6 @@ public boolean dropTable(TableIdentifier identifier, boolean purge) {
return true;
}

/**
* Returns the table base directory derived from the HouseTable's metadata location. OpenHouse
* writes metadata.json directly under the table base subdir, so the parent of the metadata.json
* path is the same value that {@link org.apache.iceberg.Table#location()} would return.
*/
private static String getTableBaseLocation(HouseTable houseTable, TableIdentifier identifier) {
String metadataLocation = houseTable.getTableLocation();
// Defensive check to avoid any unintentional deletion
if (!metadataLocation.endsWith(".metadata.json")) {
throw new IllegalStateException(
String.format(
"Refusing to drop %s: metadata_location does not look like a metadata.json file: %s",
identifier, metadataLocation));
}
return new Path(metadataLocation).getParent().toString();
}

@Override
public void renameTable(TableIdentifier from, TableIdentifier to) {
Table fromTable = loadTable(from);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@
import com.linkedin.openhouse.cluster.storage.hdfs.HdfsStorageClient;
import com.linkedin.openhouse.cluster.storage.local.LocalStorageClient;
import com.linkedin.openhouse.common.exception.InvalidTableMetadataException;
import com.linkedin.openhouse.internal.catalog.cache.TableMetadataCache;
import com.linkedin.openhouse.internal.catalog.exception.InvalidIcebergSnapshotException;
import com.linkedin.openhouse.internal.catalog.fileio.FileIOManager;
import com.linkedin.openhouse.internal.catalog.mapper.HouseTableMapper;
Expand Down Expand Up @@ -86,8 +85,6 @@ public class OpenHouseInternalTableOperations extends BaseMetastoreTableOperatio

FileIOManager fileIOManager;

TableMetadataCache tableMetadataCache;

private static final Gson GSON = new Gson();

private static final Cache<String, Integer> CACHE =
Expand Down Expand Up @@ -136,10 +133,7 @@ protected void doRefresh() {
protected void refreshMetadata(final String metadataLoc) {
long startTime = System.currentTimeMillis();
boolean needToReload = !Objects.equal(currentMetadataLocation(), metadataLoc);
Runnable r =
() ->
super.refreshFromMetadataLocation(
metadataLoc, null, 20, this::loadTableMetadataWithCache);
Runnable r = () -> super.refreshFromMetadataLocation(metadataLoc);
try {
if (needToReload) {
metricsReporter.executeWithStats(
Expand Down Expand Up @@ -361,7 +355,6 @@ protected void doCommit(TableMetadata base, TableMetadata metadata) {
updatedMtDataRef, io().newOutputFile(newMetadataLocation)),
InternalCatalogMetricsConstant.METADATA_UPDATE_LATENCY,
getCatalogMetricTags());
tableMetadataCache.seed(newMetadataLocation, updatedMtDataRef);
log.info(
"updateMetadata to location {} succeeded, took {} ms",
newMetadataLocation,
Expand All @@ -379,7 +372,7 @@ updatedMtDataRef, io().newOutputFile(newMetadataLocation)),
writeSpan.end();
}

houseTable = houseTableMapper.toHouseTable(updatedMtDataRef, fileIO);
houseTable = houseTableMapper.toHouseTable(metadataToCommit, fileIO);
if (base != null
&& (properties.containsKey(CatalogConstants.OPENHOUSE_TABLEID_KEY)
&& !properties
Expand Down Expand Up @@ -412,7 +405,7 @@ updatedMtDataRef, io().newOutputFile(newMetadataLocation)),
* "forced refresh" in {@link OpenHouseInternalTableOperations#commit(TableMetadata,
* TableMetadata)}
*/
refreshMetadata(newMetadataLocation);
refreshFromMetadataLocation(newMetadataLocation);
}
if (isReplicatedTableCreate(properties)) {
updateMetadataFieldForTable(metadata, newMetadataLocation);
Expand Down Expand Up @@ -793,9 +786,4 @@ private List<String> getIntermediateSchemasFromProps(TableMetadata metadata) {
.create()
.fromJson(serializedNewIntermediateSchemas, new TypeToken<List<String>>() {}.getType());
}

private TableMetadata loadTableMetadataWithCache(String metadataLocation) {
return tableMetadataCache.load(
metadataLocation, () -> TableMetadataParser.read(io(), metadataLocation));
}
}
Loading