Skip to content

Commit f345e0c

Browse files
committed
FEATURE: Add CompletableFuture BTree position APIs
1 parent 43c5c67 commit f345e0c

6 files changed

Lines changed: 666 additions & 18 deletions

File tree

src/main/java/net/spy/memcached/collection/BTreeFindPosition.java

Lines changed: 13 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -36,14 +36,23 @@ public class BTreeFindPosition {
3636
private final BTreeOrder order;
3737
private String str;
3838

39-
public BTreeFindPosition(long longBKey, BTreeOrder order) {
40-
this.bkey = String.valueOf(longBKey);
39+
public BTreeFindPosition(String bkey, BTreeOrder order) {
40+
if (order == null) {
41+
throw new IllegalArgumentException("BTreeOrder must not be null.");
42+
}
43+
if (bkey == null || bkey.isEmpty()) {
44+
throw new IllegalArgumentException("BKey must not be null or empty.");
45+
}
46+
this.bkey = bkey;
4147
this.order = order;
4248
}
4349

50+
public BTreeFindPosition(long longBKey, BTreeOrder order) {
51+
this(String.valueOf(longBKey), order);
52+
}
53+
4454
public BTreeFindPosition(byte[] byteArrayBKey, BTreeOrder order) {
45-
this.bkey = BTreeUtil.toHex(byteArrayBKey);
46-
this.order = order;
55+
this(BTreeUtil.toHex(byteArrayBKey), order);
4756
}
4857

4958
public String stringify() {

src/main/java/net/spy/memcached/collection/BTreeFindPositionWithGet.java

Lines changed: 8 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -49,32 +49,26 @@ public class BTreeFindPositionWithGet extends CollectionGet {
4949
private final int count;
5050
private BKeyObject bkey;
5151

52-
public BTreeFindPositionWithGet(long longBKey, BTreeOrder order, int count) {
52+
public BTreeFindPositionWithGet(BKeyObject bkeyObject, BTreeOrder order, int count) {
5353
if (order == null) {
5454
throw new IllegalArgumentException("BTreeOrder must not be null.");
5555
}
5656
if (count < 0 || count > 100) {
5757
throw new IllegalArgumentException("Count must be a value between 0 and 100.");
5858
}
59-
this.bkeyObject = new BKeyObject(longBKey);
59+
this.bkeyObject = bkeyObject;
6060
this.order = order;
6161
this.count = count;
6262
this.eHeadCount = 2;
6363
this.eFlagIndex = 1;
6464
}
6565

66+
public BTreeFindPositionWithGet(long longBKey, BTreeOrder order, int count) {
67+
this(new BKeyObject(longBKey), order, count);
68+
}
69+
6670
public BTreeFindPositionWithGet(byte[] byteArrayBKey, BTreeOrder order, int count) {
67-
if (order == null) {
68-
throw new IllegalArgumentException("BTreeOrder must not be null.");
69-
}
70-
if (count < 0 || count > 100) {
71-
throw new IllegalArgumentException("Count must be a value between 0 and 100.");
72-
}
73-
this.bkeyObject = new BKeyObject(byteArrayBKey);
74-
this.order = order;
75-
this.count = count;
76-
this.eHeadCount = 2;
77-
this.eFlagIndex = 1;
71+
this(new BKeyObject(byteArrayBKey), order, count);
7872
}
7973

8074
public String stringify() {
@@ -89,7 +83,7 @@ public String stringify() {
8983

9084
if (count > 0) {
9185
b.append(" ");
92-
b.append(String.valueOf(count));
86+
b.append(count);
9387
}
9488

9589
str = b.toString();

src/main/java/net/spy/memcached/v2/AsyncArcusCommands.java

Lines changed: 193 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -35,13 +35,17 @@
3535
import net.spy.memcached.MemcachedNode;
3636
import net.spy.memcached.collection.BKeyObject;
3737
import net.spy.memcached.collection.BTreeCreate;
38+
import net.spy.memcached.collection.BTreeFindPosition;
39+
import net.spy.memcached.collection.BTreeFindPositionWithGet;
3840
import net.spy.memcached.collection.BTreeGet;
3941
import net.spy.memcached.collection.BTreeGetBulk;
4042
import net.spy.memcached.collection.BTreeGetBulkWithByteTypeBkey;
4143
import net.spy.memcached.collection.BTreeGetBulkWithLongTypeBkey;
44+
import net.spy.memcached.collection.BTreeGetByPosition;
4245
import net.spy.memcached.collection.BTreeInsert;
4346
import net.spy.memcached.collection.BTreeInsertAndGet;
4447
import net.spy.memcached.collection.BTreeMutate;
48+
import net.spy.memcached.collection.BTreeOrder;
4549
import net.spy.memcached.collection.BTreeSMGet;
4650
import net.spy.memcached.collection.BTreeSMGetWithByteTypeBkey;
4751
import net.spy.memcached.collection.BTreeSMGetWithLongTypeBkey;
@@ -55,7 +59,10 @@
5559
import net.spy.memcached.collection.ElementValueType;
5660
import net.spy.memcached.internal.result.GetsResultImpl;
5761
import net.spy.memcached.ops.APIType;
62+
import net.spy.memcached.ops.BTreeFindPositionOperation;
63+
import net.spy.memcached.ops.BTreeFindPositionWithGetOperation;
5864
import net.spy.memcached.ops.BTreeGetBulkOperation;
65+
import net.spy.memcached.ops.BTreeGetByPositionOperation;
5966
import net.spy.memcached.ops.BTreeInsertAndGetOperation;
6067
import net.spy.memcached.ops.BTreeSortMergeGetOperation;
6168
import net.spy.memcached.ops.CollectionCreateOperation;
@@ -1246,6 +1253,192 @@ private BTreeGetBulk<T> createBTreeGetBulk(MemcachedNode node, List<String> keys
12461253
}
12471254
}
12481255

1256+
public ArcusFuture<Integer> bopGetPosition(String key, BKey bKey, BTreeOrder order) {
1257+
AbstractArcusResult<Integer> result = new AbstractArcusResult<>(new AtomicReference<>());
1258+
ArcusFutureImpl<Integer> future = new ArcusFutureImpl<>(result);
1259+
BTreeFindPosition findPosition = new BTreeFindPosition(bKey.toString(), order);
1260+
ArcusClient client = arcusClientSupplier.get();
1261+
1262+
BTreeFindPositionOperation.Callback cb = new BTreeFindPositionOperation.Callback() {
1263+
@Override
1264+
public void gotData(int position) {
1265+
result.set(position);
1266+
}
1267+
1268+
@Override
1269+
public void receivedStatus(OperationStatus status) {
1270+
switch (status.getStatusCode()) {
1271+
case SUCCESS:
1272+
break;
1273+
case ERR_NOT_FOUND:
1274+
case ERR_NOT_FOUND_ELEMENT:
1275+
result.set(null);
1276+
break;
1277+
case CANCELLED:
1278+
future.internalCancel();
1279+
break;
1280+
default:
1281+
/* TYPE_MISMATCH / BKEY_MISMATCH / UNREADABLE / NOT_SUPPORTED or unknown statement */
1282+
result.addError(key, status);
1283+
}
1284+
}
1285+
1286+
@Override
1287+
public void complete() {
1288+
future.complete();
1289+
}
1290+
};
1291+
Operation op = client.getOpFact().bopFindPosition(key, findPosition, cb);
1292+
future.setOp(op);
1293+
client.addOp(key, op);
1294+
1295+
return future;
1296+
}
1297+
1298+
public ArcusFuture<BTreeElement<T>> bopGetByPosition(String key, int pos, BTreeOrder order) {
1299+
AbstractArcusResult<BTreeElement<T>> result
1300+
= new AbstractArcusResult<>(new AtomicReference<>());
1301+
ArcusFutureImpl<BTreeElement<T>> future = new ArcusFutureImpl<>(result);
1302+
BTreeGetByPosition getByPosition = new BTreeGetByPosition(order, pos);
1303+
ArcusClient client = arcusClientSupplier.get();
1304+
1305+
BTreeGetByPositionOperation.Callback cb = new BTreeGetByPositionOperation.Callback() {
1306+
@Override
1307+
public void gotData(int pos, int flags, BKeyObject bKey, byte[] eFlag, byte[] data) {
1308+
result.set(buildBTreeElement(flags, bKey, eFlag, data));
1309+
}
1310+
1311+
@Override
1312+
public void receivedStatus(OperationStatus status) {
1313+
switch (status.getStatusCode()) {
1314+
case SUCCESS:
1315+
break;
1316+
case ERR_NOT_FOUND:
1317+
case ERR_NOT_FOUND_ELEMENT:
1318+
result.set(null);
1319+
break;
1320+
case CANCELLED:
1321+
future.internalCancel();
1322+
break;
1323+
default:
1324+
/* TYPE_MISMATCH / UNREADABLE / NOT_SUPPORTED or unknown statement */
1325+
result.addError(key, status);
1326+
}
1327+
}
1328+
1329+
@Override
1330+
public void complete() {
1331+
future.complete();
1332+
}
1333+
};
1334+
Operation op = client.getOpFact().bopGetByPosition(key, getByPosition, cb);
1335+
future.setOp(op);
1336+
client.addOp(key, op);
1337+
1338+
return future;
1339+
}
1340+
1341+
public ArcusFuture<Map<Integer, BTreeElement<T>>> bopGetByPosition(String key,
1342+
int from, int to,
1343+
BTreeOrder order) {
1344+
Map<Integer, BTreeElement<T>> map = new HashMap<>();
1345+
AbstractArcusResult<Map<Integer, BTreeElement<T>>> result
1346+
= new AbstractArcusResult<>(new AtomicReference<>(map));
1347+
ArcusFutureImpl<Map<Integer, BTreeElement<T>>> future = new ArcusFutureImpl<>(result);
1348+
BTreeGetByPosition getByPosition = new BTreeGetByPosition(order, from, to);
1349+
ArcusClient client = arcusClientSupplier.get();
1350+
1351+
BTreeGetByPositionOperation.Callback cb = new BTreeGetByPositionOperation.Callback() {
1352+
@Override
1353+
public void gotData(int pos, int flags, BKeyObject bKey, byte[] eFlag, byte[] data) {
1354+
result.get().put(pos, buildBTreeElement(flags, bKey, eFlag, data));
1355+
}
1356+
1357+
@Override
1358+
public void receivedStatus(OperationStatus status) {
1359+
switch (status.getStatusCode()) {
1360+
case SUCCESS:
1361+
case ERR_NOT_FOUND_ELEMENT:
1362+
break;
1363+
case ERR_NOT_FOUND:
1364+
result.set(null);
1365+
break;
1366+
case CANCELLED:
1367+
future.internalCancel();
1368+
break;
1369+
default:
1370+
/* TYPE_MISMATCH / UNREADABLE / NOT_SUPPORTED or unknown statement */
1371+
result.addError(key, status);
1372+
}
1373+
}
1374+
1375+
@Override
1376+
public void complete() {
1377+
future.complete();
1378+
}
1379+
};
1380+
Operation op = client.getOpFact().bopGetByPosition(key, getByPosition, cb);
1381+
future.setOp(op);
1382+
client.addOp(key, op);
1383+
1384+
return future;
1385+
}
1386+
1387+
public ArcusFuture<Map<Integer, BTreeElement<T>>> bopPositionWithGet(String key,
1388+
BKey bKey, int count,
1389+
BTreeOrder order) {
1390+
Map<Integer, BTreeElement<T>> map = new HashMap<>();
1391+
AbstractArcusResult<Map<Integer, BTreeElement<T>>> result =
1392+
new AbstractArcusResult<>(new AtomicReference<>(map));
1393+
ArcusFutureImpl<Map<Integer, BTreeElement<T>>> future = new ArcusFutureImpl<>(result);
1394+
BTreeFindPositionWithGet findPositionWithGet
1395+
= new BTreeFindPositionWithGet(bKey.toBKeyObject(), order, count);
1396+
ArcusClient client = arcusClientSupplier.get();
1397+
1398+
BTreeFindPositionWithGetOperation.Callback cb = new BTreeFindPositionWithGetOperation
1399+
.Callback() {
1400+
1401+
@Override
1402+
public void gotData(int pos, int flags, BKeyObject bKey, byte[] eFlag, byte[] data) {
1403+
result.get().put(pos, buildBTreeElement(flags, bKey, eFlag, data));
1404+
}
1405+
1406+
@Override
1407+
public void receivedStatus(OperationStatus status) {
1408+
switch (status.getStatusCode()) {
1409+
case SUCCESS:
1410+
case ERR_NOT_FOUND_ELEMENT:
1411+
break;
1412+
case ERR_NOT_FOUND:
1413+
result.set(null);
1414+
break;
1415+
case CANCELLED:
1416+
future.internalCancel();
1417+
break;
1418+
default:
1419+
/* TYPE_MISMATCH / BKEY_MISMATCH / UNREADABLE / NOT_SUPPORTED or unknown statement */
1420+
result.addError(key, status);
1421+
}
1422+
}
1423+
1424+
@Override
1425+
public void complete() {
1426+
future.complete();
1427+
}
1428+
};
1429+
Operation op = client.getOpFact().bopFindPositionWithGet(key, findPositionWithGet, cb);
1430+
future.setOp(op);
1431+
client.addOp(key, op);
1432+
1433+
return future;
1434+
}
1435+
1436+
private BTreeElement<T> buildBTreeElement(int flags, BKeyObject bKey,
1437+
byte[] eFlag, byte[] data) {
1438+
T decodedData = tcForCollection.decode(new CachedData(flags, data, tc.getMaxSize()));
1439+
return new BTreeElement<>(BKey.of(bKey), decodedData, eFlag);
1440+
}
1441+
12491442
public ArcusFuture<SMGetElements<T>> bopSortMergeGet(List<String> keys, BKey from, BKey to,
12501443
boolean unique, BopGetArgs args) {
12511444
verifyBKeyRange(from, to);

src/main/java/net/spy/memcached/v2/AsyncArcusCommandsIF.java

Lines changed: 52 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
import java.util.Map;
2222

2323
import net.spy.memcached.CASValue;
24+
import net.spy.memcached.collection.BTreeOrder;
2425
import net.spy.memcached.collection.CollectionAttributes;
2526
import net.spy.memcached.collection.ElementValueType;
2627
import net.spy.memcached.v2.vo.BKey;
@@ -367,6 +368,57 @@ ArcusFuture<Map<String, BTreeElements<T>>> bopMultiGet(List<String> keys,
367368
BKey from, BKey to,
368369
BopGetArgs args);
369370

371+
/**
372+
* Get the position of an element with the given bKey in a btree item.
373+
*
374+
* @param key key of the btree item
375+
* @param bKey BKey of the element to find
376+
* @param order the order of the btree to determine position
377+
* @return the 0-based position of the element,
378+
* or {@code null} if the key or element is not found.
379+
*/
380+
ArcusFuture<Integer> bopGetPosition(String key, BKey bKey, BTreeOrder order);
381+
382+
/**
383+
* Get an element at the given position in a btree item.
384+
*
385+
* @param key key of the btree item
386+
* @param pos 0-based position of the element to get
387+
* @param order the order of the btree to determine position
388+
* @return the {@code BTreeElement} at the given position,
389+
* or {@code null} if the key is not found or position is out of range.
390+
*/
391+
ArcusFuture<BTreeElement<T>> bopGetByPosition(String key, int pos, BTreeOrder order);
392+
393+
/**
394+
* Get elements in a position range from a btree item.
395+
*
396+
* @param key key of the btree item
397+
* @param from start position (inclusive)
398+
* @param to end position (inclusive)
399+
* @param order the order of the btree to determine position
400+
* @return Map of position to {@code BTreeElement}.
401+
* If no elements exist in the range but the key exists, an empty map is returned.
402+
* If the key is not found, {@code null} is returned.
403+
*/
404+
ArcusFuture<Map<Integer, BTreeElement<T>>> bopGetByPosition(String key,
405+
int from, int to, BTreeOrder order);
406+
407+
/**
408+
* Get the position of an element with the given bKey and retrieve neighboring elements.
409+
*
410+
* @param key key of the btree item
411+
* @param bKey BKey of the element to find
412+
* @param count the number of elements
413+
* to retrieve on each side of the found position (0 &le; count &le; 100)
414+
* @param order the order of the btree to determine position
415+
* @return Map of position to {@code BTreeElement} including the target element and its neighbors.
416+
* If the element is not found but the key exists, an empty map is returned.
417+
* If the key is not found, {@code null} is returned.
418+
*/
419+
ArcusFuture<Map<Integer, BTreeElement<T>>> bopPositionWithGet(String key, BKey bKey,
420+
int count, BTreeOrder order);
421+
370422
/**
371423
* Get sort-merged elements from multiple btree items.
372424
*

src/main/java/net/spy/memcached/v2/vo/BKey.java

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -52,6 +52,13 @@ public static BKey of(Object bKey) {
5252
}
5353
}
5454

55+
public BKeyObject toBKeyObject() {
56+
if (this.type == BKeyType.LONG) {
57+
return new BKeyObject((Long) this.data);
58+
}
59+
return new BKeyObject((byte[]) this.data);
60+
}
61+
5562
public static BKey of(BKeyObject bkeyObject) {
5663
if (bkeyObject == null) {
5764
throw new IllegalArgumentException("BKeyObject cannot be null");

0 commit comments

Comments
 (0)