Skip to content

Commit 29bc3e5

Browse files
committed
FEATURE: Add CompletableFuture Admin APIs
1 parent 162013e commit 29bc3e5

4 files changed

Lines changed: 441 additions & 72 deletions

File tree

src/main/java/net/spy/memcached/MemcachedClient.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2241,7 +2241,7 @@ int getAddedQueueSize() {
22412241
*
22422242
* @return all memcached nodes from node locator
22432243
*/
2244-
protected Collection<MemcachedNode> getAllNodes() {
2244+
public Collection<MemcachedNode> getAllNodes() {
22452245
return conn.getLocator().getAll();
22462246
}
22472247

src/main/java/net/spy/memcached/v2/AsyncArcusCommands.java

Lines changed: 212 additions & 63 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717
*/
1818
package net.spy.memcached.v2;
1919

20+
import java.net.SocketAddress;
2021
import java.util.AbstractMap;
2122
import java.util.ArrayList;
2223
import java.util.Collection;
@@ -84,6 +85,7 @@
8485
import net.spy.memcached.ops.Operation;
8586
import net.spy.memcached.ops.OperationCallback;
8687
import net.spy.memcached.ops.OperationStatus;
88+
import net.spy.memcached.ops.StatsOperation;
8789
import net.spy.memcached.ops.StatusCode;
8890
import net.spy.memcached.ops.StoreType;
8991
import net.spy.memcached.transcoders.Transcoder;
@@ -612,34 +614,6 @@ public void complete() {
612614
return future;
613615
}
614616

615-
public ArcusFuture<Boolean> flush(int delay) {
616-
ArcusClient client = arcusClientSupplier.get();
617-
Collection<MemcachedNode> nodes = client.getFlushNodes();
618-
619-
Collection<CompletableFuture<?>> futures = new ArrayList<>();
620-
621-
for (MemcachedNode node : nodes) {
622-
CompletableFuture<Boolean> future = flush(client, node, delay).toCompletableFuture();
623-
futures.add(future);
624-
}
625-
626-
/*
627-
* Combine all futures. Returns true if all flush operations succeed.
628-
* Returns false if any flush operation fails.
629-
*/
630-
return new ArcusMultiFuture<>(futures, () -> {
631-
for (CompletableFuture<?> future : futures) {
632-
if (future.isCompletedExceptionally()) {
633-
return false;
634-
}
635-
Boolean result = (Boolean) future.join();
636-
if (result == null || !result) {
637-
return false;
638-
}
639-
}
640-
return true;
641-
});
642-
}
643617

644618
public ArcusFuture<Boolean> delete(String key) {
645619
AbstractArcusResult<Boolean> result = new AbstractArcusResult<>(new AtomicReference<>());
@@ -699,41 +673,6 @@ public ArcusFuture<Map<String, Boolean>> multiDelete(List<String> keys) {
699673
});
700674
}
701675

702-
/**
703-
* Use only in flush method.
704-
*
705-
* @param client the ArcusClient instance to use
706-
* @param node the MemcachedNode to flush
707-
* @param delay flush delay
708-
* @return ArcusFuture with flush result
709-
*/
710-
private ArcusFuture<Boolean> flush(ArcusClient client, MemcachedNode node, int delay) {
711-
AbstractArcusResult<Boolean> result = new AbstractArcusResult<>(new AtomicReference<>());
712-
ArcusFutureImpl<Boolean> future = new ArcusFutureImpl<>(result);
713-
714-
OperationCallback cb = new OperationCallback() {
715-
@Override
716-
public void receivedStatus(OperationStatus status) {
717-
if (status.getStatusCode() == StatusCode.CANCELLED) {
718-
future.internalCancel();
719-
return;
720-
}
721-
result.set(status.isSuccess());
722-
}
723-
724-
@Override
725-
public void complete() {
726-
future.complete();
727-
}
728-
};
729-
730-
Operation op = client.getOpFact().flush(delay, cb);
731-
future.setOp(op);
732-
client.addOp(node, op);
733-
734-
return future;
735-
}
736-
737676
public ArcusFuture<Boolean> bopCreate(String key, ElementValueType type,
738677
CollectionAttributes attributes) {
739678
if (attributes == null) {
@@ -1780,4 +1719,214 @@ public ArcusFuture<Boolean> sopDelete(String key, T value, boolean dropIfEmpty)
17801719
SetDelete<T> delete = new SetDelete<>(value, dropIfEmpty, false, tcForCollection);
17811720
return collectionDelete(key, delete);
17821721
}
1722+
1723+
public ArcusFuture<Boolean> flush() {
1724+
return flush(-1);
1725+
}
1726+
1727+
public ArcusFuture<Boolean> flush(int delay) {
1728+
if (delay < -1) {
1729+
throw new IllegalArgumentException("Delay should be a positive integer or -1 for no delay");
1730+
}
1731+
1732+
ArcusClient client = arcusClientSupplier.get();
1733+
Collection<MemcachedNode> nodes = client.getFlushNodes();
1734+
Collection<CompletableFuture<?>> futures = new ArrayList<>(nodes.size());
1735+
1736+
for (MemcachedNode node : nodes) {
1737+
AbstractArcusResult<Boolean> result = new AbstractArcusResult<>(new AtomicReference<>());
1738+
ArcusFutureImpl<Boolean> future = new ArcusFutureImpl<>(result);
1739+
1740+
OperationCallback cb = new OperationCallback() {
1741+
@Override
1742+
public void receivedStatus(OperationStatus status) {
1743+
switch (status.getStatusCode()) {
1744+
case SUCCESS:
1745+
result.set(true);
1746+
break;
1747+
case CANCELLED:
1748+
future.internalCancel();
1749+
break;
1750+
default:
1751+
result.addError(node.getSocketAddress().toString(), status);
1752+
break;
1753+
}
1754+
}
1755+
1756+
@Override
1757+
public void complete() {
1758+
future.complete();
1759+
}
1760+
};
1761+
1762+
Operation op = client.getOpFact().flush(delay, cb);
1763+
future.setOp(op);
1764+
client.addOp(node, op);
1765+
futures.add(future);
1766+
}
1767+
1768+
return new ArcusMultiFuture<>(futures, () -> {
1769+
for (CompletableFuture<?> future : futures) {
1770+
if (future.isCompletedExceptionally()) {
1771+
return false;
1772+
}
1773+
}
1774+
return true;
1775+
});
1776+
}
1777+
1778+
public ArcusFuture<Boolean> flush(String prefix) {
1779+
return flush(prefix, -1);
1780+
}
1781+
1782+
public ArcusFuture<Boolean> flush(String prefix, int delay) {
1783+
if (delay < -1) {
1784+
throw new IllegalArgumentException("Delay should be a positive integer or -1 for no delay");
1785+
}
1786+
1787+
ArcusClient client = arcusClientSupplier.get();
1788+
Collection<MemcachedNode> nodes = client.getFlushNodes();
1789+
Collection<CompletableFuture<?>> futures = new ArrayList<>(nodes.size());
1790+
1791+
for (MemcachedNode node : nodes) {
1792+
AbstractArcusResult<Boolean> result = new AbstractArcusResult<>(new AtomicReference<>());
1793+
ArcusFutureImpl<Boolean> future = new ArcusFutureImpl<>(result);
1794+
1795+
OperationCallback cb = new OperationCallback() {
1796+
@Override
1797+
public void receivedStatus(OperationStatus status) {
1798+
switch (status.getStatusCode()) {
1799+
case SUCCESS:
1800+
result.set(true);
1801+
break;
1802+
case ERR_NOT_FOUND:
1803+
result.set(false);
1804+
break;
1805+
case CANCELLED:
1806+
future.internalCancel();
1807+
break;
1808+
default:
1809+
result.addError(node.getSocketAddress().toString(), status);
1810+
break;
1811+
}
1812+
}
1813+
1814+
@Override
1815+
public void complete() {
1816+
future.complete();
1817+
}
1818+
};
1819+
1820+
Operation op = client.getOpFact().flush(prefix, delay, false, cb);
1821+
future.setOp(op);
1822+
client.addOp(node, op);
1823+
futures.add(future);
1824+
}
1825+
1826+
return new ArcusMultiFuture<>(futures, () -> {
1827+
for (CompletableFuture<?> future : futures) {
1828+
if (future.isCompletedExceptionally()) {
1829+
return false;
1830+
}
1831+
1832+
if (Boolean.TRUE.equals(future.join())) {
1833+
return true;
1834+
}
1835+
}
1836+
return false;
1837+
});
1838+
}
1839+
1840+
public ArcusFuture<Map<SocketAddress, Map<String, String>>> stats() {
1841+
return stats(null);
1842+
}
1843+
1844+
public ArcusFuture<Map<SocketAddress, Map<String, String>>> stats(String arg) {
1845+
ArcusClient client = arcusClientSupplier.get();
1846+
Collection<MemcachedNode> nodes = client.getAllNodes();
1847+
1848+
Collection<CompletableFuture<?>> futures = new ArrayList<>(nodes.size());
1849+
Map<SocketAddress, CompletableFuture<Map<String, String>>> addressToFuture
1850+
= new HashMap<>(nodes.size());
1851+
1852+
for (MemcachedNode node : nodes) {
1853+
SocketAddress address = node.getSocketAddress();
1854+
AbstractArcusResult<Map<String, String>> result
1855+
= new AbstractArcusResult<>(new AtomicReference<>(new HashMap<>()));
1856+
ArcusFutureImpl<Map<String, String>> future = new ArcusFutureImpl<>(result);
1857+
1858+
StatsOperation.Callback cb = new StatsOperation.Callback() {
1859+
@Override
1860+
public void gotStat(String name, String val) {
1861+
result.get().put(name, val);
1862+
}
1863+
1864+
@Override
1865+
public void receivedStatus(OperationStatus status) {
1866+
if (status.getStatusCode() == StatusCode.CANCELLED) {
1867+
future.internalCancel();
1868+
}
1869+
}
1870+
1871+
@Override
1872+
public void complete() {
1873+
future.complete();
1874+
}
1875+
};
1876+
Operation op = client.getOpFact().stats(arg, cb);
1877+
future.setOp(op);
1878+
client.addOp(node, op);
1879+
1880+
futures.add(future);
1881+
addressToFuture.put(address, future);
1882+
}
1883+
1884+
return new ArcusMultiFuture<>(futures, () -> {
1885+
Map<SocketAddress, Map<String, String>> resultMap = new HashMap<>(addressToFuture.size());
1886+
addressToFuture.forEach((address, future) -> resultMap.put(address, future.join()));
1887+
return resultMap;
1888+
});
1889+
}
1890+
1891+
public ArcusFuture<Map<SocketAddress, String>> versions() {
1892+
ArcusClient client = arcusClientSupplier.get();
1893+
Collection<MemcachedNode> nodes = client.getAllNodes();
1894+
1895+
Collection<CompletableFuture<?>> futures = new ArrayList<>(nodes.size());
1896+
Map<SocketAddress, CompletableFuture<String>> addressToFuture = new HashMap<>(nodes.size());
1897+
1898+
for (MemcachedNode node : nodes) {
1899+
SocketAddress address = node.getSocketAddress();
1900+
AbstractArcusResult<String> result = new AbstractArcusResult<>(new AtomicReference<>());
1901+
ArcusFutureImpl<String> future = new ArcusFutureImpl<>(result);
1902+
1903+
OperationCallback cb = new OperationCallback() {
1904+
@Override
1905+
public void receivedStatus(OperationStatus status) {
1906+
if (status.getStatusCode() == StatusCode.CANCELLED) {
1907+
future.internalCancel();
1908+
return;
1909+
}
1910+
result.set(status.getMessage());
1911+
}
1912+
1913+
@Override
1914+
public void complete() {
1915+
future.complete();
1916+
}
1917+
};
1918+
Operation op = client.getOpFact().version(cb);
1919+
future.setOp(op);
1920+
client.addOp(node, op);
1921+
1922+
futures.add(future);
1923+
addressToFuture.put(address, future);
1924+
}
1925+
1926+
return new ArcusMultiFuture<>(futures, () -> {
1927+
Map<SocketAddress, String> resultMap = new HashMap<>(addressToFuture.size());
1928+
addressToFuture.forEach((address, future) -> resultMap.put(address, future.join()));
1929+
return resultMap;
1930+
});
1931+
}
17831932
}

0 commit comments

Comments
 (0)