From 65a0a77ce50071f30fba5248e5691270bde9fb43 Mon Sep 17 00:00:00 2001 From: Daisy Modi Date: Thu, 11 Jun 2026 21:29:38 +0530 Subject: [PATCH 1/3] [GOBBLIN-XXXX] Avoid orphaned JDBC staging tables by checking DROP privilege up front JdbcWriterInitializer tested whether staging tables were droppable by creating a table and dropping it as a probe, inside a 10-iteration retry loop. On an account with CREATE but not DROP, each attempt created a stage_ table that could not be dropped, leaving up to NAMING_STAGING_TABLE_TRIAL (10) orphaned tables per run. Instead, check DROP privilege up front via a new JdbcWriterCommands.hasDropPrivilege() and fail fast before creating anything when it is positively absent. The MySQL impl inspects SHOW GRANTS FOR CURRENT_USER() and fails open (returns true) on any error or inconclusive parse, so a check failure never blocks a legitimately-permitted run. Other dialects inherit a default that returns true. createStagingTable now also creates the staging table exactly once (no drop/recreate probe) and only retries on a genuine name collision; a real create failure propagates instead of spinning and leaking tables. Note: this addresses staging-table orphans from the missing-DROP-privilege path only. Orphans from runs killed mid-write (e.g. connection/timeout, SIGKILL) bypass cleanup entirely and require a separate out-of-band sweep. Tests: add MySqlWriterCommandsTest for the SHOW GRANTS parse, and a JdbcWriterInitializer test asserting a no-DROP account fails fast and creates nothing; update the existing staging-table test for the create-once behavior. Co-Authored-By: Claude Opus 4.8 (1M context) --- .../writer/commands/JdbcWriterCommands.java | 17 ++++ .../writer/commands/MySqlWriterCommands.java | 58 ++++++++++++++ .../initializer/JdbcWriterInitializer.java | 50 ++++++------ .../writer/JdbcWriterInitializerTest.java | 27 ++++++- .../commands/MySqlWriterCommandsTest.java | 79 +++++++++++++++++++ 5 files changed, 201 insertions(+), 30 deletions(-) create mode 100644 gobblin-modules/gobblin-sql/src/test/java/org/apache/gobblin/writer/commands/MySqlWriterCommandsTest.java diff --git a/gobblin-modules/gobblin-sql/src/main/java/org/apache/gobblin/writer/commands/JdbcWriterCommands.java b/gobblin-modules/gobblin-sql/src/main/java/org/apache/gobblin/writer/commands/JdbcWriterCommands.java index 57504357a81..4fb0ca3c10a 100644 --- a/gobblin-modules/gobblin-sql/src/main/java/org/apache/gobblin/writer/commands/JdbcWriterCommands.java +++ b/gobblin-modules/gobblin-sql/src/main/java/org/apache/gobblin/writer/commands/JdbcWriterCommands.java @@ -93,4 +93,21 @@ public interface JdbcWriterCommands extends JdbcBufferedInserter { * @throws SQLException */ public void copyTable(String databaseName, String from, String to) throws SQLException; + + /** + * Best-effort check of whether the current JDBC user can DROP tables in the given database. + * Used to fail fast before creating staging tables that could not later be cleaned up. + * + *

Implementations should fail open: return {@code true} when the privilege cannot be + * determined, so that a check failure never blocks a legitimately-permitted run. The default + * returns {@code true} for dialects that do not implement a real check. + * + * @param database database/schema in which staging tables are created + * @return {@code true} if DROP is granted (or could not be determined), {@code false} only when + * the user is positively known to lack DROP + * @throws SQLException on a connection-level error + */ + default boolean hasDropPrivilege(String database) throws SQLException { + return true; + } } \ No newline at end of file diff --git a/gobblin-modules/gobblin-sql/src/main/java/org/apache/gobblin/writer/commands/MySqlWriterCommands.java b/gobblin-modules/gobblin-sql/src/main/java/org/apache/gobblin/writer/commands/MySqlWriterCommands.java index 922ceac750c..865e21844e8 100644 --- a/gobblin-modules/gobblin-sql/src/main/java/org/apache/gobblin/writer/commands/MySqlWriterCommands.java +++ b/gobblin-modules/gobblin-sql/src/main/java/org/apache/gobblin/writer/commands/MySqlWriterCommands.java @@ -27,6 +27,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import com.google.common.annotations.VisibleForTesting; import com.google.common.collect.ImmutableMap; import org.apache.gobblin.configuration.State; @@ -49,6 +50,7 @@ public class MySqlWriterCommands implements JdbcWriterCommands { private static final String COPY_INSERT_STATEMENT_FORMAT = "INSERT INTO %s.%s SELECT * FROM %s.%s"; private static final String COPY_REPLACE_STATEMENT_FORMAT = "REPLACE INTO %s.%s SELECT * FROM %s.%s"; private static final String DELETE_STATEMENT_FORMAT = "DELETE FROM %s.%s"; + private static final String SHOW_GRANTS_SQL = "SHOW GRANTS FOR CURRENT_USER()"; private final JdbcBufferedInserter jdbcBufferedWriter; private final Connection conn; @@ -111,6 +113,62 @@ public void drop(String database, String table) throws SQLException { execute(sql); } + /** + * Best-effort DROP-privilege check via {@code SHOW GRANTS FOR CURRENT_USER()}. Fails open: any + * error or inconclusive parse returns {@code true} so a check failure never blocks a run that is + * actually permitted. Only returns {@code false} when no grant covering DROP on {@code database} + * is found. + * + *

Caveats: {@code SHOW GRANTS FOR CURRENT_USER()} may not expand privileges that come from + * non-default (inactive) roles, and grant-string parsing is necessarily heuristic. This is meant + * to catch the common case (e.g. a CREATE-but-not-DROP role) and avoid leaving orphan staging + * tables, not to be a complete authorization model. + */ + @Override + public boolean hasDropPrivilege(String database) throws SQLException { + try (PreparedStatement pstmt = this.conn.prepareStatement(SHOW_GRANTS_SQL); + ResultSet rs = pstmt.executeQuery()) { + while (rs.next()) { + if (grantCoversDrop(rs.getString(1), database)) { + return true; + } + } + LOG.warn("No grant covering DROP on database '" + database + "' found for current user."); + return false; + } catch (SQLException e) { + // Fail open: never block a legitimately-permitted run because the check itself failed. + LOG.warn("Could not verify DROP privilege on database '" + database + "'; proceeding.", e); + return true; + } + } + + /** + * Returns true if a single {@code SHOW GRANTS} line grants DROP (or ALL PRIVILEGES) on a scope + * that covers {@code database} (either {@code *.*} or {@code `database`.*}). + */ + @VisibleForTesting + static boolean grantCoversDrop(String grantLine, String database) { + if (grantLine == null) { + return false; + } + String g = grantLine.toUpperCase(); + int onIdx = g.indexOf(" ON "); + int toIdx = g.indexOf(" TO "); + if (onIdx < 0 || toIdx < 0 || toIdx < onIdx) { + return false; + } + String privileges = g.substring(0, onIdx); + boolean grantsDrop = privileges.contains("ALL PRIVILEGES") + || privileges.matches(".*\\bDROP\\b.*"); + if (!grantsDrop) { + return false; + } + // Scope looks like "`DB`.*" or "*.*" or "`DB`.`TABLE`"; strip backticks and compare the schema. + String scope = g.substring(onIdx + 4, toIdx).trim().replace("`", ""); + String schema = scope.substring(0, scope.indexOf('.') < 0 ? scope.length() : scope.indexOf('.')); + return schema.equals("*") || schema.equals(database.toUpperCase()); + } + /** * https://dev.mysql.com/doc/connector-j/en/connector-j-reference-type-conversions.html * {@inheritDoc} diff --git a/gobblin-modules/gobblin-sql/src/main/java/org/apache/gobblin/writer/initializer/JdbcWriterInitializer.java b/gobblin-modules/gobblin-sql/src/main/java/org/apache/gobblin/writer/initializer/JdbcWriterInitializer.java index 0f78ff31b69..1488ba1c736 100644 --- a/gobblin-modules/gobblin-sql/src/main/java/org/apache/gobblin/writer/initializer/JdbcWriterInitializer.java +++ b/gobblin-modules/gobblin-sql/src/main/java/org/apache/gobblin/writer/initializer/JdbcWriterInitializer.java @@ -22,9 +22,7 @@ import java.sql.SQLException; import java.util.Collection; import java.util.Optional; -import java.util.Random; import java.util.Set; -import java.util.concurrent.TimeUnit; import javax.sql.DataSource; @@ -79,7 +77,6 @@ private static class Memento implements AfterInitializeMemento { private static final Logger LOG = LoggerFactory.getLogger(JdbcWriterInitializer.class); private static final String STAGING_TABLE_FORMAT = "stage_%d"; private static final int NAMING_STAGING_TABLE_TRIAL = 10; - private static final Random RANDOM = new Random(); private final int branches; private final int branchId; @@ -166,37 +163,34 @@ private String createStagingTable(Connection conn, JdbcWriterCommands commands) + this.getClass().getSimpleName() + " for branch " + this.branchId); } - String stagingTable = null; + // Fail fast WITHOUT creating anything if the JDBC user can't drop staging tables. The staging + // table must be dropped at cleanup time; if we can't, the old behavior left orphaned tables + // behind. Previously this was checked by creating a table and dropping it as a probe, which on + // a no-DROP account created (but failed to drop) up to NAMING_STAGING_TABLE_TRIAL orphan tables + // per run. The privilege check is best-effort and fails open (see hasDropPrivilege), so it only + // blocks runs we can positively determine lack DROP. + if (!commands.hasDropPrivilege(database)) { + throw new RuntimeException("JDBC user lacks DROP privilege on database '" + database + + "', which is required to manage staging tables. Grant DROP or use an account that has it."); + } + for (int i = 0; i < NAMING_STAGING_TABLE_TRIAL; i++) { String tmp = String.format(STAGING_TABLE_FORMAT, System.nanoTime()); LOG.info("Check if staging table " + tmp + " exists."); ResultSet res = conn.getMetaData().getTables(null, database, tmp, new String[] { "TABLE" }); - if (!res.next()) { - LOG.info("Staging table " + tmp + " does not exist. Creating."); - try { - commands.createTableStructure(database, destinationTable, tmp); - LOG.info("Test if staging table can be dropped. Test by dropping and Creating staging table."); - commands.drop(database, tmp); - commands.createTableStructure(database, destinationTable, tmp); - stagingTable = tmp; - break; - } catch (SQLException e) { - LOG.warn("Failed to create table. Retrying up to " + NAMING_STAGING_TABLE_TRIAL + " times", e); - } - } else { - LOG.info("Staging table " + tmp + " exists."); + if (res.next()) { + // Name collision (extremely unlikely with nanoTime) - retry with a fresh name. + LOG.info("Staging table " + tmp + " exists. Retrying with a new name."); + continue; } - try { - TimeUnit.MILLISECONDS.sleep(RANDOM.nextInt(1000)); - } catch (InterruptedException e) { - LOG.info("Sleep has been interrupted.", e); - } - } - - if (!StringUtils.isEmpty(stagingTable)) { - return stagingTable; + LOG.info("Staging table " + tmp + " does not exist. Creating."); + // Create once. A real failure here (e.g. missing CREATE, missing destination table) is a + // genuine error and is propagated rather than retried, so we don't spin and leak tables. + commands.createTableStructure(database, destinationTable, tmp); + return tmp; } - throw new RuntimeException("Failed to create staging table"); + throw new RuntimeException("Failed to create staging table after " + NAMING_STAGING_TABLE_TRIAL + + " attempts"); } private static String getProp(State state, String key, int branches, int branchId) { diff --git a/gobblin-modules/gobblin-sql/src/test/java/org/apache/gobblin/writer/JdbcWriterInitializerTest.java b/gobblin-modules/gobblin-sql/src/test/java/org/apache/gobblin/writer/JdbcWriterInitializerTest.java index 9466832b6ba..849daaaecfa 100644 --- a/gobblin-modules/gobblin-sql/src/test/java/org/apache/gobblin/writer/JdbcWriterInitializerTest.java +++ b/gobblin-modules/gobblin-sql/src/test/java/org/apache/gobblin/writer/JdbcWriterInitializerTest.java @@ -144,18 +144,41 @@ public void initializeWithCreatingStagingTable() throws SQLException { ResultSet rs = mock(ResultSet.class); when(metadata.getTables(any(), anyString(), anyString(), any(String[].class))).thenReturn(rs); when(rs.next()).thenReturn(Boolean.FALSE); + when(this.commands.hasDropPrivilege(anyString())).thenReturn(Boolean.TRUE); this.initializer.initialize(); Assert.assertTrue(!StringUtils.isEmpty(this.workUnit.getProp(ConfigurationKeys.WRITER_STAGING_TABLE))); InOrder inOrder = inOrder(this.commands); + // Staging table is created exactly once - the create/drop/recreate droppability probe is gone. inOrder.verify(this.commands, times(1)).createTableStructure(anyString(), anyString(), anyString()); - inOrder.verify(this.commands, times(1)).drop(anyString(), anyString()); - inOrder.verify(this.commands, times(1)).createTableStructure(anyString(), anyString(), anyString()); + inOrder.verify(this.commands, never()).drop(anyString(), anyString()); this.initializer.close(); + // The created staging table is dropped only during cleanup. inOrder.verify(this.commands, times(1)).drop(anyString(), anyString()); inOrder.verify(this.commands, never()).truncate(anyString(), anyString()); } + + @Test(expectedExceptions = RuntimeException.class, + expectedExceptionsMessageRegExp = ".*lacks DROP privilege.*") + public void failsFastWhenNoDropPrivilege() throws SQLException { + when(this.commands.isEmpty(DB, STAGING_TABLE)).thenReturn(Boolean.TRUE); + DatabaseMetaData metadata = mock(DatabaseMetaData.class); + when(this.conn.getMetaData()).thenReturn(metadata); + ResultSet rs = mock(ResultSet.class); + when(metadata.getTables(any(), anyString(), anyString(), any(String[].class))).thenReturn(rs); + when(rs.next()).thenReturn(Boolean.FALSE); + when(this.commands.hasDropPrivilege(anyString())).thenReturn(Boolean.FALSE); + + try { + this.initializer.initialize(); + } finally { + // When DROP is unavailable we must bail out before creating anything, so no staging table is + // created and nothing is dropped - this is what prevents the orphan-table accumulation. + verify(this.commands, never()).createTableStructure(anyString(), anyString(), anyString()); + verify(this.commands, never()).drop(anyString(), anyString()); + } + } } diff --git a/gobblin-modules/gobblin-sql/src/test/java/org/apache/gobblin/writer/commands/MySqlWriterCommandsTest.java b/gobblin-modules/gobblin-sql/src/test/java/org/apache/gobblin/writer/commands/MySqlWriterCommandsTest.java new file mode 100644 index 00000000000..d369f12357b --- /dev/null +++ b/gobblin-modules/gobblin-sql/src/test/java/org/apache/gobblin/writer/commands/MySqlWriterCommandsTest.java @@ -0,0 +1,79 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.gobblin.writer.commands; + +import org.testng.Assert; +import org.testng.annotations.Test; + + +/** + * Unit tests for {@link MySqlWriterCommands#grantCoversDrop(String, String)}, the heuristic used by + * {@code hasDropPrivilege} to decide (from {@code SHOW GRANTS} output) whether the current user can + * DROP tables in a given database. The parse is intentionally conservative; these cases pin the + * common grant shapes. + */ +@Test(groups = { "gobblin.writer" }) +public class MySqlWriterCommandsTest { + + private static final String DB = "gobblin"; + + public void detectsExplicitDropOnDatabase() { + Assert.assertTrue(MySqlWriterCommands.grantCoversDrop( + "GRANT SELECT, INSERT, CREATE, DROP ON `gobblin`.* TO `u`@`%`", DB)); + } + + public void detectsAllPrivilegesGlobal() { + Assert.assertTrue(MySqlWriterCommands.grantCoversDrop( + "GRANT ALL PRIVILEGES ON *.* TO `u`@`%` WITH GRANT OPTION", DB)); + } + + public void detectsAllPrivilegesOnDatabase() { + Assert.assertTrue(MySqlWriterCommands.grantCoversDrop( + "GRANT ALL PRIVILEGES ON `gobblin`.* TO `u`@`%`", DB)); + } + + public void matchesDatabaseCaseInsensitively() { + Assert.assertTrue(MySqlWriterCommands.grantCoversDrop( + "GRANT DROP ON `GOBBLIN`.* TO `u`@`%`", DB)); + } + + public void falseWhenNoDropPrivilege() { + Assert.assertFalse(MySqlWriterCommands.grantCoversDrop( + "GRANT SELECT, INSERT, UPDATE, CREATE ON `gobblin`.* TO `u`@`%`", DB)); + } + + public void falseForUsageGrant() { + Assert.assertFalse(MySqlWriterCommands.grantCoversDrop( + "GRANT USAGE ON *.* TO `u`@`%`", DB)); + } + + public void falseWhenDropIsForDifferentDatabase() { + Assert.assertFalse(MySqlWriterCommands.grantCoversDrop( + "GRANT DROP ON `other`.* TO `u`@`%`", DB)); + } + + public void dropInTableNameIsNotMistakenForPrivilege() { + // DROP appears only as part of a table name (after ON), not as a granted privilege. + Assert.assertFalse(MySqlWriterCommands.grantCoversDrop( + "GRANT SELECT ON `gobblin`.`drop_audit` TO `u`@`%`", DB)); + } + + public void falseForNullGrantLine() { + Assert.assertFalse(MySqlWriterCommands.grantCoversDrop(null, DB)); + } +} From 910daa3407ac569561f7d633e859f6ea348c5db5 Mon Sep 17 00:00:00 2001 From: Daisy Modi Date: Thu, 11 Jun 2026 21:59:40 +0530 Subject: [PATCH 2/3] [GOBBLIN-XXXX] Retry staging-table creation with cleanup instead of failing outright Restore the create-failure retry that the previous commit dropped, but make each failed attempt clean up after itself: on a SQLException from createTableStructure, best-effort drop that attempt's table before retrying, so a transient failure never leaves an orphan behind. DROP privilege is verified up front, so the cleanup drop should succeed; if the table was never created the drop is a harmless no-op. Add a test asserting that when the first create attempt fails and the second succeeds, two create attempts are made and the failed attempt's table is dropped. Co-Authored-By: Claude Opus 4.8 (1M context) --- .../initializer/JdbcWriterInitializer.java | 19 +++++++++++++---- .../writer/JdbcWriterInitializerTest.java | 21 +++++++++++++++++++ 2 files changed, 36 insertions(+), 4 deletions(-) diff --git a/gobblin-modules/gobblin-sql/src/main/java/org/apache/gobblin/writer/initializer/JdbcWriterInitializer.java b/gobblin-modules/gobblin-sql/src/main/java/org/apache/gobblin/writer/initializer/JdbcWriterInitializer.java index 1488ba1c736..3c5ea057ecc 100644 --- a/gobblin-modules/gobblin-sql/src/main/java/org/apache/gobblin/writer/initializer/JdbcWriterInitializer.java +++ b/gobblin-modules/gobblin-sql/src/main/java/org/apache/gobblin/writer/initializer/JdbcWriterInitializer.java @@ -184,10 +184,21 @@ private String createStagingTable(Connection conn, JdbcWriterCommands commands) continue; } LOG.info("Staging table " + tmp + " does not exist. Creating."); - // Create once. A real failure here (e.g. missing CREATE, missing destination table) is a - // genuine error and is propagated rather than retried, so we don't spin and leak tables. - commands.createTableStructure(database, destinationTable, tmp); - return tmp; + try { + commands.createTableStructure(database, destinationTable, tmp); + return tmp; + } catch (SQLException e) { + // Retry on failure (as before), but never leave a partially-created table behind: do a + // best-effort drop of this attempt's table before the next try. DROP was verified above, + // so cleanup should succeed; if the table was never created, the drop is a harmless no-op. + LOG.warn("Failed to create staging table " + tmp + ". Cleaning up and retrying up to " + + NAMING_STAGING_TABLE_TRIAL + " times.", e); + try { + commands.drop(database, tmp); + } catch (SQLException dropError) { + LOG.warn("Best-effort cleanup of " + tmp + " failed.", dropError); + } + } } throw new RuntimeException("Failed to create staging table after " + NAMING_STAGING_TABLE_TRIAL + " attempts"); diff --git a/gobblin-modules/gobblin-sql/src/test/java/org/apache/gobblin/writer/JdbcWriterInitializerTest.java b/gobblin-modules/gobblin-sql/src/test/java/org/apache/gobblin/writer/JdbcWriterInitializerTest.java index 849daaaecfa..9bd2b273a72 100644 --- a/gobblin-modules/gobblin-sql/src/test/java/org/apache/gobblin/writer/JdbcWriterInitializerTest.java +++ b/gobblin-modules/gobblin-sql/src/test/java/org/apache/gobblin/writer/JdbcWriterInitializerTest.java @@ -181,4 +181,25 @@ public void failsFastWhenNoDropPrivilege() throws SQLException { verify(this.commands, never()).drop(anyString(), anyString()); } } + + public void retriesAndCleansUpAfterFailedStagingTableCreation() throws SQLException { + when(this.commands.isEmpty(DB, STAGING_TABLE)).thenReturn(Boolean.TRUE); + when(this.commands.hasDropPrivilege(anyString())).thenReturn(Boolean.TRUE); + DatabaseMetaData metadata = mock(DatabaseMetaData.class); + when(this.conn.getMetaData()).thenReturn(metadata); + ResultSet rs = mock(ResultSet.class); + when(metadata.getTables(any(), anyString(), anyString(), any(String[].class))).thenReturn(rs); + when(rs.next()).thenReturn(Boolean.FALSE); + // First create attempt fails, second succeeds. + doThrow(new SQLException("boom")).doNothing() + .when(this.commands).createTableStructure(anyString(), anyString(), anyString()); + + this.initializer.initialize(); + + Assert.assertTrue(!StringUtils.isEmpty(this.workUnit.getProp(ConfigurationKeys.WRITER_STAGING_TABLE))); + // Two create attempts were made, and the failed attempt's table was dropped (cleanup) so no + // orphan is left behind. (close() is not called here, so the only drop is the cleanup.) + verify(this.commands, times(2)).createTableStructure(anyString(), anyString(), anyString()); + verify(this.commands, times(1)).drop(anyString(), anyString()); + } } From bf7d05590731c9bdd86b4bbb6c8b9af41beeff43 Mon Sep 17 00:00:00 2001 From: Daisy Modi Date: Thu, 11 Jun 2026 22:09:27 +0530 Subject: [PATCH 3/3] [GOBBLIN-XXXX] Use DROP TABLE IF EXISTS for idempotent staging-table cleanup Make MySqlWriterCommands.drop issue DROP TABLE IF EXISTS so dropping a table that does not exist is a no-op rather than an error. This lets the staging-table create retry path clean up its attempt with a plain drop() (no best-effort try/catch needed): if a failed create never actually created the table, the cleanup drop simply does nothing. All MySQL drops (including close() cleanup) become idempotent as a result. Co-Authored-By: Claude Opus 4.8 (1M context) --- .../writer/commands/MySqlWriterCommands.java | 2 +- .../writer/initializer/JdbcWriterInitializer.java | 14 +++++--------- 2 files changed, 6 insertions(+), 10 deletions(-) diff --git a/gobblin-modules/gobblin-sql/src/main/java/org/apache/gobblin/writer/commands/MySqlWriterCommands.java b/gobblin-modules/gobblin-sql/src/main/java/org/apache/gobblin/writer/commands/MySqlWriterCommands.java index 865e21844e8..78d35fb40c9 100644 --- a/gobblin-modules/gobblin-sql/src/main/java/org/apache/gobblin/writer/commands/MySqlWriterCommands.java +++ b/gobblin-modules/gobblin-sql/src/main/java/org/apache/gobblin/writer/commands/MySqlWriterCommands.java @@ -44,7 +44,7 @@ public class MySqlWriterCommands implements JdbcWriterCommands { private static final String CREATE_TABLE_SQL_FORMAT = "CREATE TABLE %s.%s LIKE %s.%s"; private static final String SELECT_SQL_FORMAT = "SELECT COUNT(*) FROM %s.%s"; private static final String TRUNCATE_TABLE_FORMAT = "TRUNCATE TABLE %s.%s"; - private static final String DROP_TABLE_SQL_FORMAT = "DROP TABLE %s.%s"; + private static final String DROP_TABLE_SQL_FORMAT = "DROP TABLE IF EXISTS %s.%s"; private static final String INFORMATION_SCHEMA_SELECT_SQL_PSTMT = "SELECT column_name, column_type FROM information_schema.columns WHERE table_schema = ? AND table_name = ?"; private static final String COPY_INSERT_STATEMENT_FORMAT = "INSERT INTO %s.%s SELECT * FROM %s.%s"; diff --git a/gobblin-modules/gobblin-sql/src/main/java/org/apache/gobblin/writer/initializer/JdbcWriterInitializer.java b/gobblin-modules/gobblin-sql/src/main/java/org/apache/gobblin/writer/initializer/JdbcWriterInitializer.java index 3c5ea057ecc..9ca4cbb98d0 100644 --- a/gobblin-modules/gobblin-sql/src/main/java/org/apache/gobblin/writer/initializer/JdbcWriterInitializer.java +++ b/gobblin-modules/gobblin-sql/src/main/java/org/apache/gobblin/writer/initializer/JdbcWriterInitializer.java @@ -188,16 +188,12 @@ private String createStagingTable(Connection conn, JdbcWriterCommands commands) commands.createTableStructure(database, destinationTable, tmp); return tmp; } catch (SQLException e) { - // Retry on failure (as before), but never leave a partially-created table behind: do a - // best-effort drop of this attempt's table before the next try. DROP was verified above, - // so cleanup should succeed; if the table was never created, the drop is a harmless no-op. - LOG.warn("Failed to create staging table " + tmp + ". Cleaning up and retrying up to " + // Retry on failure (as before), but never leave a partially-created table behind: drop this + // attempt's table before the next try. drop() issues DROP TABLE IF EXISTS, so it is a no-op + // when the create never actually created the table. + LOG.warn("Failed to create staging table " + tmp + ". Dropping it and retrying up to " + NAMING_STAGING_TABLE_TRIAL + " times.", e); - try { - commands.drop(database, tmp); - } catch (SQLException dropError) { - LOG.warn("Best-effort cleanup of " + tmp + " failed.", dropError); - } + commands.drop(database, tmp); } } throw new RuntimeException("Failed to create staging table after " + NAMING_STAGING_TABLE_TRIAL