Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion src/main/java/net/spy/memcached/MemcachedClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -2241,7 +2241,7 @@ int getAddedQueueSize() {
*
* @return all memcached nodes from node locator
*/
protected Collection<MemcachedNode> getAllNodes() {
public Collection<MemcachedNode> getAllNodes() {
return conn.getLocator().getAll();
}

Expand Down
275 changes: 212 additions & 63 deletions src/main/java/net/spy/memcached/v2/AsyncArcusCommands.java
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
*/
package net.spy.memcached.v2;

import java.net.SocketAddress;
import java.util.AbstractMap;
import java.util.ArrayList;
import java.util.Collection;
Expand Down Expand Up @@ -84,6 +85,7 @@
import net.spy.memcached.ops.Operation;
import net.spy.memcached.ops.OperationCallback;
import net.spy.memcached.ops.OperationStatus;
import net.spy.memcached.ops.StatsOperation;
import net.spy.memcached.ops.StatusCode;
import net.spy.memcached.ops.StoreType;
import net.spy.memcached.transcoders.Transcoder;
Expand Down Expand Up @@ -612,34 +614,6 @@ public void complete() {
return future;
}

public ArcusFuture<Boolean> flush(int delay) {
ArcusClient client = arcusClientSupplier.get();
Collection<MemcachedNode> nodes = client.getFlushNodes();

Collection<CompletableFuture<?>> futures = new ArrayList<>();

for (MemcachedNode node : nodes) {
CompletableFuture<Boolean> future = flush(client, node, delay).toCompletableFuture();
futures.add(future);
}

/*
* Combine all futures. Returns true if all flush operations succeed.
* Returns false if any flush operation fails.
*/
return new ArcusMultiFuture<>(futures, () -> {
for (CompletableFuture<?> future : futures) {
if (future.isCompletedExceptionally()) {
return false;
}
Boolean result = (Boolean) future.join();
if (result == null || !result) {
return false;
}
}
return true;
});
}

public ArcusFuture<Boolean> delete(String key) {
AbstractArcusResult<Boolean> result = new AbstractArcusResult<>(new AtomicReference<>());
Expand Down Expand Up @@ -699,41 +673,6 @@ public ArcusFuture<Map<String, Boolean>> multiDelete(List<String> keys) {
});
}

/**
* Use only in flush method.
*
* @param client the ArcusClient instance to use
* @param node the MemcachedNode to flush
* @param delay flush delay
* @return ArcusFuture with flush result
*/
private ArcusFuture<Boolean> flush(ArcusClient client, MemcachedNode node, int delay) {
AbstractArcusResult<Boolean> result = new AbstractArcusResult<>(new AtomicReference<>());
ArcusFutureImpl<Boolean> future = new ArcusFutureImpl<>(result);

OperationCallback cb = new OperationCallback() {
@Override
public void receivedStatus(OperationStatus status) {
if (status.getStatusCode() == StatusCode.CANCELLED) {
future.internalCancel();
return;
}
result.set(status.isSuccess());
}

@Override
public void complete() {
future.complete();
}
};

Operation op = client.getOpFact().flush(delay, cb);
future.setOp(op);
client.addOp(node, op);

return future;
}

public ArcusFuture<Boolean> bopCreate(String key, ElementValueType type,
CollectionAttributes attributes) {
if (attributes == null) {
Expand Down Expand Up @@ -1780,4 +1719,214 @@ public ArcusFuture<Boolean> sopDelete(String key, T value, boolean dropIfEmpty)
SetDelete<T> delete = new SetDelete<>(value, dropIfEmpty, false, tcForCollection);
return collectionDelete(key, delete);
}

public ArcusFuture<Boolean> flush() {
return flush(-1);
}

public ArcusFuture<Boolean> flush(int delay) {
if (delay < -1) {
throw new IllegalArgumentException("Delay should be a positive integer or -1 for no delay");
Comment on lines +1727 to +1729
Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

flush(int delay)delay 파라미터 처리 방식에 대해 논의가 필요합니다.

현재 -1을 "delay 파라미터를 생략한다"는 값으로 사용하고 있습니다.

  • delay = -1 → flush_prefix <prefix>\r\n (delay 생략, 즉시 무효화)
  • delay = 0 → flush_prefix <prefix> 0\r\n (동일하게 즉시 무효화)

서버는 음수를 받지 못하지만, "사용하지 않음"을 표현하기 위해 -1을 코드 내부 규약으로 허용하고 있습니다.

delay가 없는 경우(-1)와 delay = 0인 경우 모두 서버에서 즉시 flush로 동작이 일치하는데, 이를 어떻게 처리해야 할까요?

}

ArcusClient client = arcusClientSupplier.get();
Collection<MemcachedNode> nodes = client.getFlushNodes();
Collection<CompletableFuture<?>> futures = new ArrayList<>(nodes.size());

for (MemcachedNode node : nodes) {
AbstractArcusResult<Boolean> result = new AbstractArcusResult<>(new AtomicReference<>());
ArcusFutureImpl<Boolean> future = new ArcusFutureImpl<>(result);

OperationCallback cb = new OperationCallback() {
@Override
public void receivedStatus(OperationStatus status) {
switch (status.getStatusCode()) {
case SUCCESS:
result.set(true);
break;
case CANCELLED:
future.internalCancel();
break;
default:
result.addError(node.getSocketAddress().toString(), status);
break;
}
}

@Override
public void complete() {
future.complete();
}
};

Operation op = client.getOpFact().flush(delay, cb);
future.setOp(op);
client.addOp(node, op);
futures.add(future);
}

return new ArcusMultiFuture<>(futures, () -> {
for (CompletableFuture<?> future : futures) {
if (future.isCompletedExceptionally()) {
return false;
}
}
return true;
});
}

public ArcusFuture<Boolean> flush(String prefix) {
return flush(prefix, -1);
}

public ArcusFuture<Boolean> flush(String prefix, int delay) {
if (delay < -1) {
throw new IllegalArgumentException("Delay should be a positive integer or -1 for no delay");
}

ArcusClient client = arcusClientSupplier.get();
Collection<MemcachedNode> nodes = client.getFlushNodes();
Collection<CompletableFuture<?>> futures = new ArrayList<>(nodes.size());

for (MemcachedNode node : nodes) {
AbstractArcusResult<Boolean> result = new AbstractArcusResult<>(new AtomicReference<>());
ArcusFutureImpl<Boolean> future = new ArcusFutureImpl<>(result);

OperationCallback cb = new OperationCallback() {
@Override
public void receivedStatus(OperationStatus status) {
switch (status.getStatusCode()) {
case SUCCESS:
result.set(true);
break;
case ERR_NOT_FOUND:
result.set(false);
break;
case CANCELLED:
future.internalCancel();
break;
default:
result.addError(node.getSocketAddress().toString(), status);
break;
}
}

@Override
public void complete() {
future.complete();
}
};

Operation op = client.getOpFact().flush(prefix, delay, false, cb);
future.setOp(op);
client.addOp(node, op);
futures.add(future);
}

return new ArcusMultiFuture<>(futures, () -> {
for (CompletableFuture<?> future : futures) {
if (future.isCompletedExceptionally()) {
return false;
}

if (Boolean.TRUE.equals(future.join())) {
return true;
}
}
return false;
Comment on lines +1826 to +1836
Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

flush_prefix 에 대하여 아래와 같은 상황으로 논의가 필요합니다.

TO-BE는 제 생각을 기반으로 현재 구현한 방향입니다.

서버 동작

flush_prefix <prefix> 명령은 모든 캐시 노드에 브로드캐스트됩니다. 이 때, 각 노드는 독립적으로 응답하게 됩니다.

  • 해당 prefix의 키가 있는 노드 → OK
  • 해당 prefix의 키가 없는 노드 → NOT_FOUND

AS-IS (v1 문제)

NOT_FOUND를 실패로 처리하여 하나라도 NOT_FOUND가 오면 false를 반환합니다.

시나리오: 노드 A, B, C 중 A에만 prefixA:key 존재

노드 서버 응답 v1 처리
A OK -
B NOT_FOUND result = false 덮어쓰기
C NOT_FOUND result = false 덮어쓰기
  • 실제 동작: A의 키는 flush됨
  • 반환값: false → prefix가 없어서 아무것도 안 됐나? 라고 사용자가 해석할 수 있음

TO-BE (v2 변경)

NOT_FOUND를 "해당 노드에 prefix 없음"으로 처리하고, 하나라도 flush가 실제로 발생했는지를 기준으로 반환값을 결정합니다.

상황 v1 v2
일부 노드에 prefix 존재 (A=OK, B/C=NOT_FOUND) false true
모든 노드에 prefix 없음 (A/B/C=NOT_FOUND) false false

});
}

public ArcusFuture<Map<SocketAddress, Map<String, String>>> stats() {
return stats(null);
}

public ArcusFuture<Map<SocketAddress, Map<String, String>>> stats(String arg) {
ArcusClient client = arcusClientSupplier.get();
Collection<MemcachedNode> nodes = client.getAllNodes();

Collection<CompletableFuture<?>> futures = new ArrayList<>(nodes.size());
Map<SocketAddress, CompletableFuture<Map<String, String>>> addressToFuture
= new HashMap<>(nodes.size());

for (MemcachedNode node : nodes) {
SocketAddress address = node.getSocketAddress();
AbstractArcusResult<Map<String, String>> result
= new AbstractArcusResult<>(new AtomicReference<>(new HashMap<>()));
ArcusFutureImpl<Map<String, String>> future = new ArcusFutureImpl<>(result);

StatsOperation.Callback cb = new StatsOperation.Callback() {
@Override
public void gotStat(String name, String val) {
result.get().put(name, val);
}

@Override
public void receivedStatus(OperationStatus status) {
if (status.getStatusCode() == StatusCode.CANCELLED) {
future.internalCancel();
}
}

@Override
public void complete() {
future.complete();
}
};
Operation op = client.getOpFact().stats(arg, cb);
future.setOp(op);
client.addOp(node, op);

futures.add(future);
addressToFuture.put(address, future);
}

return new ArcusMultiFuture<>(futures, () -> {
Map<SocketAddress, Map<String, String>> resultMap = new HashMap<>(addressToFuture.size());
addressToFuture.forEach((address, future) -> resultMap.put(address, future.join()));
return resultMap;
});
}

public ArcusFuture<Map<SocketAddress, String>> versions() {
ArcusClient client = arcusClientSupplier.get();
Collection<MemcachedNode> nodes = client.getAllNodes();

Collection<CompletableFuture<?>> futures = new ArrayList<>(nodes.size());
Map<SocketAddress, CompletableFuture<String>> addressToFuture = new HashMap<>(nodes.size());

for (MemcachedNode node : nodes) {
SocketAddress address = node.getSocketAddress();
AbstractArcusResult<String> result = new AbstractArcusResult<>(new AtomicReference<>());
ArcusFutureImpl<String> future = new ArcusFutureImpl<>(result);

OperationCallback cb = new OperationCallback() {
@Override
public void receivedStatus(OperationStatus status) {
if (status.getStatusCode() == StatusCode.CANCELLED) {
future.internalCancel();
return;
}
result.set(status.getMessage());
}

@Override
public void complete() {
future.complete();
}
};
Operation op = client.getOpFact().version(cb);
future.setOp(op);
client.addOp(node, op);

futures.add(future);
addressToFuture.put(address, future);
}

return new ArcusMultiFuture<>(futures, () -> {
Map<SocketAddress, String> resultMap = new HashMap<>(addressToFuture.size());
addressToFuture.forEach((address, future) -> resultMap.put(address, future.join()));
return resultMap;
});
}
}
Loading
Loading