diff --git a/src/main/java/net/spy/memcached/MemcachedClient.java b/src/main/java/net/spy/memcached/MemcachedClient.java index 8c40914e3..8ba9466cc 100644 --- a/src/main/java/net/spy/memcached/MemcachedClient.java +++ b/src/main/java/net/spy/memcached/MemcachedClient.java @@ -2241,7 +2241,7 @@ int getAddedQueueSize() { * * @return all memcached nodes from node locator */ - protected Collection getAllNodes() { + public Collection getAllNodes() { return conn.getLocator().getAll(); } diff --git a/src/main/java/net/spy/memcached/v2/AsyncArcusCommands.java b/src/main/java/net/spy/memcached/v2/AsyncArcusCommands.java index 88cb68359..ca9a47180 100644 --- a/src/main/java/net/spy/memcached/v2/AsyncArcusCommands.java +++ b/src/main/java/net/spy/memcached/v2/AsyncArcusCommands.java @@ -17,6 +17,7 @@ */ package net.spy.memcached.v2; +import java.net.SocketAddress; import java.util.AbstractMap; import java.util.ArrayList; import java.util.Collection; @@ -84,6 +85,7 @@ import net.spy.memcached.ops.Operation; import net.spy.memcached.ops.OperationCallback; import net.spy.memcached.ops.OperationStatus; +import net.spy.memcached.ops.StatsOperation; import net.spy.memcached.ops.StatusCode; import net.spy.memcached.ops.StoreType; import net.spy.memcached.transcoders.Transcoder; @@ -612,34 +614,6 @@ public void complete() { return future; } - public ArcusFuture flush(int delay) { - ArcusClient client = arcusClientSupplier.get(); - Collection nodes = client.getFlushNodes(); - - Collection> futures = new ArrayList<>(); - - for (MemcachedNode node : nodes) { - CompletableFuture future = flush(client, node, delay).toCompletableFuture(); - futures.add(future); - } - - /* - * Combine all futures. Returns true if all flush operations succeed. - * Returns false if any flush operation fails. - */ - return new ArcusMultiFuture<>(futures, () -> { - for (CompletableFuture future : futures) { - if (future.isCompletedExceptionally()) { - return false; - } - Boolean result = (Boolean) future.join(); - if (result == null || !result) { - return false; - } - } - return true; - }); - } public ArcusFuture delete(String key) { AbstractArcusResult result = new AbstractArcusResult<>(new AtomicReference<>()); @@ -699,41 +673,6 @@ public ArcusFuture> multiDelete(List keys) { }); } - /** - * Use only in flush method. - * - * @param client the ArcusClient instance to use - * @param node the MemcachedNode to flush - * @param delay flush delay - * @return ArcusFuture with flush result - */ - private ArcusFuture flush(ArcusClient client, MemcachedNode node, int delay) { - AbstractArcusResult result = new AbstractArcusResult<>(new AtomicReference<>()); - ArcusFutureImpl future = new ArcusFutureImpl<>(result); - - OperationCallback cb = new OperationCallback() { - @Override - public void receivedStatus(OperationStatus status) { - if (status.getStatusCode() == StatusCode.CANCELLED) { - future.internalCancel(); - return; - } - result.set(status.isSuccess()); - } - - @Override - public void complete() { - future.complete(); - } - }; - - Operation op = client.getOpFact().flush(delay, cb); - future.setOp(op); - client.addOp(node, op); - - return future; - } - public ArcusFuture bopCreate(String key, ElementValueType type, CollectionAttributes attributes) { if (attributes == null) { @@ -1780,4 +1719,214 @@ public ArcusFuture sopDelete(String key, T value, boolean dropIfEmpty) SetDelete delete = new SetDelete<>(value, dropIfEmpty, false, tcForCollection); return collectionDelete(key, delete); } + + public ArcusFuture flush() { + return flush(-1); + } + + public ArcusFuture flush(int delay) { + if (delay < -1) { + throw new IllegalArgumentException("Delay should be a positive integer or -1 for no delay"); + } + + ArcusClient client = arcusClientSupplier.get(); + Collection nodes = client.getFlushNodes(); + Collection> futures = new ArrayList<>(nodes.size()); + + for (MemcachedNode node : nodes) { + AbstractArcusResult result = new AbstractArcusResult<>(new AtomicReference<>()); + ArcusFutureImpl future = new ArcusFutureImpl<>(result); + + OperationCallback cb = new OperationCallback() { + @Override + public void receivedStatus(OperationStatus status) { + switch (status.getStatusCode()) { + case SUCCESS: + result.set(true); + break; + case CANCELLED: + future.internalCancel(); + break; + default: + result.addError(node.getSocketAddress().toString(), status); + break; + } + } + + @Override + public void complete() { + future.complete(); + } + }; + + Operation op = client.getOpFact().flush(delay, cb); + future.setOp(op); + client.addOp(node, op); + futures.add(future); + } + + return new ArcusMultiFuture<>(futures, () -> { + for (CompletableFuture future : futures) { + if (future.isCompletedExceptionally()) { + return false; + } + } + return true; + }); + } + + public ArcusFuture flush(String prefix) { + return flush(prefix, -1); + } + + public ArcusFuture flush(String prefix, int delay) { + if (delay < -1) { + throw new IllegalArgumentException("Delay should be a positive integer or -1 for no delay"); + } + + ArcusClient client = arcusClientSupplier.get(); + Collection nodes = client.getFlushNodes(); + Collection> futures = new ArrayList<>(nodes.size()); + + for (MemcachedNode node : nodes) { + AbstractArcusResult result = new AbstractArcusResult<>(new AtomicReference<>()); + ArcusFutureImpl future = new ArcusFutureImpl<>(result); + + OperationCallback cb = new OperationCallback() { + @Override + public void receivedStatus(OperationStatus status) { + switch (status.getStatusCode()) { + case SUCCESS: + result.set(true); + break; + case ERR_NOT_FOUND: + result.set(false); + break; + case CANCELLED: + future.internalCancel(); + break; + default: + result.addError(node.getSocketAddress().toString(), status); + break; + } + } + + @Override + public void complete() { + future.complete(); + } + }; + + Operation op = client.getOpFact().flush(prefix, delay, false, cb); + future.setOp(op); + client.addOp(node, op); + futures.add(future); + } + + return new ArcusMultiFuture<>(futures, () -> { + for (CompletableFuture future : futures) { + if (future.isCompletedExceptionally()) { + return false; + } + + if (Boolean.TRUE.equals(future.join())) { + return true; + } + } + return false; + }); + } + + public ArcusFuture>> stats() { + return stats(null); + } + + public ArcusFuture>> stats(String arg) { + ArcusClient client = arcusClientSupplier.get(); + Collection nodes = client.getAllNodes(); + + Collection> futures = new ArrayList<>(nodes.size()); + Map>> addressToFuture + = new HashMap<>(nodes.size()); + + for (MemcachedNode node : nodes) { + SocketAddress address = node.getSocketAddress(); + AbstractArcusResult> result + = new AbstractArcusResult<>(new AtomicReference<>(new HashMap<>())); + ArcusFutureImpl> future = new ArcusFutureImpl<>(result); + + StatsOperation.Callback cb = new StatsOperation.Callback() { + @Override + public void gotStat(String name, String val) { + result.get().put(name, val); + } + + @Override + public void receivedStatus(OperationStatus status) { + if (status.getStatusCode() == StatusCode.CANCELLED) { + future.internalCancel(); + } + } + + @Override + public void complete() { + future.complete(); + } + }; + Operation op = client.getOpFact().stats(arg, cb); + future.setOp(op); + client.addOp(node, op); + + futures.add(future); + addressToFuture.put(address, future); + } + + return new ArcusMultiFuture<>(futures, () -> { + Map> resultMap = new HashMap<>(addressToFuture.size()); + addressToFuture.forEach((address, future) -> resultMap.put(address, future.join())); + return resultMap; + }); + } + + public ArcusFuture> versions() { + ArcusClient client = arcusClientSupplier.get(); + Collection nodes = client.getAllNodes(); + + Collection> futures = new ArrayList<>(nodes.size()); + Map> addressToFuture = new HashMap<>(nodes.size()); + + for (MemcachedNode node : nodes) { + SocketAddress address = node.getSocketAddress(); + AbstractArcusResult result = new AbstractArcusResult<>(new AtomicReference<>()); + ArcusFutureImpl future = new ArcusFutureImpl<>(result); + + OperationCallback cb = new OperationCallback() { + @Override + public void receivedStatus(OperationStatus status) { + if (status.getStatusCode() == StatusCode.CANCELLED) { + future.internalCancel(); + return; + } + result.set(status.getMessage()); + } + + @Override + public void complete() { + future.complete(); + } + }; + Operation op = client.getOpFact().version(cb); + future.setOp(op); + client.addOp(node, op); + + futures.add(future); + addressToFuture.put(address, future); + } + + return new ArcusMultiFuture<>(futures, () -> { + Map resultMap = new HashMap<>(addressToFuture.size()); + addressToFuture.forEach((address, future) -> resultMap.put(address, future.join())); + return resultMap; + }); + } } diff --git a/src/main/java/net/spy/memcached/v2/AsyncArcusCommandsIF.java b/src/main/java/net/spy/memcached/v2/AsyncArcusCommandsIF.java index 602854f14..5a017a826 100644 --- a/src/main/java/net/spy/memcached/v2/AsyncArcusCommandsIF.java +++ b/src/main/java/net/spy/memcached/v2/AsyncArcusCommandsIF.java @@ -17,6 +17,7 @@ */ package net.spy.memcached.v2; +import java.net.SocketAddress; import java.util.List; import java.util.Map; import java.util.Set; @@ -219,14 +220,6 @@ public interface AsyncArcusCommandsIF { */ ArcusFuture> multiDelete(List keys); - /** - * Flush all items from all servers. - * - * @param delay delay in seconds before flushing - * @return {@code Boolean.True} if flushed successfully, otherwise {@code Boolean.False} - */ - ArcusFuture flush(int delay); - /** * Create a btree item. * @@ -620,4 +613,60 @@ ArcusFuture sopCreate(String key, ElementValueType type, * {@code null} if the key is not found. */ ArcusFuture sopDelete(String key, T value, boolean dropIfEmpty); + + /** + * Flush all items from all servers immediately. + * + * @return {@code true} if all servers flushed successfully, {@code false} otherwise + */ + ArcusFuture flush(); + + /** + * Flush all items from all servers after a given delay. + * + * @param delay delay in seconds before flushing. (≥ -1) + * @return {@code true} if all servers flushed successfully, {@code false} otherwise + */ + ArcusFuture flush(int delay); + + /** + * Flush all items with the given prefix from all servers immediately. + * + * @param prefix the prefix of the items to flush + * @return {@code true} if flushed successfully, + * {@code false} if no items with the given prefix exist + */ + ArcusFuture flush(String prefix); + + /** + * Flush all items with the given prefix from all servers after a given delay. + * + * @param prefix the prefix of the items to flush + * @param delay delay in seconds before flushing. (≥ -1) + * @return {@code true} if flushed successfully, + * {@code false} if no items with the given prefix exist + */ + ArcusFuture flush(String prefix, int delay); + + /** + * Get statistics from all connected servers. + * + * @return a map of each server's {@link java.net.SocketAddress} to its stats key-value pairs + */ + ArcusFuture>> stats(); + + /** + * Get a specific set of statistics from all connected servers. + * + * @param arg the stats argument (e.g. {@code "settings"}, {@code "slabs"}, {@code "zookeeper"}) + * @return a map of each server's {@link java.net.SocketAddress} to its stats key-value pairs + */ + ArcusFuture>> stats(String arg); + + /** + * Get the version string from all connected servers. + * + * @return a map of each server's {@link java.net.SocketAddress} to its version string + */ + ArcusFuture> versions(); } diff --git a/src/test/java/net/spy/memcached/v2/AdminAsyncArcusCommandsTest.java b/src/test/java/net/spy/memcached/v2/AdminAsyncArcusCommandsTest.java new file mode 100644 index 000000000..2bad66b5e --- /dev/null +++ b/src/test/java/net/spy/memcached/v2/AdminAsyncArcusCommandsTest.java @@ -0,0 +1,171 @@ +package net.spy.memcached.v2; + +import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; + +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.Test; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.junit.jupiter.api.Assertions.assertNull; +import static org.junit.jupiter.api.Assertions.assertTrue; + +class AdminAsyncArcusCommandsTest extends AsyncArcusCommandsTest { + + @Test + void flush() throws ExecutionException, InterruptedException, TimeoutException { + // given + String key = keys.get(0); + async.set(key, 0, VALUE) + .toCompletableFuture() + .get(300, TimeUnit.MILLISECONDS); + + // when + async.flush() + // then + .thenCompose(result -> { + assertTrue(result); + return async.get(key); + }) + .thenAccept(Assertions::assertNull) + .toCompletableFuture() + .get(300, TimeUnit.MILLISECONDS); + } + + @Test + void flushWithDelay() throws ExecutionException, InterruptedException, TimeoutException { + // given + String key = keys.get(0); + async.set(key, 0, VALUE) + .toCompletableFuture() + .get(300, TimeUnit.MILLISECONDS); + + // when + async.flush(0) + // then + .thenCompose(result -> { + assertTrue(result); + return async.get(key); + }) + .thenAccept(Assertions::assertNull) + .toCompletableFuture() + .get(300, TimeUnit.MILLISECONDS); + } + + @Test + void flushWithPrefix() throws ExecutionException, InterruptedException, TimeoutException { + // given + String keyA = "prefixA:key"; + String keyB = "prefixB:key"; + async.set(keyA, 0, VALUE) + .thenCompose(result -> async.set(keyB, 0, VALUE)) + .toCompletableFuture() + .get(300, TimeUnit.MILLISECONDS); + + // when + async.flush("prefixA") + // then + .thenCompose(result -> { + assertTrue(result); + return async.get(keyA); + }) + .thenCompose(result -> { + assertNull(result); + return async.get(keyB); + }) + .thenAccept(result -> { + assertNotNull(result); + assertEquals(VALUE, result); + }) + .toCompletableFuture() + .get(300, TimeUnit.MILLISECONDS); + } + + @Test + void flushWithPrefixNotFound() throws ExecutionException, InterruptedException, TimeoutException { + // given + // when + async.flush("nonExistentPrefix") + // then + .thenAccept(Assertions::assertFalse) + .toCompletableFuture() + .get(300, TimeUnit.MILLISECONDS); + } + + @Test + void flushWithPrefixAndDelay() throws ExecutionException, InterruptedException, TimeoutException { + // given + String key = "prefixA:key"; + async.set(key, 0, VALUE) + .toCompletableFuture() + .get(300, TimeUnit.MILLISECONDS); + + // when + async.flush("prefixA", 0) + // then + .thenCompose(result -> { + assertTrue(result); + return async.get(key); + }) + .thenAccept(Assertions::assertNull) + .toCompletableFuture() + .get(300, TimeUnit.MILLISECONDS); + } + + @Test + void stats() throws ExecutionException, InterruptedException, TimeoutException { + // given + // when + async.stats() + // then + .thenAccept(result -> { + assertFalse(result.isEmpty()); + result.forEach((address, stat) -> { + assertNotNull(address); + assertTrue(stat.containsKey("total_items")); + }); + }) + .toCompletableFuture() + .get(300, TimeUnit.MILLISECONDS); + } + + @Test + void statsWithArg() throws ExecutionException, InterruptedException, TimeoutException { + // given + String arg = "zookeeper"; + + // when + async.stats(arg) + .thenAccept(result -> { + assertFalse(result.isEmpty()); + result.forEach(((socketAddress, stats) -> { + assertNotNull(socketAddress); + assertTrue(stats.containsKey("zk_connected")); + })); + }) + .toCompletableFuture() + .get(300, TimeUnit.MILLISECONDS); + } + + @Test + void versions() throws ExecutionException, InterruptedException, TimeoutException { + // given + // when + async.versions() + // then + .thenAccept(result -> { + assertFalse(result.isEmpty()); + result.forEach((socketAddress, version) -> { + assertNotNull(socketAddress); + assertNotNull(version); + assertFalse(version.isEmpty()); + }); + }) + .toCompletableFuture() + .get(300, TimeUnit.MILLISECONDS); + } + +}