-
Notifications
You must be signed in to change notification settings - Fork 50
FEATURE: Add CompletableFuture Admin APIs #1073
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: develop
Are you sure you want to change the base?
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change | |||||||||||||||||||||
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
|
|
@@ -17,6 +17,7 @@ | ||||||||||||||||||||||
| */ | |||||||||||||||||||||||
| package net.spy.memcached.v2; | |||||||||||||||||||||||
|
|
|||||||||||||||||||||||
| import java.net.SocketAddress; | |||||||||||||||||||||||
| import java.util.AbstractMap; | |||||||||||||||||||||||
| import java.util.ArrayList; | |||||||||||||||||||||||
| import java.util.Collection; | |||||||||||||||||||||||
|
|
@@ -84,6 +85,7 @@ | ||||||||||||||||||||||
| import net.spy.memcached.ops.Operation; | |||||||||||||||||||||||
| import net.spy.memcached.ops.OperationCallback; | |||||||||||||||||||||||
| import net.spy.memcached.ops.OperationStatus; | |||||||||||||||||||||||
| import net.spy.memcached.ops.StatsOperation; | |||||||||||||||||||||||
| import net.spy.memcached.ops.StatusCode; | |||||||||||||||||||||||
| import net.spy.memcached.ops.StoreType; | |||||||||||||||||||||||
| import net.spy.memcached.transcoders.Transcoder; | |||||||||||||||||||||||
|
|
@@ -612,34 +614,6 @@ public void complete() { | ||||||||||||||||||||||
| return future; | |||||||||||||||||||||||
| } | |||||||||||||||||||||||
|
|
|||||||||||||||||||||||
| public ArcusFuture<Boolean> flush(int delay) { | |||||||||||||||||||||||
| ArcusClient client = arcusClientSupplier.get(); | |||||||||||||||||||||||
| Collection<MemcachedNode> nodes = client.getFlushNodes(); | |||||||||||||||||||||||
|
|
|||||||||||||||||||||||
| Collection<CompletableFuture<?>> futures = new ArrayList<>(); | |||||||||||||||||||||||
|
|
|||||||||||||||||||||||
| for (MemcachedNode node : nodes) { | |||||||||||||||||||||||
| CompletableFuture<Boolean> future = flush(client, node, delay).toCompletableFuture(); | |||||||||||||||||||||||
| futures.add(future); | |||||||||||||||||||||||
| } | |||||||||||||||||||||||
|
|
|||||||||||||||||||||||
| /* | |||||||||||||||||||||||
| * Combine all futures. Returns true if all flush operations succeed. | |||||||||||||||||||||||
| * Returns false if any flush operation fails. | |||||||||||||||||||||||
| */ | |||||||||||||||||||||||
| return new ArcusMultiFuture<>(futures, () -> { | |||||||||||||||||||||||
| for (CompletableFuture<?> future : futures) { | |||||||||||||||||||||||
| if (future.isCompletedExceptionally()) { | |||||||||||||||||||||||
| return false; | |||||||||||||||||||||||
| } | |||||||||||||||||||||||
| Boolean result = (Boolean) future.join(); | |||||||||||||||||||||||
| if (result == null || !result) { | |||||||||||||||||||||||
| return false; | |||||||||||||||||||||||
| } | |||||||||||||||||||||||
| } | |||||||||||||||||||||||
| return true; | |||||||||||||||||||||||
| }); | |||||||||||||||||||||||
| } | |||||||||||||||||||||||
|
|
|||||||||||||||||||||||
| public ArcusFuture<Boolean> delete(String key) { | |||||||||||||||||||||||
| AbstractArcusResult<Boolean> result = new AbstractArcusResult<>(new AtomicReference<>()); | |||||||||||||||||||||||
|
|
@@ -699,41 +673,6 @@ public ArcusFuture<Map<String, Boolean>> multiDelete(List<String> keys) { | ||||||||||||||||||||||
| }); | |||||||||||||||||||||||
| } | |||||||||||||||||||||||
|
|
|||||||||||||||||||||||
| /** | |||||||||||||||||||||||
| * Use only in flush method. | |||||||||||||||||||||||
| * | |||||||||||||||||||||||
| * @param client the ArcusClient instance to use | |||||||||||||||||||||||
| * @param node the MemcachedNode to flush | |||||||||||||||||||||||
| * @param delay flush delay | |||||||||||||||||||||||
| * @return ArcusFuture with flush result | |||||||||||||||||||||||
| */ | |||||||||||||||||||||||
| private ArcusFuture<Boolean> flush(ArcusClient client, MemcachedNode node, int delay) { | |||||||||||||||||||||||
| AbstractArcusResult<Boolean> result = new AbstractArcusResult<>(new AtomicReference<>()); | |||||||||||||||||||||||
| ArcusFutureImpl<Boolean> future = new ArcusFutureImpl<>(result); | |||||||||||||||||||||||
|
|
|||||||||||||||||||||||
| OperationCallback cb = new OperationCallback() { | |||||||||||||||||||||||
| @Override | |||||||||||||||||||||||
| public void receivedStatus(OperationStatus status) { | |||||||||||||||||||||||
| if (status.getStatusCode() == StatusCode.CANCELLED) { | |||||||||||||||||||||||
| future.internalCancel(); | |||||||||||||||||||||||
| return; | |||||||||||||||||||||||
| } | |||||||||||||||||||||||
| result.set(status.isSuccess()); | |||||||||||||||||||||||
| } | |||||||||||||||||||||||
|
|
|||||||||||||||||||||||
| @Override | |||||||||||||||||||||||
| public void complete() { | |||||||||||||||||||||||
| future.complete(); | |||||||||||||||||||||||
| } | |||||||||||||||||||||||
| }; | |||||||||||||||||||||||
|
|
|||||||||||||||||||||||
| Operation op = client.getOpFact().flush(delay, cb); | |||||||||||||||||||||||
| future.setOp(op); | |||||||||||||||||||||||
| client.addOp(node, op); | |||||||||||||||||||||||
|
|
|||||||||||||||||||||||
| return future; | |||||||||||||||||||||||
| } | |||||||||||||||||||||||
|
|
|||||||||||||||||||||||
| public ArcusFuture<Boolean> bopCreate(String key, ElementValueType type, | |||||||||||||||||||||||
| CollectionAttributes attributes) { | |||||||||||||||||||||||
| if (attributes == null) { | |||||||||||||||||||||||
|
|
@@ -1780,4 +1719,214 @@ public ArcusFuture<Boolean> sopDelete(String key, T value, boolean dropIfEmpty) | ||||||||||||||||||||||
| SetDelete<T> delete = new SetDelete<>(value, dropIfEmpty, false, tcForCollection); | |||||||||||||||||||||||
| return collectionDelete(key, delete); | |||||||||||||||||||||||
| } | |||||||||||||||||||||||
|
|
|||||||||||||||||||||||
| public ArcusFuture<Boolean> flush() { | |||||||||||||||||||||||
| return flush(-1); | |||||||||||||||||||||||
| } | |||||||||||||||||||||||
|
|
|||||||||||||||||||||||
| public ArcusFuture<Boolean> flush(int delay) { | |||||||||||||||||||||||
| if (delay < -1) { | |||||||||||||||||||||||
| throw new IllegalArgumentException("Delay should be a positive integer or -1 for no delay"); | |||||||||||||||||||||||
| } | |||||||||||||||||||||||
|
|
|||||||||||||||||||||||
| ArcusClient client = arcusClientSupplier.get(); | |||||||||||||||||||||||
| Collection<MemcachedNode> nodes = client.getFlushNodes(); | |||||||||||||||||||||||
| Collection<CompletableFuture<?>> futures = new ArrayList<>(nodes.size()); | |||||||||||||||||||||||
|
|
|||||||||||||||||||||||
| for (MemcachedNode node : nodes) { | |||||||||||||||||||||||
| AbstractArcusResult<Boolean> result = new AbstractArcusResult<>(new AtomicReference<>()); | |||||||||||||||||||||||
| ArcusFutureImpl<Boolean> future = new ArcusFutureImpl<>(result); | |||||||||||||||||||||||
|
|
|||||||||||||||||||||||
| OperationCallback cb = new OperationCallback() { | |||||||||||||||||||||||
| @Override | |||||||||||||||||||||||
| public void receivedStatus(OperationStatus status) { | |||||||||||||||||||||||
| switch (status.getStatusCode()) { | |||||||||||||||||||||||
| case SUCCESS: | |||||||||||||||||||||||
| result.set(true); | |||||||||||||||||||||||
| break; | |||||||||||||||||||||||
| case CANCELLED: | |||||||||||||||||||||||
| future.internalCancel(); | |||||||||||||||||||||||
| break; | |||||||||||||||||||||||
| default: | |||||||||||||||||||||||
| result.addError(node.getSocketAddress().toString(), status); | |||||||||||||||||||||||
| break; | |||||||||||||||||||||||
| } | |||||||||||||||||||||||
| } | |||||||||||||||||||||||
|
|
|||||||||||||||||||||||
| @Override | |||||||||||||||||||||||
| public void complete() { | |||||||||||||||||||||||
| future.complete(); | |||||||||||||||||||||||
| } | |||||||||||||||||||||||
| }; | |||||||||||||||||||||||
|
|
|||||||||||||||||||||||
| Operation op = client.getOpFact().flush(delay, cb); | |||||||||||||||||||||||
| future.setOp(op); | |||||||||||||||||||||||
| client.addOp(node, op); | |||||||||||||||||||||||
| futures.add(future); | |||||||||||||||||||||||
| } | |||||||||||||||||||||||
|
|
|||||||||||||||||||||||
| return new ArcusMultiFuture<>(futures, () -> { | |||||||||||||||||||||||
| for (CompletableFuture<?> future : futures) { | |||||||||||||||||||||||
| if (future.isCompletedExceptionally()) { | |||||||||||||||||||||||
| return false; | |||||||||||||||||||||||
| } | |||||||||||||||||||||||
| } | |||||||||||||||||||||||
| return true; | |||||||||||||||||||||||
| }); | |||||||||||||||||||||||
| } | |||||||||||||||||||||||
|
|
|||||||||||||||||||||||
| public ArcusFuture<Boolean> flush(String prefix) { | |||||||||||||||||||||||
| return flush(prefix, -1); | |||||||||||||||||||||||
| } | |||||||||||||||||||||||
|
|
|||||||||||||||||||||||
| public ArcusFuture<Boolean> flush(String prefix, int delay) { | |||||||||||||||||||||||
| if (delay < -1) { | |||||||||||||||||||||||
| throw new IllegalArgumentException("Delay should be a positive integer or -1 for no delay"); | |||||||||||||||||||||||
| } | |||||||||||||||||||||||
|
|
|||||||||||||||||||||||
| ArcusClient client = arcusClientSupplier.get(); | |||||||||||||||||||||||
| Collection<MemcachedNode> nodes = client.getFlushNodes(); | |||||||||||||||||||||||
| Collection<CompletableFuture<?>> futures = new ArrayList<>(nodes.size()); | |||||||||||||||||||||||
|
|
|||||||||||||||||||||||
| for (MemcachedNode node : nodes) { | |||||||||||||||||||||||
| AbstractArcusResult<Boolean> result = new AbstractArcusResult<>(new AtomicReference<>()); | |||||||||||||||||||||||
| ArcusFutureImpl<Boolean> future = new ArcusFutureImpl<>(result); | |||||||||||||||||||||||
|
|
|||||||||||||||||||||||
| OperationCallback cb = new OperationCallback() { | |||||||||||||||||||||||
| @Override | |||||||||||||||||||||||
| public void receivedStatus(OperationStatus status) { | |||||||||||||||||||||||
| switch (status.getStatusCode()) { | |||||||||||||||||||||||
| case SUCCESS: | |||||||||||||||||||||||
| result.set(true); | |||||||||||||||||||||||
| break; | |||||||||||||||||||||||
| case ERR_NOT_FOUND: | |||||||||||||||||||||||
| result.set(false); | |||||||||||||||||||||||
| break; | |||||||||||||||||||||||
| case CANCELLED: | |||||||||||||||||||||||
| future.internalCancel(); | |||||||||||||||||||||||
| break; | |||||||||||||||||||||||
| default: | |||||||||||||||||||||||
| result.addError(node.getSocketAddress().toString(), status); | |||||||||||||||||||||||
| break; | |||||||||||||||||||||||
| } | |||||||||||||||||||||||
| } | |||||||||||||||||||||||
|
|
|||||||||||||||||||||||
| @Override | |||||||||||||||||||||||
| public void complete() { | |||||||||||||||||||||||
| future.complete(); | |||||||||||||||||||||||
| } | |||||||||||||||||||||||
| }; | |||||||||||||||||||||||
|
|
|||||||||||||||||||||||
| Operation op = client.getOpFact().flush(prefix, delay, false, cb); | |||||||||||||||||||||||
| future.setOp(op); | |||||||||||||||||||||||
| client.addOp(node, op); | |||||||||||||||||||||||
| futures.add(future); | |||||||||||||||||||||||
| } | |||||||||||||||||||||||
|
|
|||||||||||||||||||||||
| return new ArcusMultiFuture<>(futures, () -> { | |||||||||||||||||||||||
| for (CompletableFuture<?> future : futures) { | |||||||||||||||||||||||
| if (future.isCompletedExceptionally()) { | |||||||||||||||||||||||
| return false; | |||||||||||||||||||||||
| } | |||||||||||||||||||||||
|
|
|||||||||||||||||||||||
| if (Boolean.TRUE.equals(future.join())) { | |||||||||||||||||||||||
| return true; | |||||||||||||||||||||||
| } | |||||||||||||||||||||||
| } | |||||||||||||||||||||||
| return false; | |||||||||||||||||||||||
|
Comment on lines
+1826
to
+1836
Collaborator
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
서버 동작
AS-IS (v1 문제)
시나리오: 노드 A, B, C 중 A에만
TO-BE (v2 변경)
|
|||||||||||||||||||||||
| }); | |||||||||||||||||||||||
| } | |||||||||||||||||||||||
|
|
|||||||||||||||||||||||
| public ArcusFuture<Map<SocketAddress, Map<String, String>>> stats() { | |||||||||||||||||||||||
| return stats(null); | |||||||||||||||||||||||
| } | |||||||||||||||||||||||
|
|
|||||||||||||||||||||||
| public ArcusFuture<Map<SocketAddress, Map<String, String>>> stats(String arg) { | |||||||||||||||||||||||
| ArcusClient client = arcusClientSupplier.get(); | |||||||||||||||||||||||
| Collection<MemcachedNode> nodes = client.getAllNodes(); | |||||||||||||||||||||||
|
|
|||||||||||||||||||||||
| Collection<CompletableFuture<?>> futures = new ArrayList<>(nodes.size()); | |||||||||||||||||||||||
| Map<SocketAddress, CompletableFuture<Map<String, String>>> addressToFuture | |||||||||||||||||||||||
| = new HashMap<>(nodes.size()); | |||||||||||||||||||||||
|
|
|||||||||||||||||||||||
| for (MemcachedNode node : nodes) { | |||||||||||||||||||||||
| SocketAddress address = node.getSocketAddress(); | |||||||||||||||||||||||
| AbstractArcusResult<Map<String, String>> result | |||||||||||||||||||||||
| = new AbstractArcusResult<>(new AtomicReference<>(new HashMap<>())); | |||||||||||||||||||||||
| ArcusFutureImpl<Map<String, String>> future = new ArcusFutureImpl<>(result); | |||||||||||||||||||||||
|
|
|||||||||||||||||||||||
| StatsOperation.Callback cb = new StatsOperation.Callback() { | |||||||||||||||||||||||
| @Override | |||||||||||||||||||||||
| public void gotStat(String name, String val) { | |||||||||||||||||||||||
| result.get().put(name, val); | |||||||||||||||||||||||
| } | |||||||||||||||||||||||
|
|
|||||||||||||||||||||||
| @Override | |||||||||||||||||||||||
| public void receivedStatus(OperationStatus status) { | |||||||||||||||||||||||
| if (status.getStatusCode() == StatusCode.CANCELLED) { | |||||||||||||||||||||||
| future.internalCancel(); | |||||||||||||||||||||||
| } | |||||||||||||||||||||||
| } | |||||||||||||||||||||||
|
|
|||||||||||||||||||||||
| @Override | |||||||||||||||||||||||
| public void complete() { | |||||||||||||||||||||||
| future.complete(); | |||||||||||||||||||||||
| } | |||||||||||||||||||||||
| }; | |||||||||||||||||||||||
| Operation op = client.getOpFact().stats(arg, cb); | |||||||||||||||||||||||
| future.setOp(op); | |||||||||||||||||||||||
| client.addOp(node, op); | |||||||||||||||||||||||
|
|
|||||||||||||||||||||||
| futures.add(future); | |||||||||||||||||||||||
| addressToFuture.put(address, future); | |||||||||||||||||||||||
| } | |||||||||||||||||||||||
|
|
|||||||||||||||||||||||
| return new ArcusMultiFuture<>(futures, () -> { | |||||||||||||||||||||||
| Map<SocketAddress, Map<String, String>> resultMap = new HashMap<>(addressToFuture.size()); | |||||||||||||||||||||||
| addressToFuture.forEach((address, future) -> resultMap.put(address, future.join())); | |||||||||||||||||||||||
| return resultMap; | |||||||||||||||||||||||
| }); | |||||||||||||||||||||||
| } | |||||||||||||||||||||||
|
|
|||||||||||||||||||||||
| public ArcusFuture<Map<SocketAddress, String>> versions() { | |||||||||||||||||||||||
| ArcusClient client = arcusClientSupplier.get(); | |||||||||||||||||||||||
| Collection<MemcachedNode> nodes = client.getAllNodes(); | |||||||||||||||||||||||
|
|
|||||||||||||||||||||||
| Collection<CompletableFuture<?>> futures = new ArrayList<>(nodes.size()); | |||||||||||||||||||||||
| Map<SocketAddress, CompletableFuture<String>> addressToFuture = new HashMap<>(nodes.size()); | |||||||||||||||||||||||
|
|
|||||||||||||||||||||||
| for (MemcachedNode node : nodes) { | |||||||||||||||||||||||
| SocketAddress address = node.getSocketAddress(); | |||||||||||||||||||||||
| AbstractArcusResult<String> result = new AbstractArcusResult<>(new AtomicReference<>()); | |||||||||||||||||||||||
| ArcusFutureImpl<String> future = new ArcusFutureImpl<>(result); | |||||||||||||||||||||||
|
|
|||||||||||||||||||||||
| OperationCallback cb = new OperationCallback() { | |||||||||||||||||||||||
| @Override | |||||||||||||||||||||||
| public void receivedStatus(OperationStatus status) { | |||||||||||||||||||||||
| if (status.getStatusCode() == StatusCode.CANCELLED) { | |||||||||||||||||||||||
| future.internalCancel(); | |||||||||||||||||||||||
| return; | |||||||||||||||||||||||
| } | |||||||||||||||||||||||
| result.set(status.getMessage()); | |||||||||||||||||||||||
| } | |||||||||||||||||||||||
|
|
|||||||||||||||||||||||
| @Override | |||||||||||||||||||||||
| public void complete() { | |||||||||||||||||||||||
| future.complete(); | |||||||||||||||||||||||
| } | |||||||||||||||||||||||
| }; | |||||||||||||||||||||||
| Operation op = client.getOpFact().version(cb); | |||||||||||||||||||||||
| future.setOp(op); | |||||||||||||||||||||||
| client.addOp(node, op); | |||||||||||||||||||||||
|
|
|||||||||||||||||||||||
| futures.add(future); | |||||||||||||||||||||||
| addressToFuture.put(address, future); | |||||||||||||||||||||||
| } | |||||||||||||||||||||||
|
|
|||||||||||||||||||||||
| return new ArcusMultiFuture<>(futures, () -> { | |||||||||||||||||||||||
| Map<SocketAddress, String> resultMap = new HashMap<>(addressToFuture.size()); | |||||||||||||||||||||||
| addressToFuture.forEach((address, future) -> resultMap.put(address, future.join())); | |||||||||||||||||||||||
| return resultMap; | |||||||||||||||||||||||
| }); | |||||||||||||||||||||||
| } | |||||||||||||||||||||||
| } | |||||||||||||||||||||||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
flush(int delay)의 delay 파라미터 처리 방식에 대해 논의가 필요합니다.현재 -1을 "delay 파라미터를 생략한다"는 값으로 사용하고 있습니다.
flush_prefix <prefix>\r\n(delay 생략, 즉시 무효화)flush_prefix <prefix> 0\r\n(동일하게 즉시 무효화)서버는 음수를 받지 못하지만, "사용하지 않음"을 표현하기 위해 -1을 코드 내부 규약으로 허용하고 있습니다.
delay가 없는 경우(-1)와 delay = 0인 경우 모두 서버에서 즉시 flush로 동작이 일치하는데, 이를 어떻게 처리해야 할까요?