diff --git a/server/base/src/main/java/org/apache/accumulo/server/conf/store/PropStore.java b/server/base/src/main/java/org/apache/accumulo/server/conf/store/PropStore.java index 74f89e806ac..7567bd7341e 100644 --- a/server/base/src/main/java/org/apache/accumulo/server/conf/store/PropStore.java +++ b/server/base/src/main/java/org/apache/accumulo/server/conf/store/PropStore.java @@ -82,6 +82,21 @@ public interface PropStore { void replaceAll(PropStoreKey propStoreKey, long version, Map props) throws ConcurrentModificationException; + /** + * Replaces all current properties with map provided. If a property is not included in the new + * map, the property will not be set. + * + * @param propStoreKey the prop cache key + * @param expected expected current properties + * @param props a map of property k,v pairs + * @throws IllegalStateException if the values cannot be written or if an underlying store + * exception occurs. + * @throws java.util.ConcurrentModificationException if the properties currently in the store do + * not match the expected props passed. No changes are made when this happens. + */ + void replaceAll(PropStoreKey propStoreKey, Map expected, Map props) + throws ConcurrentModificationException; + /** * Unconditionally replaces all properties with the map provided. If a property is not included in * the new map, the property will not be set. diff --git a/server/base/src/main/java/org/apache/accumulo/server/conf/store/impl/ZooPropStore.java b/server/base/src/main/java/org/apache/accumulo/server/conf/store/impl/ZooPropStore.java index cc47d0b64ad..630466c23a9 100644 --- a/server/base/src/main/java/org/apache/accumulo/server/conf/store/impl/ZooPropStore.java +++ b/server/base/src/main/java/org/apache/accumulo/server/conf/store/impl/ZooPropStore.java @@ -217,6 +217,21 @@ public void replaceAll(@NonNull PropStoreKey propStoreKey, @NonNull Map expected, + @NonNull Map props) { + BiFunction,VersionedProperties> action = + (current, p) -> { + if (!current.asMap().equals(expected)) { + throw new ConcurrentModificationException( + "Current properties do not match expected, key:" + propStoreKey); + } + return current.replaceAll(p); + }; + + mutateVersionedProps(propStoreKey, action, props); + } + @Override public void replaceAll(@NonNull PropStoreKey propStoreKey, long version, @NonNull Map props) { diff --git a/server/base/src/main/java/org/apache/accumulo/server/conf/util/ImportConfigCommand.java b/server/base/src/main/java/org/apache/accumulo/server/conf/util/ImportConfigCommand.java index 5c926cf761c..60b1a199617 100644 --- a/server/base/src/main/java/org/apache/accumulo/server/conf/util/ImportConfigCommand.java +++ b/server/base/src/main/java/org/apache/accumulo/server/conf/util/ImportConfigCommand.java @@ -18,8 +18,15 @@ */ package org.apache.accumulo.server.conf.util; +import java.io.BufferedInputStream; +import java.io.IOException; import java.io.InputStream; +import java.io.UncheckedIOException; +import java.nio.file.Files; +import java.nio.file.Path; import java.util.ArrayList; +import java.util.ConcurrentModificationException; +import java.util.HashMap; import java.util.HashSet; import java.util.List; import java.util.Map; @@ -44,6 +51,7 @@ import org.apache.accumulo.start.spi.KeywordExecutable; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.yaml.snakeyaml.LoaderOptions; import org.yaml.snakeyaml.Yaml; import com.beust.jcommander.JCommander; @@ -77,6 +85,12 @@ public String description() { } public static class Opts extends ServerOpts { + @Parameter(names = "--input", + description = "Yaml file containing configuration data. If not specified will read from stdin.") + public String inputFile; + @Parameter(names = "--expected", + description = "Yaml file containing expected current config. Changes are only made if config in zookeeper matches whats in this file.") + public String expectedFile; @Parameter(names = "--ignore-extra", description = "Proceed when Accumulo has extra tables, resource groups, or namespaces that are not in yaml") public boolean ignoreExtra = false; @@ -99,6 +113,9 @@ static PropStoreKey getKey(Scope scope, String name, ServerContext context) { } record ScopeName(Scope scope, String name) { + public ScopeName(ScopedProperties sp) { + this(sp.scope(), sp.name()); + } } private static Set getAllScopeNames(ServerContext context) { @@ -124,9 +141,15 @@ private static Set getAllScopeNames(ServerContext context) { } private static void validate(ServerContext serverContext, List allProps, - boolean ignoreExtra) { + Map expectedProps, boolean ignoreExtra, + boolean precheckExpected) { var scopeNamesInYaml = new HashSet(); - allProps.forEach(sp -> scopeNamesInYaml.add(new ScopeName(sp.scope(), sp.name()))); + allProps.forEach(sp -> { + if (!scopeNamesInYaml.add(new ScopeName(sp))) { + throw new IllegalArgumentException( + "Duplicate scope+name in input, scope:" + sp.scope() + " name:" + sp.name()); + } + }); var scopeNamesInAccumulo = getAllScopeNames(serverContext); if (!scopeNamesInYaml.equals(scopeNamesInAccumulo)) { @@ -149,29 +172,111 @@ private static void validate(ServerContext serverContext, List } } + if (expectedProps != null) { + for (var scopedProps : allProps) { + var key = new ScopeName(scopedProps.scope(), scopedProps.name()); + if (!expectedProps.containsKey(key)) { + throw new IllegalArgumentException( + "Scope+name present in input but not present in expected file, scope:" + key.scope() + + " name:" + key.name()); + } + } + } + // validate all scope+name before attempting to update any scope+name + var propStore = serverContext.getPropStore(); for (var scopedProps : allProps) { var propStoreKey = getKey(scopedProps.scope(), scopedProps.name(), serverContext); PropUtil.validateProperties(serverContext, propStoreKey, scopedProps.props()); + // precheckExpected is only used in testing, it allows test code to bypass this code and + // exercise the expected check done in the atomic zookeeper update. + if (expectedProps != null && precheckExpected) { + // This check serves two purposes. First it runs during dry-run. Second it avoids changing + // anything when a subset does not match the expected. Changes could be made after this + // check and before the update. If this happens the update still fail because another check + // is done in the atomic zookeeper update, however that could lead to a subset that match + // the expected being updated. + if (!propStore.get(propStoreKey).asMap() + .equals(expectedProps.get(new ScopeName(scopedProps)).props())) { + throw new ConcurrentModificationException( + createUnexpectedMessage(scopedProps.scope(), scopedProps.name())); + } + } + } + } + + private static List read(Yaml yaml, String file, InputStream in) { + List allProps = new ArrayList<>(); + if (file != null) { + try (var fileIn = new BufferedInputStream(Files.newInputStream(Path.of(file)))) { + for (var obj : yaml.loadAll(fileIn)) { + allProps.add(new ScopedProperties((Map) obj)); + } + } catch (IOException e) { + throw new UncheckedIOException(e); + } + } else { + for (var obj : yaml.loadAll(in)) { + allProps.add(new ScopedProperties((Map) obj)); + } } + + return allProps; } @VisibleForTesting public static void load(ServerContext serverContext, InputStream in, Opts options) { - Yaml yaml = new Yaml(); - List allProps = new ArrayList<>(); - for (var obj : yaml.loadAll(in)) { - allProps.add(new ScopedProperties((Map) obj)); + load(serverContext, in, options, true); + } + + private static String createUnexpectedMessage(Scope scope, String name) { + return "Properties in scope:" + scope + " name:" + name + + " do not match the expected values. To diagnose, export current config to a new file and diff with expected file."; + } + + @VisibleForTesting + public static void load(ServerContext serverContext, InputStream in, Opts options, + boolean precheckExpected) { + var loaderOpts = new LoaderOptions(); + loaderOpts.setAllowDuplicateKeys(false); + Yaml yaml = new Yaml(loaderOpts); + List allProps = read(yaml, options.inputFile, in); + + Map expectedProps; + if (options.expectedFile == null) { + expectedProps = null; + } else { + var grouped = new HashMap(); + for (var sp : read(yaml, options.expectedFile, null)) { + var key = new ScopeName(sp); + if (grouped.put(key, sp) != null) { + throw new IllegalArgumentException( + "Duplicate scope+name in expected file, scope:" + sp.scope() + " name:" + sp.name()); + } + } + expectedProps = grouped; } - validate(serverContext, allProps, options.ignoreExtra); + validate(serverContext, allProps, expectedProps, options.ignoreExtra, precheckExpected); if (!options.dryRun) { var propStore = serverContext.getPropStore(); for (var sp : allProps) { var propStoreKey = getKey(sp.scope(), sp.name(), serverContext); - propStore.replaceAll(propStoreKey, sp.props()); + if (expectedProps == null) { + // Unconditionally replace properties + propStore.replaceAll(propStoreKey, sp.props()); + } else { + try { + // Only replace properties if they match the expected values + propStore.replaceAll(propStoreKey, expectedProps.get(new ScopeName(sp)).props(), + sp.props()); + } catch (ConcurrentModificationException cme) { + throw new ConcurrentModificationException( + createUnexpectedMessage(sp.scope(), sp.name()), cme); + } + } } } } diff --git a/test/pom.xml b/test/pom.xml index a3c2a44bdc0..7fe7654b783 100644 --- a/test/pom.xml +++ b/test/pom.xml @@ -231,6 +231,10 @@ org.slf4j slf4j-api + + org.yaml + snakeyaml + io.opentelemetry.javaagent opentelemetry-javaagent diff --git a/test/src/main/java/org/apache/accumulo/test/conf/ImportExportConfigIT.java b/test/src/main/java/org/apache/accumulo/test/conf/ImportExportConfigIT.java index 13a0966ca76..fd11d515826 100644 --- a/test/src/main/java/org/apache/accumulo/test/conf/ImportExportConfigIT.java +++ b/test/src/main/java/org/apache/accumulo/test/conf/ImportExportConfigIT.java @@ -21,11 +21,18 @@ import static java.nio.charset.StandardCharsets.UTF_8; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertNotEquals; +import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.junit.jupiter.api.Assertions.assertNull; import static org.junit.jupiter.api.Assertions.assertThrows; import static org.junit.jupiter.api.Assertions.assertTrue; import java.io.ByteArrayInputStream; +import java.io.IOException; +import java.nio.file.Files; +import java.nio.file.Path; +import java.nio.file.StandardOpenOption; import java.util.ArrayList; +import java.util.ConcurrentModificationException; import java.util.HashMap; import java.util.HashSet; import java.util.List; @@ -45,11 +52,20 @@ import org.apache.accumulo.miniclusterImpl.MiniAccumuloConfigImpl; import org.apache.accumulo.server.conf.util.ExportConfigCommand; import org.apache.accumulo.server.conf.util.ImportConfigCommand; +import org.apache.accumulo.test.util.Wait; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.RawLocalFileSystem; import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.io.TempDir; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.ValueSource; +import org.yaml.snakeyaml.constructor.DuplicateKeyException; public class ImportExportConfigIT extends AccumuloClusterHarness { + + @TempDir + private static Path tempDir; + private static final String YAML1 = """ scope: SYSTEM @@ -765,6 +781,242 @@ public void testInvalid() throws Exception { } + /** + * Test when yaml has duplicate scope+name, should fail. + */ + @Test + public void testDuplicate() throws Exception { + + var template = """ + scope: __SCOPE__ + name: dup + properties: { + table.split.threshold: 5 + } + --- + scope: __SCOPE__ + name: dup + properties: { + table.split.threshold: 7 + } + """; + + var correctTemplate = """ + scope: __SCOPE__ + name: dup + properties: { + table.split.threshold: 5 + } + """; + + for (var scope : ExportConfigCommand.Scope.values()) { + var yaml = template.replace("__SCOPE__", scope.name()); + var iae = assertThrows(IllegalArgumentException.class, + () -> ImportConfigCommand.load(getServerContext(), + new ByteArrayInputStream(yaml.getBytes(UTF_8)), DRY_RUN_IGNORE_EXTRA_OPTS)); + assertEquals("Duplicate scope+name in input, scope:" + scope.name() + " name:dup", + iae.getMessage()); + + var opts = new ImportConfigCommand.Opts(); + opts.expectedFile = write(yaml); + opts.dryRun = true; + opts.ignoreExtra = true; + var correctYaml = correctTemplate.replace("__SCOPE__", scope.name()); + iae = assertThrows(IllegalArgumentException.class, () -> ImportConfigCommand + .load(getServerContext(), new ByteArrayInputStream(correctYaml.getBytes(UTF_8)), opts)); + assertEquals("Duplicate scope+name in expected file, scope:" + scope.name() + " name:dup", + iae.getMessage()); + + } + + // Test snake yaml settings dissallow duplicate map keys + var dupMapKeys = """ + scope: SYSTEM + scope: RESOURCE_GROUP + name: '' + name: "rgid1" + properties: { + } + """; + var opts = new ImportConfigCommand.Opts(); + opts.expectedFile = write(dupMapKeys); + opts.dryRun = true; + opts.ignoreExtra = true; + assertThrows(DuplicateKeyException.class, () -> ImportConfigCommand.load(getServerContext(), + new ByteArrayInputStream(dupMapKeys.getBytes(UTF_8)), opts)); + + } + + private String write(String yaml) throws IOException { + var expectedFile = Files.createTempFile(tempDir, "iet", "yaml"); + try (var out = Files.newBufferedWriter(expectedFile, StandardOpenOption.CREATE, + StandardOpenOption.TRUNCATE_EXISTING)) { + out.write(yaml); + } + return expectedFile.toAbsolutePath().toString(); + } + + @ParameterizedTest + @ValueSource(booleans = {true, false}) + public void testExpected(boolean precheckExpected) throws Exception { + + var opts = new ImportConfigCommand.Opts(); + opts.ignoreExtra = true; + + try (var client = Accumulo.newClient().from(getClientProps()).build()) { + + assertEquals(Map.of(), client.instanceOperations().getSystemProperties()); + + var expectedSystem = """ + scope: SYSTEM + name: '' + properties: { + } + """; + var updateSystem1 = """ + scope: SYSTEM + name: '' + properties: { + general.server.threadpool.size: 5 + } + """; + var updateSystem2 = """ + scope: SYSTEM + name: '' + properties: { + general.micrometer.log.metrics: log4j2, + general.micrometer.enabled: true + } + """; + assertEquals(Map.of(), client.instanceOperations().getSystemProperties()); + opts.expectedFile = write(expectedSystem); + opts.inputFile = write(updateSystem1); + ImportConfigCommand.load(getServerContext(), new ByteArrayInputStream("".getBytes(UTF_8)), + opts, precheckExpected); + Wait.waitFor(() -> Map.of("general.server.threadpool.size", "5") + .equals(client.instanceOperations().getSystemProperties())); + // running the command again should fail because the expected is not correct + opts.expectedFile = write(expectedSystem); + opts.inputFile = write(updateSystem2); + var cme = assertThrows(ConcurrentModificationException.class, + () -> ImportConfigCommand.load(getServerContext(), + new ByteArrayInputStream("".getBytes(UTF_8)), opts, precheckExpected)); + assertEquals( + "Properties in scope:SYSTEM name: do not match the expected values. To diagnose, export current config to a new file and diff with expected file.", + cme.getMessage()); + if (precheckExpected) { + assertNull(cme.getCause()); + } else { + // when a failure happens in the zookeeper atomic update will have an exception wrapping an + // exception + assertNotNull(cme.getCause()); + assertEquals(ConcurrentModificationException.class, cme.getCause().getClass()); + } + + // test dry run with expected + opts.dryRun = true; + if (precheckExpected) { + cme = assertThrows(ConcurrentModificationException.class, + () -> ImportConfigCommand.load(getServerContext(), + new ByteArrayInputStream("".getBytes(UTF_8)), opts, precheckExpected)); + assertEquals( + "Properties in scope:SYSTEM name: do not match the expected values. To diagnose, export current config to a new file and diff with expected file.", + cme.getMessage()); + } else { + // should not fail and should not change anything, this is testing the test code. + ImportConfigCommand.load(getServerContext(), new ByteArrayInputStream("".getBytes(UTF_8)), + opts, precheckExpected); + } + opts.dryRun = false; + + // The properties should not have changed, give any changes that may have been erroneously + // made time to propagate + Thread.sleep(100); + assertEquals(Map.of("general.server.threadpool.size", "5"), + client.instanceOperations().getSystemProperties()); + + // running the import again should succeed w/ the new expected file + opts.expectedFile = write(updateSystem1); + opts.inputFile = write(updateSystem2); + ImportConfigCommand.load(getServerContext(), new ByteArrayInputStream("".getBytes(UTF_8)), + opts, precheckExpected); + Wait.waitFor(() -> Map + .of("general.micrometer.log.metrics", "log4j2", "general.micrometer.enabled", "true") + .equals(client.instanceOperations().getSystemProperties())); + + // test resource groups, namespaces and tables + ResourceGroupId rgid1 = ResourceGroupId.of("expectedRG"); + client.resourceGroupOperations().create(rgid1); + assertEquals(Map.of(), client.resourceGroupOperations().getProperties(rgid1)); + client.resourceGroupOperations().setProperty(rgid1, "general.server.threadpool.size", + "987654321"); + client.resourceGroupOperations().setProperty(rgid1, "general.micrometer.enabled", "false"); + client.namespaceOperations().create("expns"); + client.namespaceOperations().setProperty("expns", "table.file.max", "50"); + client.tableOperations().create("expns.t1", new NewTableConfiguration().withoutDefaults()); + client.tableOperations().setProperty("expns.t1", "table.split.threshold", "100M"); + var exportYaml = ExportConfigCommand.export(getServerContext()); + client.resourceGroupOperations().setProperty(rgid1, "general.micrometer.enabled", "true"); + opts.expectedFile = write(exportYaml); + opts.inputFile = write(exportYaml.replace("9", "10")); + cme = assertThrows(ConcurrentModificationException.class, + () -> ImportConfigCommand.load(getServerContext(), + new ByteArrayInputStream("".getBytes(UTF_8)), opts, precheckExpected)); + assertEquals( + "Properties in scope:RESOURCE_GROUP name:expectedRG do not match the expected values. To diagnose, export current config to a new file and diff with expected file.", + cme.getMessage()); + // The properties should not have changed, give any changes that may have been erroneously + // made time to propagate + Thread.sleep(100); + assertEquals(Map.of("general.server.threadpool.size", "987654321", + "general.micrometer.enabled", "true"), + client.resourceGroupOperations().getProperties(rgid1)); + // correct the expected file to work and run again + client.resourceGroupOperations().setProperty(rgid1, "general.micrometer.enabled", "false"); + opts.expectedFile = write(exportYaml); + opts.inputFile = write(exportYaml.replace("987654321", "10")); + ImportConfigCommand.load(getServerContext(), new ByteArrayInputStream("".getBytes(UTF_8)), + opts, precheckExpected); + Wait.waitFor(() -> Map + .of("general.server.threadpool.size", "10", "general.micrometer.enabled", "false") + .equals(client.resourceGroupOperations().getProperties(rgid1))); + + // check table scope + exportYaml = ExportConfigCommand.export(getServerContext()); + client.tableOperations().setProperty("expns.t1", "table.split.threshold", "200M"); + opts.expectedFile = write(exportYaml); + opts.inputFile = write(exportYaml.replace("100M", "600M")); + cme = assertThrows(ConcurrentModificationException.class, + () -> ImportConfigCommand.load(getServerContext(), + new ByteArrayInputStream("".getBytes(UTF_8)), opts, precheckExpected)); + assertEquals( + "Properties in scope:TABLE name:expns.t1 do not match the expected values. To diagnose, export current config to a new file and diff with expected file.", + cme.getMessage()); + assertEquals(Map.of("table.split.threshold", "200M"), + client.tableOperations().getTableProperties("expns.t1")); + // correct expected file + opts.expectedFile = write(exportYaml.replace("100M", "200M")); + ImportConfigCommand.load(getServerContext(), new ByteArrayInputStream("".getBytes(UTF_8)), + opts, precheckExpected); + Wait.waitFor(() -> Map.of("table.split.threshold", "600M") + .equals(client.tableOperations().getTableProperties("expns.t1"))); + + // check case where expected file is missing a section + exportYaml = ExportConfigCommand.export(getServerContext()); + client.tableOperations().create("expns.t2", new NewTableConfiguration().withoutDefaults()); + // the export yaml does not contain the new table + var exportYaml2 = ExportConfigCommand.export(getServerContext()); + opts.expectedFile = write(exportYaml); + opts.inputFile = write(exportYaml2.replace("600M", "123M")); + var iae = assertThrows(IllegalArgumentException.class, + () -> ImportConfigCommand.load(getServerContext(), + new ByteArrayInputStream("".getBytes(UTF_8)), opts, precheckExpected)); + assertEquals( + "Scope+name present in input but not present in expected file, scope:TABLE name:expns.t2", + iae.getMessage()); + } + } + @Test public void testOffline() throws Exception { // Ensures import/export can work when no accumulo server processes are running