Merged
70 changes: 59 additions & 11 deletions in test/src/main/java/org/apache/accumulo/test/ScanConsistencyIT.java
@@ -83,6 +83,43 @@ public class ScanConsistencyIT extends AccumuloClusterHarness {

private static final Logger log = LoggerFactory.getLogger(ScanConsistencyIT.class);

/**
* Note: In order to run main,
* <ol>
* <li>Build the project</li>
* <li>Copy the accumulo test jar (in /test/target/) into your accumulo installation's lib
* directory</li>
* <li>Copy the JUnit dependencies into your accumulo installation's lib directory: mvn
* dependency:copy-dependencies -DincludeGroupIds="org.junit.jupiter" and cp
* test/target/dependency/junit-jupiter-* $ACCUMULO_HOME/lib/</li>
* <li>Ensure the test jar is in lib before the tablet servers start. Restart tablet servers if
* necessary.</li>
* <li>Run with: accumulo org.apache.accumulo.test.ScanConsistencyIT [props-file] [tmp-dir]
* [table] [sleep-time]</li>
* </ol>
*
* [props-file]: An accumulo client properties file<br>
* [tmp-dir]: tmpDir field for the TestContext object<br>
* [table]: The name of the table to be created<br>
* [sleep-time]: The time to sleep (ms) after submitting the various concurrent tasks<br>
* <br>
*
* @param args The props file, temp directory, table, and sleep time
*/
public static void main(String[] args) throws Exception {
Preconditions.checkArgument(args.length == 4, "Invalid arguments. Use: "
+ "accumulo org.apache.accumulo.test.ScanConsistencyIT [props-file] [tmp-dir] [table] [sleep-time]");
final String propsFile = args[0];
final String tmpDir = args[1];
final String table = args[2];
final long sleepTime = Long.parseLong(args[3]);

try (AccumuloClient client = Accumulo.newClient().from(propsFile).build()) {
FileSystem fileSystem = FileSystem.get(new Configuration());
runTest(client, fileSystem, tmpDir, table, sleepTime);
}
}
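For illustration, a hypothetical programmatic invocation with the four arguments described in the Javadoc above might look like the following (the properties path, temp directory, and table name are placeholders, not values taken from the test itself):

    // Hypothetical example only; adjust the paths and table name for your installation.
    String[] exampleArgs = {
        "/path/to/accumulo-client.properties", // [props-file]
        "/tmp/scan-consistency",               // [tmp-dir]
        "scanConsistencyTable",                // [table]
        "60000"                                // [sleep-time] in milliseconds
    };
    ScanConsistencyIT.main(exampleArgs);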

@Override
public void configureMiniCluster(MiniAccumuloConfigImpl cfg, Configuration hadoopCoreSite) {
// Sometimes a merge will run on a single tablet with an active compaction. Merge code will set
@@ -92,11 +129,21 @@ public void configureMiniCluster(MiniAccumuloConfigImpl cfg, Configuration hadoopCoreSite) {
cfg.setProperty(Property.COMPACTION_COORDINATOR_DEAD_COMPACTOR_CHECK_INTERVAL, "3s");
}

@SuppressFBWarnings(value = {"PREDICTABLE_RANDOM", "DMI_RANDOM_USED_ONLY_ONCE"},
justification = "predictable random is ok for testing")
@Test
public void testConcurrentScanConsistency() throws Exception {
final String table = this.getUniqueNames(1)[0];
try (AccumuloClient client = Accumulo.newClient().from(getClientProps()).build()) {
FileSystem fileSystem = getCluster().getFileSystem();
final String tmpDir = getCluster().getTemporaryPath().toString();
final String table = getUniqueNames(1)[0];
final long sleepTime = 60000;
runTest(client, fileSystem, tmpDir, table, sleepTime);
}
}

@SuppressFBWarnings(value = {"PREDICTABLE_RANDOM", "DMI_RANDOM_USED_ONLY_ONCE"},
justification = "predictable random is ok for testing")
private static void runTest(AccumuloClient client, FileSystem fileSystem, String tmpDir,
String table, long sleepTime) throws Exception {

/**
* Tips for debugging this test when it sees a row that should not exist or does not see a row
@@ -116,11 +163,10 @@ public void testConcurrentScanConsistency() throws Exception {
// getClusterControl().stopAllServers(ServerType.GARBAGE_COLLECTOR);

var executor = Executors.newCachedThreadPool();
try (AccumuloClient client = Accumulo.newClient().from(getClientProps()).build()) {
try {
client.tableOperations().create(table);

TestContext testContext = new TestContext(client, table, getCluster().getFileSystem(),
getCluster().getTemporaryPath().toString());
TestContext testContext = new TestContext(client, table, fileSystem, tmpDir);

List<Future<WriteStats>> writeTasks = new ArrayList<>();
List<Future<ScanStats>> scanTasks = new ArrayList<>();
@@ -141,7 +187,7 @@ public void testConcurrentScanConsistency() throws Exception {
var tableOpsTask = executor.submit(new TableOpsTask(testContext));

// let the concurrent mayhem run for a bit
Thread.sleep(60000);
Thread.sleep(sleepTime);

// let the threads know to exit
testContext.keepRunning.set(false);
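For context, a minimal sketch of the cooperative-shutdown loop the write and scan tasks are assumed to follow is shown below; the helper name is hypothetical and not part of the test:

    // Each task is assumed to poll the shared flag between units of work and
    // return its accumulated stats once the flag is cleared.
    while (testContext.keepRunning.get()) {
        doOneBatchOfWork(); // hypothetical helper: write, delete, or scan a batch
    }
    return stats;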
@@ -168,30 +214,30 @@

for (Future<WriteStats> writeTask : writeTasks) {
var stats = writeTask.get();
log.debug(String.format("Wrote:%,d Bulk imported:%,d Deleted:%,d Bulk deleted:%,d",
log.info(String.format("Wrote:%,d Bulk imported:%,d Deleted:%,d Bulk deleted:%,d",
stats.written, stats.bulkImported, stats.deleted, stats.bulkDeleted));
assertTrue(stats.written + stats.bulkImported > 0);
assertTrue(stats.deleted + stats.bulkDeleted > 0);
}

for (Future<ScanStats> scanTask : scanTasks) {
var stats = scanTask.get();
log.debug(String.format("Scanned:%,d verified:%,d", stats.scanned, stats.verified));
log.info(String.format("Scanned:%,d verified:%,d", stats.scanned, stats.verified));
assertTrue(stats.verified > 0);
// These scans were running concurrently with writes, so a scan will see more data than what
// was written before the scan started.
assertTrue(stats.scanned > stats.verified);
}

log.debug(tableOpsTask.get());
log.info(tableOpsTask.get());

keepLogging.set(false);
debugTask.cancel(true);

var stats1 = scanData(testContext, random, new Range(), false);
var stats2 = scanData(testContext, random, new Range(), true);
var stats3 = batchScanData(testContext, new Range());
log.debug(
log.info(
String.format("Final scan, scanned:%,d verified:%,d", stats1.scanned, stats1.verified));
assertTrue(stats1.verified > 0);
// Should see all expected data now that there are no concurrent writes happening
@@ -200,6 +246,8 @@
assertEquals(stats2.verified, stats1.verified);
assertEquals(stats3.scanned, stats1.scanned);
assertEquals(stats3.verified, stats1.verified);

client.tableOperations().delete(table);
} finally {
executor.shutdownNow();
}