Skip to content

Commit 21ae673

Browse files
committed
FEATURE: Add CompletableFuture BTree position APIs
1 parent cc15a01 commit 21ae673

8 files changed

Lines changed: 686 additions & 23 deletions

File tree

src/main/java/net/spy/memcached/collection/BTreeFindPosition.java

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -37,15 +37,32 @@ public class BTreeFindPosition {
3737
private String str;
3838

3939
public BTreeFindPosition(long longBKey, BTreeOrder order) {
40+
if (order == null) {
41+
throw new IllegalArgumentException("BTreeOrder must not be null.");
42+
}
4043
this.bkey = String.valueOf(longBKey);
4144
this.order = order;
4245
}
4346

4447
public BTreeFindPosition(byte[] byteArrayBKey, BTreeOrder order) {
48+
if (order == null) {
49+
throw new IllegalArgumentException("BTreeOrder must not be null.");
50+
}
4551
this.bkey = BTreeUtil.toHex(byteArrayBKey);
4652
this.order = order;
4753
}
4854

55+
public BTreeFindPosition(String bKey, BTreeOrder order) {
56+
if (order == null) {
57+
throw new IllegalArgumentException("BTreeOrder must not be null.");
58+
}
59+
if (bKey == null || bKey.isEmpty()) {
60+
throw new IllegalArgumentException("BKey must not be null or empty.");
61+
}
62+
this.bkey = bKey;
63+
this.order = order;
64+
}
65+
4966
public String stringify() {
5067
if (str != null) {
5168
return str;

src/main/java/net/spy/memcached/collection/BTreeFindPositionWithGet.java

Lines changed: 15 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -44,37 +44,31 @@
4444
public class BTreeFindPositionWithGet extends CollectionGet {
4545
private static final String COMMAND = "bop pwg";
4646

47-
private final BKeyObject bkeyObject;
47+
private final BKeyObject bKeyObject;
4848
private final BTreeOrder order;
4949
private final int count;
50-
private BKeyObject bkey;
50+
private BKeyObject bKey;
5151

52-
public BTreeFindPositionWithGet(long longBKey, BTreeOrder order, int count) {
52+
public BTreeFindPositionWithGet(BKeyObject bKeyObject, BTreeOrder order, int count) {
5353
if (order == null) {
5454
throw new IllegalArgumentException("BTreeOrder must not be null.");
5555
}
5656
if (count < 0 || count > 100) {
5757
throw new IllegalArgumentException("Count must be a value between 0 and 100.");
5858
}
59-
this.bkeyObject = new BKeyObject(longBKey);
59+
this.bKeyObject = bKeyObject;
6060
this.order = order;
6161
this.count = count;
6262
this.eHeadCount = 2;
6363
this.eFlagIndex = 1;
6464
}
6565

66+
public BTreeFindPositionWithGet(long longBKey, BTreeOrder order, int count) {
67+
this(new BKeyObject(longBKey), order, count);
68+
}
69+
6670
public BTreeFindPositionWithGet(byte[] byteArrayBKey, BTreeOrder order, int count) {
67-
if (order == null) {
68-
throw new IllegalArgumentException("BTreeOrder must not be null.");
69-
}
70-
if (count < 0 || count > 100) {
71-
throw new IllegalArgumentException("Count must be a value between 0 and 100.");
72-
}
73-
this.bkeyObject = new BKeyObject(byteArrayBKey);
74-
this.order = order;
75-
this.count = count;
76-
this.eHeadCount = 2;
77-
this.eFlagIndex = 1;
71+
this(new BKeyObject(byteArrayBKey), order, count);
7872
}
7973

8074
public String stringify() {
@@ -83,13 +77,13 @@ public String stringify() {
8377
}
8478

8579
StringBuilder b = new StringBuilder();
86-
b.append(bkeyObject);
80+
b.append(bKeyObject);
8781
b.append(" ");
8882
b.append(order.getAscii());
8983

9084
if (count > 0) {
9185
b.append(" ");
92-
b.append(String.valueOf(count));
86+
b.append(count);
9387
}
9488

9589
str = b.toString();
@@ -114,9 +108,9 @@ public byte[] getAdditionalArgs() {
114108
public void decodeElemHeader(List<String> tokens) {
115109
subkey = tokens.get(0);
116110
if (subkey.startsWith("0x")) {
117-
bkey = new BKeyObject(BTreeUtil.hexStringToByteArrays(subkey.substring(2)));
111+
bKey = new BKeyObject(BTreeUtil.hexStringToByteArrays(subkey.substring(2)));
118112
} else {
119-
bkey = new BKeyObject(Long.parseLong(subkey));
113+
bKey = new BKeyObject(Long.parseLong(subkey));
120114
}
121115
if (tokens.size() == 2) {
122116
dataLength = Integer.parseInt(tokens.get(1));
@@ -126,7 +120,7 @@ public void decodeElemHeader(List<String> tokens) {
126120
}
127121
}
128122

129-
public BKeyObject getBkey() {
130-
return bkey;
123+
public BKeyObject getBKey() {
124+
return bKey;
131125
}
132126
}

src/main/java/net/spy/memcached/ops/BTreeFindPositionWithGetOperation.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,6 @@ public interface BTreeFindPositionWithGetOperation extends KeyedOperation {
2424
BTreeFindPositionWithGet getGet();
2525

2626
interface Callback extends OperationCallback {
27-
void gotData(int pos, int flags, BKeyObject bkey, byte[] eflag, byte[] data);
27+
void gotData(int pos, int flags, BKeyObject bKey, byte[] eFlag, byte[] data);
2828
}
2929
}

src/main/java/net/spy/memcached/protocol/ascii/BTreeFindPositionWithGetOperationImpl.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -195,7 +195,7 @@ public void handleRead(ByteBuffer bb) {
195195
// put an element data.
196196
BTreeFindPositionWithGetOperation.Callback cb =
197197
(BTreeFindPositionWithGetOperation.Callback) getCallback();
198-
cb.gotData(pos, flags, get.getBkey(), get.getElementFlag(), data);
198+
cb.gotData(pos, flags, get.getBKey(), get.getElementFlag(), data);
199199

200200
// next position.
201201
pos += posDiff;

src/main/java/net/spy/memcached/v2/AsyncArcusCommands.java

Lines changed: 193 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -35,18 +35,25 @@
3535
import net.spy.memcached.MemcachedNode;
3636
import net.spy.memcached.collection.BKeyObject;
3737
import net.spy.memcached.collection.BTreeCreate;
38+
import net.spy.memcached.collection.BTreeFindPosition;
39+
import net.spy.memcached.collection.BTreeFindPositionWithGet;
3840
import net.spy.memcached.collection.BTreeGet;
3941
import net.spy.memcached.collection.BTreeGetBulk;
4042
import net.spy.memcached.collection.BTreeGetBulkWithLongTypeBkey;
4143
import net.spy.memcached.collection.BTreeGetBulkWithByteTypeBkey;
44+
import net.spy.memcached.collection.BTreeGetByPosition;
45+
import net.spy.memcached.collection.BTreeOrder;
4246
import net.spy.memcached.collection.BTreeUpdate;
4347
import net.spy.memcached.collection.BTreeUpsert;
4448
import net.spy.memcached.collection.CollectionUpdate;
4549
import net.spy.memcached.internal.result.GetsResultImpl;
50+
import net.spy.memcached.ops.BTreeFindPositionOperation;
51+
import net.spy.memcached.ops.BTreeFindPositionWithGetOperation;
4652
import net.spy.memcached.ops.BTreeGetBulkOperation;
4753
import net.spy.memcached.collection.BTreeSMGet;
4854
import net.spy.memcached.collection.BTreeSMGetWithLongTypeBkey;
4955
import net.spy.memcached.collection.BTreeSMGetWithByteTypeBkey;
56+
import net.spy.memcached.ops.BTreeGetByPositionOperation;
5057
import net.spy.memcached.ops.BTreeSortMergeGetOperation;
5158
import net.spy.memcached.collection.BTreeInsert;
5259
import net.spy.memcached.collection.BTreeInsertAndGet;
@@ -1244,6 +1251,192 @@ private BTreeGetBulk<T> createBTreeGetBulk(MemcachedNode node, List<String> keys
12441251
}
12451252
}
12461253

1254+
public ArcusFuture<Integer> bopGetPosition(String key, BKey bKey, BTreeOrder order) {
1255+
AbstractArcusResult<Integer> result = new AbstractArcusResult<>(new AtomicReference<>());
1256+
ArcusFutureImpl<Integer> future = new ArcusFutureImpl<>(result);
1257+
BTreeFindPosition findPosition = new BTreeFindPosition(bKey.toString(), order);
1258+
ArcusClient client = arcusClientSupplier.get();
1259+
1260+
BTreeFindPositionOperation.Callback cb = new BTreeFindPositionOperation.Callback() {
1261+
@Override
1262+
public void gotData(int position) {
1263+
result.set(position);
1264+
}
1265+
1266+
@Override
1267+
public void receivedStatus(OperationStatus status) {
1268+
switch (status.getStatusCode()) {
1269+
case SUCCESS:
1270+
break;
1271+
case ERR_NOT_FOUND:
1272+
case ERR_NOT_FOUND_ELEMENT:
1273+
result.set(null);
1274+
break;
1275+
case CANCELLED:
1276+
future.internalCancel();
1277+
break;
1278+
default:
1279+
/* TYPE_MISMATCH / BKEY_MISMATCH / UNREADABLE / NOT_SUPPORTED or unknown statement */
1280+
result.addError(key, status);
1281+
}
1282+
}
1283+
1284+
@Override
1285+
public void complete() {
1286+
future.complete();
1287+
}
1288+
};
1289+
Operation op = client.getOpFact().bopFindPosition(key, findPosition, cb);
1290+
future.setOp(op);
1291+
client.addOp(key, op);
1292+
1293+
return future;
1294+
}
1295+
1296+
public ArcusFuture<BTreeElement<T>> bopGetByPosition(String key, int pos, BTreeOrder order) {
1297+
AbstractArcusResult<BTreeElement<T>> result
1298+
= new AbstractArcusResult<>(new AtomicReference<>());
1299+
ArcusFutureImpl<BTreeElement<T>> future = new ArcusFutureImpl<>(result);
1300+
BTreeGetByPosition getByPosition = new BTreeGetByPosition(order, pos);
1301+
ArcusClient client = arcusClientSupplier.get();
1302+
1303+
BTreeGetByPositionOperation.Callback cb = new BTreeGetByPositionOperation.Callback() {
1304+
@Override
1305+
public void gotData(int pos, int flags, BKeyObject bKey, byte[] eFlag, byte[] data) {
1306+
result.set(decodeBTreeElement(flags, bKey, eFlag, data));
1307+
}
1308+
1309+
@Override
1310+
public void receivedStatus(OperationStatus status) {
1311+
switch (status.getStatusCode()) {
1312+
case SUCCESS:
1313+
break;
1314+
case ERR_NOT_FOUND:
1315+
case ERR_NOT_FOUND_ELEMENT:
1316+
result.set(null);
1317+
break;
1318+
case CANCELLED:
1319+
future.internalCancel();
1320+
break;
1321+
default:
1322+
/* TYPE_MISMATCH / UNREADABLE / NOT_SUPPORTED or unknown statement */
1323+
result.addError(key, status);
1324+
}
1325+
}
1326+
1327+
@Override
1328+
public void complete() {
1329+
future.complete();
1330+
}
1331+
};
1332+
Operation op = client.getOpFact().bopGetByPosition(key, getByPosition, cb);
1333+
future.setOp(op);
1334+
client.addOp(key, op);
1335+
1336+
return future;
1337+
}
1338+
1339+
public ArcusFuture<Map<Integer, BTreeElement<T>>> bopGetByPosition(String key,
1340+
int from, int to,
1341+
BTreeOrder order) {
1342+
Map<Integer, BTreeElement<T>> map = new HashMap<>();
1343+
AbstractArcusResult<Map<Integer, BTreeElement<T>>> result
1344+
= new AbstractArcusResult<>(new AtomicReference<>(map));
1345+
ArcusFutureImpl<Map<Integer, BTreeElement<T>>> future = new ArcusFutureImpl<>(result);
1346+
BTreeGetByPosition getByPosition = new BTreeGetByPosition(order, from, to);
1347+
ArcusClient client = arcusClientSupplier.get();
1348+
1349+
BTreeGetByPositionOperation.Callback cb = new BTreeGetByPositionOperation.Callback() {
1350+
@Override
1351+
public void gotData(int pos, int flags, BKeyObject bKey, byte[] eFlag, byte[] data) {
1352+
result.get().put(pos, decodeBTreeElement(flags, bKey, eFlag, data));
1353+
}
1354+
1355+
@Override
1356+
public void receivedStatus(OperationStatus status) {
1357+
switch (status.getStatusCode()) {
1358+
case SUCCESS:
1359+
case ERR_NOT_FOUND_ELEMENT:
1360+
break;
1361+
case ERR_NOT_FOUND:
1362+
result.set(null);
1363+
break;
1364+
case CANCELLED:
1365+
future.internalCancel();
1366+
break;
1367+
default:
1368+
/* TYPE_MISMATCH / UNREADABLE / NOT_SUPPORTED or unknown statement */
1369+
result.addError(key, status);
1370+
}
1371+
}
1372+
1373+
@Override
1374+
public void complete() {
1375+
future.complete();
1376+
}
1377+
};
1378+
Operation op = client.getOpFact().bopGetByPosition(key, getByPosition, cb);
1379+
future.setOp(op);
1380+
client.addOp(key, op);
1381+
1382+
return future;
1383+
}
1384+
1385+
public ArcusFuture<Map<Integer, BTreeElement<T>>> bopPositionWithGet(String key,
1386+
BKey bKey, int count,
1387+
BTreeOrder order) {
1388+
Map<Integer, BTreeElement<T>> map = new HashMap<>();
1389+
AbstractArcusResult<Map<Integer, BTreeElement<T>>> result =
1390+
new AbstractArcusResult<>(new AtomicReference<>(map));
1391+
ArcusFutureImpl<Map<Integer, BTreeElement<T>>> future = new ArcusFutureImpl<>(result);
1392+
BTreeFindPositionWithGet findPositionWithGet
1393+
= new BTreeFindPositionWithGet(bKey.toBKeyObject(), order, count);
1394+
ArcusClient client = arcusClientSupplier.get();
1395+
1396+
BTreeFindPositionWithGetOperation.Callback cb = new BTreeFindPositionWithGetOperation
1397+
.Callback() {
1398+
1399+
@Override
1400+
public void gotData(int pos, int flags, BKeyObject bKey, byte[] eFlag, byte[] data) {
1401+
result.get().put(pos, decodeBTreeElement(flags, bKey, eFlag, data));
1402+
}
1403+
1404+
@Override
1405+
public void receivedStatus(OperationStatus status) {
1406+
switch (status.getStatusCode()) {
1407+
case SUCCESS:
1408+
case ERR_NOT_FOUND_ELEMENT:
1409+
break;
1410+
case ERR_NOT_FOUND:
1411+
result.set(null);
1412+
break;
1413+
case CANCELLED:
1414+
future.internalCancel();
1415+
break;
1416+
default:
1417+
/* TYPE_MISMATCH / BKEY_MISMATCH / UNREADABLE / NOT_SUPPORTED or unknown statement */
1418+
result.addError(key, status);
1419+
}
1420+
}
1421+
1422+
@Override
1423+
public void complete() {
1424+
future.complete();
1425+
}
1426+
};
1427+
Operation op = client.getOpFact().bopFindPositionWithGet(key, findPositionWithGet, cb);
1428+
future.setOp(op);
1429+
client.addOp(key, op);
1430+
1431+
return future;
1432+
}
1433+
1434+
private BTreeElement<T> decodeBTreeElement(int flags, BKeyObject bKey,
1435+
byte[] eFlag, byte[] data) {
1436+
T decodedData = tcForCollection.decode(new CachedData(flags, data, tc.getMaxSize()));
1437+
return new BTreeElement<>(BKey.of(bKey), decodedData, eFlag);
1438+
}
1439+
12471440
public ArcusFuture<SMGetElements<T>> bopSortMergeGet(List<String> keys, BKey from, BKey to,
12481441
boolean unique, BopGetArgs args) {
12491442
verifyBKeyRange(from, to);

0 commit comments

Comments
 (0)