diff --git a/gobblin-modules/gobblin-sql/src/main/java/org/apache/gobblin/writer/commands/JdbcWriterCommands.java b/gobblin-modules/gobblin-sql/src/main/java/org/apache/gobblin/writer/commands/JdbcWriterCommands.java
index 57504357a81..4fb0ca3c10a 100644
--- a/gobblin-modules/gobblin-sql/src/main/java/org/apache/gobblin/writer/commands/JdbcWriterCommands.java
+++ b/gobblin-modules/gobblin-sql/src/main/java/org/apache/gobblin/writer/commands/JdbcWriterCommands.java
@@ -93,4 +93,21 @@ public interface JdbcWriterCommands extends JdbcBufferedInserter {
    * @throws SQLException
    */
   public void copyTable(String databaseName, String from, String to) throws SQLException;
+
+  /**
+   * Best-effort check of whether the current JDBC user can DROP tables in the given database.
+   * Used to fail fast before creating staging tables that could not later be cleaned up.
+   *
+   * <p>Implementations should fail open: return {@code true} when the privilege cannot be
+   * determined, so that a check failure never blocks a legitimately-permitted run. The default
+   * returns {@code true} for dialects that do not implement a real check.
+   *
+   * @param database database/schema in which staging tables are created
+   * @return {@code true} if DROP is granted (or could not be determined), {@code false} only when
+   *         the user is positively known to lack DROP
+   * @throws SQLException on a connection-level error
+   */
+  default boolean hasDropPrivilege(String database) throws SQLException {
+    return true;
+  }
 }
\ No newline at end of file
diff --git a/gobblin-modules/gobblin-sql/src/main/java/org/apache/gobblin/writer/commands/MySqlWriterCommands.java b/gobblin-modules/gobblin-sql/src/main/java/org/apache/gobblin/writer/commands/MySqlWriterCommands.java
index 922ceac750c..78d35fb40c9 100644
--- a/gobblin-modules/gobblin-sql/src/main/java/org/apache/gobblin/writer/commands/MySqlWriterCommands.java
+++ b/gobblin-modules/gobblin-sql/src/main/java/org/apache/gobblin/writer/commands/MySqlWriterCommands.java
@@ -27,6 +27,7 @@
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import com.google.common.annotations.VisibleForTesting;
 import com.google.common.collect.ImmutableMap;
 
 import org.apache.gobblin.configuration.State;
@@ -43,12 +44,13 @@ public class MySqlWriterCommands implements JdbcWriterCommands {
   private static final String CREATE_TABLE_SQL_FORMAT = "CREATE TABLE %s.%s LIKE %s.%s";
   private static final String SELECT_SQL_FORMAT = "SELECT COUNT(*) FROM %s.%s";
   private static final String TRUNCATE_TABLE_FORMAT = "TRUNCATE TABLE %s.%s";
-  private static final String DROP_TABLE_SQL_FORMAT = "DROP TABLE %s.%s";
+  private static final String DROP_TABLE_SQL_FORMAT = "DROP TABLE IF EXISTS %s.%s";
   private static final String INFORMATION_SCHEMA_SELECT_SQL_PSTMT =
       "SELECT column_name, column_type FROM information_schema.columns WHERE table_schema = ? AND table_name = ?";
   private static final String COPY_INSERT_STATEMENT_FORMAT = "INSERT INTO %s.%s SELECT * FROM %s.%s";
   private static final String COPY_REPLACE_STATEMENT_FORMAT = "REPLACE INTO %s.%s SELECT * FROM %s.%s";
   private static final String DELETE_STATEMENT_FORMAT = "DELETE FROM %s.%s";
+  private static final String SHOW_GRANTS_SQL = "SHOW GRANTS FOR CURRENT_USER()";
 
   private final JdbcBufferedInserter jdbcBufferedWriter;
   private final Connection conn;
@@ -111,6 +113,112 @@ public void drop(String database, String table) throws SQLException {
     execute(sql);
   }
 
+  /**
+   * Best-effort DROP-privilege check via {@code SHOW GRANTS FOR CURRENT_USER()}. Fails open: a
+   * {@link SQLException} raised by the check returns {@code true} so a check failure never blocks
+   * a run that is actually permitted. Returns {@code false} only when the statement succeeds and
+   * no returned grant line covers DROP on {@code database}; lines that cannot be parsed are skipped.
+   *
+   * <p>Caveats: {@code SHOW GRANTS FOR CURRENT_USER()} may not expand privileges that come from
+   * non-default (inactive) roles, and grant-string parsing is necessarily heuristic. This is meant
+   * to catch the common case (e.g. a CREATE-but-not-DROP role) and avoid leaving orphan staging
+   * tables, not to be a complete authorization model.
+   */
+  @Override
+  public boolean hasDropPrivilege(String database) throws SQLException {
+    try (PreparedStatement pstmt = this.conn.prepareStatement(SHOW_GRANTS_SQL);
+        ResultSet rs = pstmt.executeQuery()) {
+      while (rs.next()) {
+        if (grantCoversDrop(rs.getString(1), database)) {
+          return true;
+        }
+      }
+      LOG.warn("No grant covering DROP on database '" + database + "' found for current user.");
+      return false;
+    } catch (SQLException e) {
+      // Fail open: never block a legitimately-permitted run because the check itself failed.
+      LOG.warn("Could not verify DROP privilege on database '" + database + "'; proceeding.", e);
+      return true;
+    }
+  }
+
+  /**
+   * Returns true if a single {@code SHOW GRANTS} line grants DROP (or ALL PRIVILEGES) on a scope
+   * that covers every table of {@code database}: either {@code *.*} or a database-level scope
+   * ({@code `pattern`.*}) whose pattern matches {@code database}.
+   *
+   * <p>Privileges are compared as whole comma-separated entries, so {@code DROP ROLE} is not
+   * mistaken for {@code DROP}. Database-level scopes are treated as MySQL grant patterns:
+   * {@code %} and {@code _} are wildcards and a backslash escapes the next character, which is how
+   * a literal underscore ({@code `my\_db`.*}) commonly appears. Matching is case-insensitive.
+   * Table- and routine-level scopes never cover the database.
+   *
+   * @param grantLine one row of {@code SHOW GRANTS} output; {@code null} yields {@code false}
+   * @param database database in which staging tables are created
+   * @return {@code true} only when the line grants DROP on a scope covering {@code database}
+   */
+  @VisibleForTesting
+  static boolean grantCoversDrop(String grantLine, String database) {
+    if (grantLine == null) {
+      return false;
+    }
+    // Locale.ROOT keeps case folding independent of the JVM default locale.
+    String g = grantLine.toUpperCase(java.util.Locale.ROOT);
+    int onIdx = g.indexOf(" ON ");
+    int toIdx = onIdx < 0 ? -1 : g.indexOf(" TO ", onIdx + 4);
+    if (toIdx < 0 || !listsDropPrivilege(g.substring(0, onIdx))) {
+      return false;
+    }
+    // Scope looks like "`DB`.*", "*.*" or "`DB`.`TABLE`"; strip backticks and split at the dot.
+    String scope = g.substring(onIdx + 4, toIdx).trim().replace("`", "");
+    int dot = scope.indexOf('.');
+    String schema = dot < 0 ? scope : scope.substring(0, dot);
+    String object = dot < 0 ? "*" : scope.substring(dot + 1);
+    if (!object.equals("*")) {
+      // DROP on a single table or routine does not allow dropping staging tables.
+      return false;
+    }
+    if (schema.equals("*")) {
+      return true;
+    }
+    return database != null
+        && matchesSchemaPattern(schema, database.toUpperCase(java.util.Locale.ROOT));
+  }
+
+  /** Returns true if the upper-cased text before {@code ON} lists DROP, ALL or ALL PRIVILEGES. */
+  private static boolean listsDropPrivilege(String privilegeClause) {
+    String list = privilegeClause.trim();
+    if (list.startsWith("GRANT ")) {
+      list = list.substring("GRANT ".length());
+    }
+    for (String entry : list.split(",")) {
+      String privilege = entry.trim();
+      if (privilege.equals("DROP") || privilege.equals("ALL PRIVILEGES") || privilege.equals("ALL")) {
+        return true;
+      }
+    }
+    return false;
+  }
+
+  /** Matches {@code schema} against a MySQL grant pattern ({@code %}, {@code _}, backslash escape). */
+  private static boolean matchesSchemaPattern(String pattern, String schema) {
+    StringBuilder regex = new StringBuilder();
+    for (int i = 0; i < pattern.length(); i++) {
+      char c = pattern.charAt(i);
+      if (c == '\\' && i + 1 < pattern.length()) {
+        // Escaped character: take the next one literally.
+        regex.append(java.util.regex.Pattern.quote(String.valueOf(pattern.charAt(++i))));
+      } else if (c == '%') {
+        regex.append(".*");
+      } else if (c == '_') {
+        regex.append('.');
+      } else {
+        regex.append(java.util.regex.Pattern.quote(String.valueOf(c)));
+      }
+    }
+    return schema.matches(regex.toString());
+  }
+
   /**
    * https://dev.mysql.com/doc/connector-j/en/connector-j-reference-type-conversions.html
    * {@inheritDoc}
diff --git a/gobblin-modules/gobblin-sql/src/main/java/org/apache/gobblin/writer/initializer/JdbcWriterInitializer.java b/gobblin-modules/gobblin-sql/src/main/java/org/apache/gobblin/writer/initializer/JdbcWriterInitializer.java
index 0f78ff31b69..9ca4cbb98d0 100644
--- a/gobblin-modules/gobblin-sql/src/main/java/org/apache/gobblin/writer/initializer/JdbcWriterInitializer.java
+++ b/gobblin-modules/gobblin-sql/src/main/java/org/apache/gobblin/writer/initializer/JdbcWriterInitializer.java
@@ -22,9 +22,7 @@
 import java.sql.SQLException;
 import java.util.Collection;
 import java.util.Optional;
-import java.util.Random;
 import java.util.Set;
-import java.util.concurrent.TimeUnit;
 
 import javax.sql.DataSource;
 
@@ -79,7 +77,6 @@ private static class Memento implements AfterInitializeMemento {
   private static final Logger LOG = LoggerFactory.getLogger(JdbcWriterInitializer.class);
   private static final String STAGING_TABLE_FORMAT = "stage_%d";
   private static final int NAMING_STAGING_TABLE_TRIAL = 10;
-  private static final Random RANDOM = new Random();
 
   private final int branches;
   private final int branchId;
@@ -166,37 +163,54 @@ private String createStagingTable(Connection conn, JdbcWriterCommands commands)
           + this.getClass().getSimpleName() + " for branch " + this.branchId);
     }
 
-    String stagingTable = null;
+    // Fail fast WITHOUT creating anything if the JDBC user can't drop staging tables. The staging
+    // table must be dropped at cleanup time; if we can't, the old behavior left orphaned tables
+    // behind. Previously this was checked by creating a table and dropping it as a probe, which on
+    // a no-DROP account created (but failed to drop) up to NAMING_STAGING_TABLE_TRIAL orphan tables
+    // per run. The privilege check is best-effort and fails open (see hasDropPrivilege), so it only
+    // blocks runs we can positively determine lack DROP.
+    if (!commands.hasDropPrivilege(database)) {
+      throw new RuntimeException("JDBC user lacks DROP privilege on database '" + database
+          + "', which is required to manage staging tables. Grant DROP or use an account that has it.");
+    }
+
+    // Remember the most recent create failure so the final exception can carry its cause.
+    SQLException lastFailure = null;
     for (int i = 0; i < NAMING_STAGING_TABLE_TRIAL; i++) {
       String tmp = String.format(STAGING_TABLE_FORMAT, System.nanoTime());
       LOG.info("Check if staging table " + tmp + " exists.");
-      ResultSet res = conn.getMetaData().getTables(null, database, tmp, new String[] { "TABLE" });
-      if (!res.next()) {
-        LOG.info("Staging table " + tmp + " does not exist. Creating.");
-        try {
-          commands.createTableStructure(database, destinationTable, tmp);
-          LOG.info("Test if staging table can be dropped. Test by dropping and Creating staging table.");
-          commands.drop(database, tmp);
-          commands.createTableStructure(database, destinationTable, tmp);
-          stagingTable = tmp;
-          break;
-        } catch (SQLException e) {
-          LOG.warn("Failed to create table. Retrying up to " + NAMING_STAGING_TABLE_TRIAL + " times", e);
-        }
-      } else {
-        LOG.info("Staging table " + tmp + " exists.");
+      boolean exists;
+      // Close the metadata ResultSet on every iteration instead of leaking it.
+      try (ResultSet res = conn.getMetaData().getTables(null, database, tmp, new String[] { "TABLE" })) {
+        exists = res.next();
+      }
+      if (exists) {
+        // Name collision (extremely unlikely with nanoTime) - retry with a fresh name.
+        LOG.info("Staging table " + tmp + " exists. Retrying with a new name.");
+        continue;
       }
+      LOG.info("Staging table " + tmp + " does not exist. Creating.");
       try {
-        TimeUnit.MILLISECONDS.sleep(RANDOM.nextInt(1000));
-      } catch (InterruptedException e) {
-        LOG.info("Sleep has been interrupted.", e);
+        commands.createTableStructure(database, destinationTable, tmp);
+        return tmp;
+      } catch (SQLException e) {
+        // Retry on failure (as before), but never leave a partially-created table behind: drop this
+        // attempt's table before the next try. drop() issues DROP TABLE IF EXISTS, so it is a no-op
+        // when the create never actually created the table.
+        LOG.warn("Failed to create staging table " + tmp + ". Dropping it and retrying up to "
+            + NAMING_STAGING_TABLE_TRIAL + " times.", e);
+        lastFailure = e;
+        try {
+          commands.drop(database, tmp);
+        } catch (SQLException dropFailure) {
+          // A failed cleanup must neither abort the remaining attempts nor hide the create failure.
+          e.addSuppressed(dropFailure);
+          LOG.warn("Failed to drop staging table " + tmp + " after a failed create.", dropFailure);
+        }
       }
     }
-
-    if (!StringUtils.isEmpty(stagingTable)) {
-      return stagingTable;
-    }
-    throw new RuntimeException("Failed to create staging table");
+    throw new RuntimeException("Failed to create staging table after " + NAMING_STAGING_TABLE_TRIAL
+        + " attempts", lastFailure);
   }
 
   private static String getProp(State state, String key, int branches, int branchId) {
diff --git a/gobblin-modules/gobblin-sql/src/test/java/org/apache/gobblin/writer/JdbcWriterInitializerTest.java b/gobblin-modules/gobblin-sql/src/test/java/org/apache/gobblin/writer/JdbcWriterInitializerTest.java
index 9466832b6ba..9bd2b273a72 100644
--- a/gobblin-modules/gobblin-sql/src/test/java/org/apache/gobblin/writer/JdbcWriterInitializerTest.java
+++ b/gobblin-modules/gobblin-sql/src/test/java/org/apache/gobblin/writer/JdbcWriterInitializerTest.java
@@ -144,18 +144,62 @@ public void initializeWithCreatingStagingTable() throws SQLException {
     ResultSet rs = mock(ResultSet.class);
     when(metadata.getTables(any(), anyString(), anyString(), any(String[].class))).thenReturn(rs);
     when(rs.next()).thenReturn(Boolean.FALSE);
+    when(this.commands.hasDropPrivilege(anyString())).thenReturn(Boolean.TRUE);
 
     this.initializer.initialize();
 
     Assert.assertTrue(!StringUtils.isEmpty(this.workUnit.getProp(ConfigurationKeys.WRITER_STAGING_TABLE)));
 
     InOrder inOrder = inOrder(this.commands);
+    // Staging table is created exactly once - the create/drop/recreate droppability probe is gone.
     inOrder.verify(this.commands, times(1)).createTableStructure(anyString(), anyString(), anyString());
-    inOrder.verify(this.commands, times(1)).drop(anyString(), anyString());
-    inOrder.verify(this.commands, times(1)).createTableStructure(anyString(), anyString(), anyString());
+    inOrder.verify(this.commands, never()).drop(anyString(), anyString());
 
     this.initializer.close();
+    // The created staging table is dropped only during cleanup.
     inOrder.verify(this.commands, times(1)).drop(anyString(), anyString());
     inOrder.verify(this.commands, never()).truncate(anyString(), anyString());
   }
+
+  @Test(expectedExceptions = RuntimeException.class,
+      expectedExceptionsMessageRegExp = ".*lacks DROP privilege.*")
+  public void failsFastWhenNoDropPrivilege() throws SQLException {
+    when(this.commands.isEmpty(DB, STAGING_TABLE)).thenReturn(Boolean.TRUE);
+    DatabaseMetaData metadata = mock(DatabaseMetaData.class);
+    when(this.conn.getMetaData()).thenReturn(metadata);
+    ResultSet rs = mock(ResultSet.class);
+    when(metadata.getTables(any(), anyString(), anyString(), any(String[].class))).thenReturn(rs);
+    when(rs.next()).thenReturn(Boolean.FALSE);
+    when(this.commands.hasDropPrivilege(anyString())).thenReturn(Boolean.FALSE);
+
+    try {
+      this.initializer.initialize();
+    } finally {
+      // When DROP is unavailable we must bail out before creating anything, so no staging table is
+      // created and nothing is dropped - this is what prevents the orphan-table accumulation.
+      verify(this.commands, never()).createTableStructure(anyString(), anyString(), anyString());
+      verify(this.commands, never()).drop(anyString(), anyString());
+    }
+  }
+
+  public void retriesAndCleansUpAfterFailedStagingTableCreation() throws SQLException {
+    when(this.commands.isEmpty(DB, STAGING_TABLE)).thenReturn(Boolean.TRUE);
+    when(this.commands.hasDropPrivilege(anyString())).thenReturn(Boolean.TRUE);
+    DatabaseMetaData metadata = mock(DatabaseMetaData.class);
+    when(this.conn.getMetaData()).thenReturn(metadata);
+    ResultSet rs = mock(ResultSet.class);
+    when(metadata.getTables(any(), anyString(), anyString(), any(String[].class))).thenReturn(rs);
+    when(rs.next()).thenReturn(Boolean.FALSE);
+    // First create attempt fails, second succeeds.
+    doThrow(new SQLException("boom")).doNothing()
+        .when(this.commands).createTableStructure(anyString(), anyString(), anyString());
+
+    this.initializer.initialize();
+
+    Assert.assertTrue(!StringUtils.isEmpty(this.workUnit.getProp(ConfigurationKeys.WRITER_STAGING_TABLE)));
+    // Two create attempts were made, and the failed attempt's table was dropped (cleanup) so no
+    // orphan is left behind. (close() is not called here, so the only drop is the cleanup.)
+    verify(this.commands, times(2)).createTableStructure(anyString(), anyString(), anyString());
+    verify(this.commands, times(1)).drop(anyString(), anyString());
+  }
 }
diff --git a/gobblin-modules/gobblin-sql/src/test/java/org/apache/gobblin/writer/commands/MySqlWriterCommandsTest.java b/gobblin-modules/gobblin-sql/src/test/java/org/apache/gobblin/writer/commands/MySqlWriterCommandsTest.java
new file mode 100644
index 00000000000..d369f12357b
--- /dev/null
+++ b/gobblin-modules/gobblin-sql/src/test/java/org/apache/gobblin/writer/commands/MySqlWriterCommandsTest.java
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.writer.commands;
+
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+
+/**
+ * Unit tests for {@link MySqlWriterCommands#grantCoversDrop(String, String)}, the heuristic used by
+ * {@code hasDropPrivilege} to decide (from {@code SHOW GRANTS} output) whether the current user can
+ * DROP tables in a given database. The parse is intentionally conservative; these cases pin the
+ * common grant shapes.
+ */
+@Test(groups = { "gobblin.writer" })
+public class MySqlWriterCommandsTest {
+
+  private static final String DB = "gobblin";
+
+  public void detectsExplicitDropOnDatabase() {
+    Assert.assertTrue(MySqlWriterCommands.grantCoversDrop(
+        "GRANT SELECT, INSERT, CREATE, DROP ON `gobblin`.* TO `u`@`%`", DB));
+  }
+
+  public void detectsAllPrivilegesGlobal() {
+    Assert.assertTrue(MySqlWriterCommands.grantCoversDrop(
+        "GRANT ALL PRIVILEGES ON *.* TO `u`@`%` WITH GRANT OPTION", DB));
+  }
+
+  public void detectsAllPrivilegesOnDatabase() {
+    Assert.assertTrue(MySqlWriterCommands.grantCoversDrop(
+        "GRANT ALL PRIVILEGES ON `gobblin`.* TO `u`@`%`", DB));
+  }
+
+  public void matchesDatabaseCaseInsensitively() {
+    Assert.assertTrue(MySqlWriterCommands.grantCoversDrop(
+        "GRANT DROP ON `GOBBLIN`.* TO `u`@`%`", DB));
+  }
+
+  public void falseWhenNoDropPrivilege() {
+    Assert.assertFalse(MySqlWriterCommands.grantCoversDrop(
+        "GRANT SELECT, INSERT, UPDATE, CREATE ON `gobblin`.* TO `u`@`%`", DB));
+  }
+
+  public void falseForUsageGrant() {
+    Assert.assertFalse(MySqlWriterCommands.grantCoversDrop(
+        "GRANT USAGE ON *.* TO `u`@`%`", DB));
+  }
+
+  public void falseWhenDropIsForDifferentDatabase() {
+    Assert.assertFalse(MySqlWriterCommands.grantCoversDrop(
+        "GRANT DROP ON `other`.* TO `u`@`%`", DB));
+  }
+
+  public void dropInTableNameIsNotMistakenForPrivilege() {
+    // DROP appears only as part of a table name (after ON), not as a granted privilege.
+    Assert.assertFalse(MySqlWriterCommands.grantCoversDrop(
+        "GRANT SELECT ON `gobblin`.`drop_audit` TO `u`@`%`", DB));
+  }
+
+  public void dropRoleIsNotMistakenForDrop() {
+    // DROP ROLE is a separate privilege and does not allow dropping tables.
+    Assert.assertFalse(MySqlWriterCommands.grantCoversDrop(
+        "GRANT CREATE ROLE, DROP ROLE ON *.* TO `u`@`%`", DB));
+  }
+
+  public void matchesEscapedUnderscoreInDatabaseScope() {
+    // Database-level scopes are patterns; a literal underscore is printed as \_.
+    Assert.assertTrue(MySqlWriterCommands.grantCoversDrop(
+        "GRANT DROP ON `my\\_db`.* TO `u`@`%`", "my_db"));
+  }
+
+  public void matchesWildcardDatabaseScope() {
+    Assert.assertTrue(MySqlWriterCommands.grantCoversDrop(
+        "GRANT ALL PRIVILEGES ON `gob%`.* TO `u`@`%`", DB));
+  }
+
+  public void falseForTableLevelDrop() {
+    // DROP on one table does not cover staging tables created elsewhere in the database.
+    Assert.assertFalse(MySqlWriterCommands.grantCoversDrop(
+        "GRANT DROP ON `gobblin`.`events` TO `u`@`%`", DB));
+  }
+
+  public void falseForNullGrantLine() {
+    Assert.assertFalse(MySqlWriterCommands.grantCoversDrop(null, DB));
+  }
+}