Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -93,4 +93,21 @@ public interface JdbcWriterCommands extends JdbcBufferedInserter {
* @throws SQLException
*/
public void copyTable(String databaseName, String from, String to) throws SQLException;

/**
* Best-effort check of whether the current JDBC user can DROP tables in the given database.
* Used to fail fast before creating staging tables that could not later be cleaned up.
*
* <p>Implementations should <b>fail open</b>: return {@code true} when the privilege cannot be
* determined, so that a check failure never blocks a legitimately-permitted run. The default
* returns {@code true} for dialects that do not implement a real check.
*
* @param database database/schema in which staging tables are created
* @return {@code true} if DROP is granted (or could not be determined), {@code false} only when
* the user is positively known to lack DROP
* @throws SQLException on a connection-level error
*/
default boolean hasDropPrivilege(String database) throws SQLException {
return true;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.ImmutableMap;

import org.apache.gobblin.configuration.State;
Expand All @@ -43,12 +44,13 @@ public class MySqlWriterCommands implements JdbcWriterCommands {
private static final String CREATE_TABLE_SQL_FORMAT = "CREATE TABLE %s.%s LIKE %s.%s";
private static final String SELECT_SQL_FORMAT = "SELECT COUNT(*) FROM %s.%s";
private static final String TRUNCATE_TABLE_FORMAT = "TRUNCATE TABLE %s.%s";
private static final String DROP_TABLE_SQL_FORMAT = "DROP TABLE %s.%s";
private static final String DROP_TABLE_SQL_FORMAT = "DROP TABLE IF EXISTS %s.%s";
private static final String INFORMATION_SCHEMA_SELECT_SQL_PSTMT =
"SELECT column_name, column_type FROM information_schema.columns WHERE table_schema = ? AND table_name = ?";
private static final String COPY_INSERT_STATEMENT_FORMAT = "INSERT INTO %s.%s SELECT * FROM %s.%s";
private static final String COPY_REPLACE_STATEMENT_FORMAT = "REPLACE INTO %s.%s SELECT * FROM %s.%s";
private static final String DELETE_STATEMENT_FORMAT = "DELETE FROM %s.%s";
private static final String SHOW_GRANTS_SQL = "SHOW GRANTS FOR CURRENT_USER()";

private final JdbcBufferedInserter jdbcBufferedWriter;
private final Connection conn;
Expand Down Expand Up @@ -111,6 +113,62 @@ public void drop(String database, String table) throws SQLException {
execute(sql);
}

/**
* Best-effort DROP-privilege check via {@code SHOW GRANTS FOR CURRENT_USER()}. Fails open: any
* error or inconclusive parse returns {@code true} so a check failure never blocks a run that is
* actually permitted. Only returns {@code false} when no grant covering DROP on {@code database}
* is found.
*
* <p>Caveats: {@code SHOW GRANTS FOR CURRENT_USER()} may not expand privileges that come from
* non-default (inactive) roles, and grant-string parsing is necessarily heuristic. This is meant
* to catch the common case (e.g. a CREATE-but-not-DROP role) and avoid leaving orphan staging
* tables, not to be a complete authorization model.
*/
@Override
public boolean hasDropPrivilege(String database) throws SQLException {
try (PreparedStatement pstmt = this.conn.prepareStatement(SHOW_GRANTS_SQL);
ResultSet rs = pstmt.executeQuery()) {
while (rs.next()) {
if (grantCoversDrop(rs.getString(1), database)) {
return true;
}
}
LOG.warn("No grant covering DROP on database '" + database + "' found for current user.");
return false;
} catch (SQLException e) {
// Fail open: never block a legitimately-permitted run because the check itself failed.
LOG.warn("Could not verify DROP privilege on database '" + database + "'; proceeding.", e);
return true;
}
}

/**
* Returns true if a single {@code SHOW GRANTS} line grants DROP (or ALL PRIVILEGES) on a scope
* that covers {@code database} (either {@code *.*} or {@code `database`.*}).
*/
@VisibleForTesting
static boolean grantCoversDrop(String grantLine, String database) {
if (grantLine == null) {
return false;
}
String g = grantLine.toUpperCase();
int onIdx = g.indexOf(" ON ");
int toIdx = g.indexOf(" TO ");
if (onIdx < 0 || toIdx < 0 || toIdx < onIdx) {
return false;
}
String privileges = g.substring(0, onIdx);
boolean grantsDrop = privileges.contains("ALL PRIVILEGES")
|| privileges.matches(".*\\bDROP\\b.*");
if (!grantsDrop) {
return false;
}
// Scope looks like "`DB`.*" or "*.*" or "`DB`.`TABLE`"; strip backticks and compare the schema.
String scope = g.substring(onIdx + 4, toIdx).trim().replace("`", "");
String schema = scope.substring(0, scope.indexOf('.') < 0 ? scope.length() : scope.indexOf('.'));
return schema.equals("*") || schema.equals(database.toUpperCase());
}

/**
* https://dev.mysql.com/doc/connector-j/en/connector-j-reference-type-conversions.html
* {@inheritDoc}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,7 @@
import java.sql.SQLException;
import java.util.Collection;
import java.util.Optional;
import java.util.Random;
import java.util.Set;
import java.util.concurrent.TimeUnit;

import javax.sql.DataSource;

Expand Down Expand Up @@ -79,7 +77,6 @@ private static class Memento implements AfterInitializeMemento {
private static final Logger LOG = LoggerFactory.getLogger(JdbcWriterInitializer.class);
private static final String STAGING_TABLE_FORMAT = "stage_%d";
private static final int NAMING_STAGING_TABLE_TRIAL = 10;
private static final Random RANDOM = new Random();

private final int branches;
private final int branchId;
Expand Down Expand Up @@ -166,37 +163,41 @@ private String createStagingTable(Connection conn, JdbcWriterCommands commands)
+ this.getClass().getSimpleName() + " for branch " + this.branchId);
}

String stagingTable = null;
// Fail fast WITHOUT creating anything if the JDBC user can't drop staging tables. The staging
// table must be dropped at cleanup time; if we can't, the old behavior left orphaned tables
// behind. Previously this was checked by creating a table and dropping it as a probe, which on
// a no-DROP account created (but failed to drop) up to NAMING_STAGING_TABLE_TRIAL orphan tables
// per run. The privilege check is best-effort and fails open (see hasDropPrivilege), so it only
// blocks runs we can positively determine lack DROP.
if (!commands.hasDropPrivilege(database)) {
throw new RuntimeException("JDBC user lacks DROP privilege on database '" + database
+ "', which is required to manage staging tables. Grant DROP or use an account that has it.");
}

for (int i = 0; i < NAMING_STAGING_TABLE_TRIAL; i++) {
String tmp = String.format(STAGING_TABLE_FORMAT, System.nanoTime());
LOG.info("Check if staging table " + tmp + " exists.");
ResultSet res = conn.getMetaData().getTables(null, database, tmp, new String[] { "TABLE" });
if (!res.next()) {
LOG.info("Staging table " + tmp + " does not exist. Creating.");
try {
commands.createTableStructure(database, destinationTable, tmp);
LOG.info("Test if staging table can be dropped. Test by dropping and Creating staging table.");
commands.drop(database, tmp);
commands.createTableStructure(database, destinationTable, tmp);
stagingTable = tmp;
break;
} catch (SQLException e) {
LOG.warn("Failed to create table. Retrying up to " + NAMING_STAGING_TABLE_TRIAL + " times", e);
}
} else {
LOG.info("Staging table " + tmp + " exists.");
if (res.next()) {
// Name collision (extremely unlikely with nanoTime) - retry with a fresh name.
LOG.info("Staging table " + tmp + " exists. Retrying with a new name.");
continue;
}
LOG.info("Staging table " + tmp + " does not exist. Creating.");
try {
TimeUnit.MILLISECONDS.sleep(RANDOM.nextInt(1000));
} catch (InterruptedException e) {
LOG.info("Sleep has been interrupted.", e);
commands.createTableStructure(database, destinationTable, tmp);
return tmp;
} catch (SQLException e) {
// Retry on failure (as before), but never leave a partially-created table behind: drop this
// attempt's table before the next try. drop() issues DROP TABLE IF EXISTS, so it is a no-op
// when the create never actually created the table.
LOG.warn("Failed to create staging table " + tmp + ". Dropping it and retrying up to "
+ NAMING_STAGING_TABLE_TRIAL + " times.", e);
commands.drop(database, tmp);
}
}

if (!StringUtils.isEmpty(stagingTable)) {
return stagingTable;
}
throw new RuntimeException("Failed to create staging table");
throw new RuntimeException("Failed to create staging table after " + NAMING_STAGING_TABLE_TRIAL
+ " attempts");
}

private static String getProp(State state, String key, int branches, int branchId) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -144,18 +144,62 @@ public void initializeWithCreatingStagingTable() throws SQLException {
ResultSet rs = mock(ResultSet.class);
when(metadata.getTables(any(), anyString(), anyString(), any(String[].class))).thenReturn(rs);
when(rs.next()).thenReturn(Boolean.FALSE);
when(this.commands.hasDropPrivilege(anyString())).thenReturn(Boolean.TRUE);

this.initializer.initialize();

Assert.assertTrue(!StringUtils.isEmpty(this.workUnit.getProp(ConfigurationKeys.WRITER_STAGING_TABLE)));

InOrder inOrder = inOrder(this.commands);
// Staging table is created exactly once - the create/drop/recreate droppability probe is gone.
inOrder.verify(this.commands, times(1)).createTableStructure(anyString(), anyString(), anyString());
inOrder.verify(this.commands, times(1)).drop(anyString(), anyString());
inOrder.verify(this.commands, times(1)).createTableStructure(anyString(), anyString(), anyString());
inOrder.verify(this.commands, never()).drop(anyString(), anyString());

this.initializer.close();
// The created staging table is dropped only during cleanup.
inOrder.verify(this.commands, times(1)).drop(anyString(), anyString());
inOrder.verify(this.commands, never()).truncate(anyString(), anyString());
}

@Test(expectedExceptions = RuntimeException.class,
expectedExceptionsMessageRegExp = ".*lacks DROP privilege.*")
public void failsFastWhenNoDropPrivilege() throws SQLException {
when(this.commands.isEmpty(DB, STAGING_TABLE)).thenReturn(Boolean.TRUE);
DatabaseMetaData metadata = mock(DatabaseMetaData.class);
when(this.conn.getMetaData()).thenReturn(metadata);
ResultSet rs = mock(ResultSet.class);
when(metadata.getTables(any(), anyString(), anyString(), any(String[].class))).thenReturn(rs);
when(rs.next()).thenReturn(Boolean.FALSE);
when(this.commands.hasDropPrivilege(anyString())).thenReturn(Boolean.FALSE);

try {
this.initializer.initialize();
} finally {
// When DROP is unavailable we must bail out before creating anything, so no staging table is
// created and nothing is dropped - this is what prevents the orphan-table accumulation.
verify(this.commands, never()).createTableStructure(anyString(), anyString(), anyString());
verify(this.commands, never()).drop(anyString(), anyString());
}
}

public void retriesAndCleansUpAfterFailedStagingTableCreation() throws SQLException {
when(this.commands.isEmpty(DB, STAGING_TABLE)).thenReturn(Boolean.TRUE);
when(this.commands.hasDropPrivilege(anyString())).thenReturn(Boolean.TRUE);
DatabaseMetaData metadata = mock(DatabaseMetaData.class);
when(this.conn.getMetaData()).thenReturn(metadata);
ResultSet rs = mock(ResultSet.class);
when(metadata.getTables(any(), anyString(), anyString(), any(String[].class))).thenReturn(rs);
when(rs.next()).thenReturn(Boolean.FALSE);
// First create attempt fails, second succeeds.
doThrow(new SQLException("boom")).doNothing()
.when(this.commands).createTableStructure(anyString(), anyString(), anyString());

this.initializer.initialize();

Assert.assertTrue(!StringUtils.isEmpty(this.workUnit.getProp(ConfigurationKeys.WRITER_STAGING_TABLE)));
// Two create attempts were made, and the failed attempt's table was dropped (cleanup) so no
// orphan is left behind. (close() is not called here, so the only drop is the cleanup.)
verify(this.commands, times(2)).createTableStructure(anyString(), anyString(), anyString());
verify(this.commands, times(1)).drop(anyString(), anyString());
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.gobblin.writer.commands;

import org.testng.Assert;
import org.testng.annotations.Test;


/**
* Unit tests for {@link MySqlWriterCommands#grantCoversDrop(String, String)}, the heuristic used by
* {@code hasDropPrivilege} to decide (from {@code SHOW GRANTS} output) whether the current user can
* DROP tables in a given database. The parse is intentionally conservative; these cases pin the
* common grant shapes.
*/
@Test(groups = { "gobblin.writer" })
public class MySqlWriterCommandsTest {

private static final String DB = "gobblin";

public void detectsExplicitDropOnDatabase() {
Assert.assertTrue(MySqlWriterCommands.grantCoversDrop(
"GRANT SELECT, INSERT, CREATE, DROP ON `gobblin`.* TO `u`@`%`", DB));
}

public void detectsAllPrivilegesGlobal() {
Assert.assertTrue(MySqlWriterCommands.grantCoversDrop(
"GRANT ALL PRIVILEGES ON *.* TO `u`@`%` WITH GRANT OPTION", DB));
}

public void detectsAllPrivilegesOnDatabase() {
Assert.assertTrue(MySqlWriterCommands.grantCoversDrop(
"GRANT ALL PRIVILEGES ON `gobblin`.* TO `u`@`%`", DB));
}

public void matchesDatabaseCaseInsensitively() {
Assert.assertTrue(MySqlWriterCommands.grantCoversDrop(
"GRANT DROP ON `GOBBLIN`.* TO `u`@`%`", DB));
}

public void falseWhenNoDropPrivilege() {
Assert.assertFalse(MySqlWriterCommands.grantCoversDrop(
"GRANT SELECT, INSERT, UPDATE, CREATE ON `gobblin`.* TO `u`@`%`", DB));
}

public void falseForUsageGrant() {
Assert.assertFalse(MySqlWriterCommands.grantCoversDrop(
"GRANT USAGE ON *.* TO `u`@`%`", DB));
}

public void falseWhenDropIsForDifferentDatabase() {
Assert.assertFalse(MySqlWriterCommands.grantCoversDrop(
"GRANT DROP ON `other`.* TO `u`@`%`", DB));
}

public void dropInTableNameIsNotMistakenForPrivilege() {
// DROP appears only as part of a table name (after ON), not as a granted privilege.
Assert.assertFalse(MySqlWriterCommands.grantCoversDrop(
"GRANT SELECT ON `gobblin`.`drop_audit` TO `u`@`%`", DB));
}

public void falseForNullGrantLine() {
Assert.assertFalse(MySqlWriterCommands.grantCoversDrop(null, DB));
}
}