Skip to content

Commit c00aae8

Browse files
committed
FEATURE: Add CompletableFuture BTree position APIs
1 parent 18d73bd commit c00aae8

7 files changed

Lines changed: 739 additions & 17 deletions

File tree

src/main/java/net/spy/memcached/collection/BTreeFindPosition.java

Lines changed: 14 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -36,14 +36,24 @@ public class BTreeFindPosition {
3636
private final BTreeOrder order;
3737
private String str;
3838

39-
public BTreeFindPosition(long longBKey, BTreeOrder order) {
40-
this.bkey = String.valueOf(longBKey);
39+
public BTreeFindPosition(String bkey, BTreeOrder order) {
40+
if (order == null) {
41+
throw new IllegalArgumentException("BTreeOrder must not be null.");
42+
}
43+
if (bkey == null || bkey.isEmpty()) {
44+
throw new IllegalArgumentException("BKey must not be null or empty.");
45+
}
46+
47+
this.bkey = bkey;
4148
this.order = order;
4249
}
4350

51+
public BTreeFindPosition(long longBKey, BTreeOrder order) {
52+
this(String.valueOf(longBKey), order);
53+
}
54+
4455
public BTreeFindPosition(byte[] byteArrayBKey, BTreeOrder order) {
45-
this.bkey = BTreeUtil.toHex(byteArrayBKey);
46-
this.order = order;
56+
this(BTreeUtil.toHex(byteArrayBKey), order);
4757
}
4858

4959
public String stringify() {

src/main/java/net/spy/memcached/collection/BTreeFindPositionWithGet.java

Lines changed: 7 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -49,32 +49,26 @@ public class BTreeFindPositionWithGet extends CollectionGet {
4949
private final int count;
5050
private BKeyObject bkey;
5151

52-
public BTreeFindPositionWithGet(long longBKey, BTreeOrder order, int count) {
52+
public BTreeFindPositionWithGet(BKeyObject bkeyObject, BTreeOrder order, int count) {
5353
if (order == null) {
5454
throw new IllegalArgumentException("BTreeOrder must not be null.");
5555
}
5656
if (count < 0 || count > 100) {
5757
throw new IllegalArgumentException("Count must be a value between 0 and 100.");
5858
}
59-
this.bkeyObject = new BKeyObject(longBKey);
59+
this.bkeyObject = bkeyObject;
6060
this.order = order;
6161
this.count = count;
6262
this.eHeadCount = 2;
6363
this.eFlagIndex = 1;
6464
}
6565

66+
public BTreeFindPositionWithGet(long longBKey, BTreeOrder order, int count) {
67+
this(new BKeyObject(longBKey), order, count);
68+
}
69+
6670
public BTreeFindPositionWithGet(byte[] byteArrayBKey, BTreeOrder order, int count) {
67-
if (order == null) {
68-
throw new IllegalArgumentException("BTreeOrder must not be null.");
69-
}
70-
if (count < 0 || count > 100) {
71-
throw new IllegalArgumentException("Count must be a value between 0 and 100.");
72-
}
73-
this.bkeyObject = new BKeyObject(byteArrayBKey);
74-
this.order = order;
75-
this.count = count;
76-
this.eHeadCount = 2;
77-
this.eFlagIndex = 1;
71+
this(new BKeyObject(byteArrayBKey), order, count);
7872
}
7973

8074
public String stringify() {

src/main/java/net/spy/memcached/v2/AsyncArcusCommands.java

Lines changed: 200 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -36,14 +36,18 @@
3636
import net.spy.memcached.collection.BKeyObject;
3737
import net.spy.memcached.collection.BTreeCount;
3838
import net.spy.memcached.collection.BTreeCreate;
39+
import net.spy.memcached.collection.BTreeFindPosition;
40+
import net.spy.memcached.collection.BTreeFindPositionWithGet;
3941
import net.spy.memcached.collection.BTreeDelete;
4042
import net.spy.memcached.collection.BTreeGet;
4143
import net.spy.memcached.collection.BTreeGetBulk;
4244
import net.spy.memcached.collection.BTreeGetBulkWithByteTypeBkey;
4345
import net.spy.memcached.collection.BTreeGetBulkWithLongTypeBkey;
46+
import net.spy.memcached.collection.BTreeGetByPosition;
4447
import net.spy.memcached.collection.BTreeInsert;
4548
import net.spy.memcached.collection.BTreeInsertAndGet;
4649
import net.spy.memcached.collection.BTreeMutate;
50+
import net.spy.memcached.collection.BTreeOrder;
4751
import net.spy.memcached.collection.BTreeSMGet;
4852
import net.spy.memcached.collection.BTreeSMGetWithByteTypeBkey;
4953
import net.spy.memcached.collection.BTreeSMGetWithLongTypeBkey;
@@ -60,7 +64,10 @@
6064
import net.spy.memcached.collection.ElementValueType;
6165
import net.spy.memcached.internal.result.GetsResultImpl;
6266
import net.spy.memcached.ops.APIType;
67+
import net.spy.memcached.ops.BTreeFindPositionOperation;
68+
import net.spy.memcached.ops.BTreeFindPositionWithGetOperation;
6369
import net.spy.memcached.ops.BTreeGetBulkOperation;
70+
import net.spy.memcached.ops.BTreeGetByPositionOperation;
6471
import net.spy.memcached.ops.BTreeInsertAndGetOperation;
6572
import net.spy.memcached.ops.BTreeSortMergeGetOperation;
6673
import net.spy.memcached.ops.CollectionCreateOperation;
@@ -79,6 +86,7 @@
7986
import net.spy.memcached.transcoders.TranscoderUtils;
8087
import net.spy.memcached.v2.vo.BKey;
8188
import net.spy.memcached.v2.vo.BTreeElement;
89+
import net.spy.memcached.v2.vo.BTreePositionElement;
8290
import net.spy.memcached.v2.vo.BTreeElements;
8391
import net.spy.memcached.v2.vo.BTreeUpdateElement;
8492
import net.spy.memcached.v2.vo.BopDeleteArgs;
@@ -1252,6 +1260,198 @@ private BTreeGetBulk<T> createBTreeGetBulk(MemcachedNode node, List<String> keys
12521260
}
12531261
}
12541262

1263+
public ArcusFuture<Integer> bopGetPosition(String key, BKey bKey, BTreeOrder order) {
1264+
AbstractArcusResult<Integer> result = new AbstractArcusResult<>(new AtomicReference<>());
1265+
ArcusFutureImpl<Integer> future = new ArcusFutureImpl<>(result);
1266+
BTreeFindPosition findPosition = new BTreeFindPosition(bKey.toString(), order);
1267+
ArcusClient client = arcusClientSupplier.get();
1268+
1269+
BTreeFindPositionOperation.Callback cb = new BTreeFindPositionOperation.Callback() {
1270+
@Override
1271+
public void gotData(int position) {
1272+
result.set(position);
1273+
}
1274+
1275+
@Override
1276+
public void receivedStatus(OperationStatus status) {
1277+
switch (status.getStatusCode()) {
1278+
case SUCCESS:
1279+
break;
1280+
case ERR_NOT_FOUND:
1281+
case ERR_NOT_FOUND_ELEMENT:
1282+
result.set(null);
1283+
break;
1284+
case CANCELLED:
1285+
future.internalCancel();
1286+
break;
1287+
default:
1288+
/* TYPE_MISMATCH / BKEY_MISMATCH / UNREADABLE / NOT_SUPPORTED or unknown statement */
1289+
result.addError(key, status);
1290+
}
1291+
}
1292+
1293+
@Override
1294+
public void complete() {
1295+
future.complete();
1296+
}
1297+
};
1298+
Operation op = client.getOpFact().bopFindPosition(key, findPosition, cb);
1299+
future.setOp(op);
1300+
client.addOp(key, op);
1301+
1302+
return future;
1303+
}
1304+
1305+
public ArcusFuture<BTreeElement<T>> bopGetByPosition(String key, int pos, BTreeOrder order) {
1306+
AbstractArcusResult<BTreeElement<T>> result
1307+
= new AbstractArcusResult<>(new AtomicReference<>());
1308+
ArcusFutureImpl<BTreeElement<T>> future = new ArcusFutureImpl<>(result);
1309+
BTreeGetByPosition getByPosition = new BTreeGetByPosition(order, pos);
1310+
ArcusClient client = arcusClientSupplier.get();
1311+
1312+
BTreeGetByPositionOperation.Callback cb = new BTreeGetByPositionOperation.Callback() {
1313+
@Override
1314+
public void gotData(int pos, int flags, BKeyObject bKey, byte[] eFlag, byte[] data) {
1315+
result.set(buildBTreeElement(flags, bKey, eFlag, data));
1316+
}
1317+
1318+
@Override
1319+
public void receivedStatus(OperationStatus status) {
1320+
switch (status.getStatusCode()) {
1321+
case SUCCESS:
1322+
break;
1323+
case ERR_NOT_FOUND:
1324+
case ERR_NOT_FOUND_ELEMENT:
1325+
result.set(null);
1326+
break;
1327+
case CANCELLED:
1328+
future.internalCancel();
1329+
break;
1330+
default:
1331+
/* TYPE_MISMATCH / UNREADABLE / NOT_SUPPORTED or unknown statement */
1332+
result.addError(key, status);
1333+
}
1334+
}
1335+
1336+
@Override
1337+
public void complete() {
1338+
future.complete();
1339+
}
1340+
};
1341+
Operation op = client.getOpFact().bopGetByPosition(key, getByPosition, cb);
1342+
future.setOp(op);
1343+
client.addOp(key, op);
1344+
1345+
return future;
1346+
}
1347+
1348+
public ArcusFuture<List<BTreeElement<T>>> bopGetByPosition(String key,
1349+
int from, int to,
1350+
BTreeOrder order) {
1351+
if (from > to) {
1352+
throw new IllegalArgumentException("from should be less than or equal to to.");
1353+
}
1354+
1355+
AbstractArcusResult<List<BTreeElement<T>>> result
1356+
= new AbstractArcusResult<>(new AtomicReference<>(new ArrayList<>()));
1357+
ArcusFutureImpl<List<BTreeElement<T>>> future = new ArcusFutureImpl<>(result);
1358+
BTreeGetByPosition getByPosition = new BTreeGetByPosition(order, from, to);
1359+
ArcusClient client = arcusClientSupplier.get();
1360+
1361+
BTreeGetByPositionOperation.Callback cb = new BTreeGetByPositionOperation.Callback() {
1362+
@Override
1363+
public void gotData(int pos, int flags, BKeyObject bKey, byte[] eFlag, byte[] data) {
1364+
result.get().add(buildBTreeElement(flags, bKey, eFlag, data));
1365+
}
1366+
1367+
@Override
1368+
public void receivedStatus(OperationStatus status) {
1369+
switch (status.getStatusCode()) {
1370+
case SUCCESS:
1371+
case ERR_NOT_FOUND_ELEMENT:
1372+
break;
1373+
case ERR_NOT_FOUND:
1374+
result.set(null);
1375+
break;
1376+
case CANCELLED:
1377+
future.internalCancel();
1378+
break;
1379+
default:
1380+
/* TYPE_MISMATCH / UNREADABLE / NOT_SUPPORTED or unknown statement */
1381+
result.addError(key, status);
1382+
}
1383+
}
1384+
1385+
@Override
1386+
public void complete() {
1387+
future.complete();
1388+
}
1389+
};
1390+
Operation op = client.getOpFact().bopGetByPosition(key, getByPosition, cb);
1391+
future.setOp(op);
1392+
client.addOp(key, op);
1393+
1394+
return future;
1395+
}
1396+
1397+
public ArcusFuture<List<BTreePositionElement<T>>> bopPositionWithGet(String key,
1398+
BKey bKey,
1399+
int count,
1400+
BTreeOrder order) {
1401+
AbstractArcusResult<List<BTreePositionElement<T>>> result =
1402+
new AbstractArcusResult<>(new AtomicReference<>(new ArrayList<>()));
1403+
ArcusFutureImpl<List<BTreePositionElement<T>>> future = new ArcusFutureImpl<>(result);
1404+
BTreeFindPositionWithGet findPositionWithGet =
1405+
new BTreeFindPositionWithGet(bKey.toBKeyObject(), order, count);
1406+
ArcusClient client = arcusClientSupplier.get();
1407+
1408+
BTreeFindPositionWithGetOperation.Callback cb = new BTreeFindPositionWithGetOperation
1409+
.Callback() {
1410+
1411+
@Override
1412+
public void gotData(int pos, int flags, BKeyObject bKey, byte[] eFlag, byte[] data) {
1413+
T decodedData = tcForCollection.decode(
1414+
new CachedData(flags, data, tcForCollection.getMaxSize()));
1415+
result.get().add(new BTreePositionElement<>(BKey.of(bKey), decodedData, eFlag, pos));
1416+
}
1417+
1418+
@Override
1419+
public void receivedStatus(OperationStatus status) {
1420+
switch (status.getStatusCode()) {
1421+
case SUCCESS:
1422+
case ERR_NOT_FOUND_ELEMENT:
1423+
break;
1424+
case ERR_NOT_FOUND:
1425+
result.set(null);
1426+
break;
1427+
case CANCELLED:
1428+
future.internalCancel();
1429+
break;
1430+
default:
1431+
/* TYPE_MISMATCH / BKEY_MISMATCH / UNREADABLE / NOT_SUPPORTED or unknown statement */
1432+
result.addError(key, status);
1433+
}
1434+
}
1435+
1436+
@Override
1437+
public void complete() {
1438+
future.complete();
1439+
}
1440+
};
1441+
Operation op = client.getOpFact().bopFindPositionWithGet(key, findPositionWithGet, cb);
1442+
future.setOp(op);
1443+
client.addOp(key, op);
1444+
1445+
return future;
1446+
}
1447+
1448+
private BTreeElement<T> buildBTreeElement(int flags, BKeyObject bKey,
1449+
byte[] eFlag, byte[] data) {
1450+
T decodedData = tcForCollection.decode(
1451+
new CachedData(flags, data, tcForCollection.getMaxSize()));
1452+
return new BTreeElement<>(BKey.of(bKey), decodedData, eFlag);
1453+
}
1454+
12551455
public ArcusFuture<SMGetElements<T>> bopSortMergeGet(List<String> keys, BKey from, BKey to,
12561456
boolean unique, BopGetArgs args) {
12571457
verifyBKeyRange(from, to);

src/main/java/net/spy/memcached/v2/AsyncArcusCommandsIF.java

Lines changed: 51 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,10 +21,12 @@
2121
import java.util.Map;
2222

2323
import net.spy.memcached.CASValue;
24+
import net.spy.memcached.collection.BTreeOrder;
2425
import net.spy.memcached.collection.CollectionAttributes;
2526
import net.spy.memcached.collection.ElementFlagFilter;
2627
import net.spy.memcached.collection.ElementValueType;
2728
import net.spy.memcached.v2.vo.BKey;
29+
import net.spy.memcached.v2.vo.BTreePositionElement;
2830
import net.spy.memcached.v2.vo.BTreeElement;
2931
import net.spy.memcached.v2.vo.BTreeElements;
3032
import net.spy.memcached.v2.vo.BTreeUpdateElement;
@@ -369,6 +371,55 @@ ArcusFuture<Map<String, BTreeElements<T>>> bopMultiGet(List<String> keys,
369371
BKey from, BKey to,
370372
BopGetArgs args);
371373

374+
/**
375+
* Get the position of an element with the given bKey in a btree item.
376+
*
377+
* @param key key of the btree item
378+
* @param bKey BKey of the element to find
379+
* @param order the order of the btree to determine position
380+
* @return the 0-based position of the element,
381+
* or {@code null} if the key or element is not found.
382+
*/
383+
ArcusFuture<Integer> bopGetPosition(String key, BKey bKey, BTreeOrder order);
384+
385+
/**
386+
* Get an element at the given position in a btree item.
387+
*
388+
* @param key key of the btree item
389+
* @param pos 0-based position of the element to get
390+
* @param order the order of the btree to determine position
391+
* @return the {@code BTreeElement} at the given position,
392+
* or {@code null} if the key or element is not found.
393+
*/
394+
ArcusFuture<BTreeElement<T>> bopGetByPosition(String key, int pos, BTreeOrder order);
395+
396+
/**
397+
* Get elements in a position range from a btree item.
398+
*
399+
* @param key key of the btree item
400+
* @param from start position (inclusive)
401+
* @param to end position (inclusive)
402+
* @param order the order of the btree to determine position
403+
* @return list of {@code BTreeElement} in the given position range, in traversal order,
404+
* empty list if no elements exist in the range, {@code null} if the key is not found.
405+
*/
406+
ArcusFuture<List<BTreeElement<T>>> bopGetByPosition(String key,
407+
int from, int to, BTreeOrder order);
408+
409+
/**
410+
* Get an element by bKey and its neighboring elements with position information.
411+
*
412+
* @param key key of the btree item
413+
* @param bKey BKey of the element to find
414+
* @param count the number of neighboring elements to retrieve on each side
415+
* (0 &le; count &le; 100)
416+
* @param order the order of the btree to determine position
417+
* @return list of {@code BTreePositionElement} in traversal order,
418+
* empty list if the element is not found, {@code null} if the key is not found.
419+
*/
420+
ArcusFuture<List<BTreePositionElement<T>>> bopPositionWithGet(String key, BKey bKey,
421+
int count, BTreeOrder order);
422+
372423
/**
373424
* Get sort-merged elements from multiple btree items.
374425
*

src/main/java/net/spy/memcached/v2/vo/BKey.java

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -52,6 +52,13 @@ public static BKey of(Object bKey) {
5252
}
5353
}
5454

55+
public BKeyObject toBKeyObject() {
56+
if (this.type == BKeyType.LONG) {
57+
return new BKeyObject((Long) this.data);
58+
}
59+
return new BKeyObject((byte[]) this.data);
60+
}
61+
5562
public static BKey of(BKeyObject bkeyObject) {
5663
if (bkeyObject == null) {
5764
throw new IllegalArgumentException("BKeyObject cannot be null");

0 commit comments

Comments
 (0)