Skip to content

Commit 1237d3d

Browse files
authored
Merge pull request #1013: [direct-io-cassandra] #357 refactor insert statements
2 parents a6ca3d0 + 09295ed commit 1237d3d

3 files changed

Lines changed: 28 additions & 39 deletions

File tree

direct/io-cassandra/src/main/java/cz/o2/proxima/direct/io/cassandra/CacheableCqlFactory.java

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,6 @@
2222
import cz.o2.proxima.core.repository.EntityDescriptor;
2323
import cz.o2.proxima.core.storage.StreamElement;
2424
import cz.o2.proxima.core.storage.UriUtil;
25-
import cz.o2.proxima.io.serialization.shaded.com.google.common.annotations.VisibleForTesting;
2625
import cz.o2.proxima.io.serialization.shaded.com.google.common.base.Strings;
2726
import java.net.URI;
2827
import java.util.Collections;
@@ -288,7 +287,6 @@ protected void clearCache() {
288287
listAllAttributes = null;
289288
}
290289

291-
@VisibleForTesting
292290
String toUnderScore(String what) {
293291
StringBuilder sb = new StringBuilder();
294292
for (char c : what.toCharArray()) {

direct/io-cassandra/src/main/java/cz/o2/proxima/direct/io/cassandra/DefaultCqlFactory.java

Lines changed: 11 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -196,7 +196,6 @@ protected void setup(Map<String, String> query, StringConverter<?> converter) {
196196

197197
@Override
198198
public Optional<BoundStatement> getWriteStatement(StreamElement element, CqlSession session) {
199-
200199
ensureSession(session);
201200
if (element.isDelete()) {
202201
return elementDelete(element);
@@ -246,18 +245,18 @@ private Optional<BoundStatement> elementInsert(StreamElement ingest) {
246245
if (colVal != null) {
247246
BoundStatement bind =
248247
prepared.bind(
249-
ingest.getKey(),
250-
colVal,
248+
ingest.getStamp() * 1000L,
251249
ByteBuffer.wrap(serializeValue(ingest)),
252-
ingest.getStamp() * 1000L);
250+
ingest.getKey(),
251+
colVal);
253252
return Optional.of(bind);
254253
}
255254
return Optional.empty();
256255
}
257256

258257
BoundStatement bind =
259258
prepared.bind(
260-
ingest.getKey(), ByteBuffer.wrap(serializeValue(ingest)), ingest.getStamp() * 1000L);
259+
ingest.getStamp() * 1000L, ByteBuffer.wrap(serializeValue(ingest)), ingest.getKey());
261260
return Optional.of(bind);
262261
}
263262

@@ -273,24 +272,21 @@ private Optional<BoundStatement> elementDelete(StreamElement ingest) {
273272

274273
@Override
275274
protected String createInsertStatement(StreamElement element) {
276-
275+
String ttlStatement = ttl > 0 ? ("USING TTL " + ttl + " ") : "";
277276
if (element.getAttributeDescriptor().isWildcard()) {
278277
// use the first part of the attribute name
279278
String colName = toColName(element.getAttributeDescriptor());
280279
return String.format(
281-
"INSERT INTO %s (%s, %s, %s) VALUES (?, ?, ?) USING TIMESTAMP ?%s",
280+
"UPDATE %s USING TIMESTAMP ? %sSET %s=? WHERE %s=? AND %s=?",
282281
getTableName(),
283-
primaryField,
284-
toUnderScore(colName),
282+
ttlStatement,
285283
toPayloadCol(element.getAttributeDescriptor()),
286-
ttl > 0 ? (" AND TTL " + ttl) : "");
284+
primaryField,
285+
toUnderScore(colName));
287286
} else {
288287
return String.format(
289-
"INSERT INTO %s (%s, %s) VALUES (?, ?) USING TIMESTAMP ?%s",
290-
getTableName(),
291-
primaryField,
292-
toUnderScore(element.getAttribute()),
293-
ttl > 0 ? (" AND TTL " + ttl) : "");
288+
"UPDATE %s USING TIMESTAMP ? %sSET %s=? WHERE %s=?",
289+
getTableName(), ttlStatement, toUnderScore(element.getAttribute()), primaryField);
294290
}
295291
}
296292

direct/io-cassandra/src/test/java/cz/o2/proxima/direct/io/cassandra/DefaultCqlFactoryTest.java

Lines changed: 17 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -172,16 +172,16 @@ public void testIngest() {
172172
now,
173173
"value".getBytes());
174174
BoundStatement bound = mock(BoundStatement.class);
175-
when(statement.bind("key", ByteBuffer.wrap("value".getBytes()), now * 1000L)).thenReturn(bound);
175+
when(statement.bind(now * 1000L, ByteBuffer.wrap("value".getBytes()), "key")).thenReturn(bound);
176176
when(session.prepare((String) any())).thenReturn(statement);
177177

178178
Optional<BoundStatement> boundStatement = factory.getWriteStatement(ingest, session);
179-
verify(statement).bind(eq("key"), eq(ByteBuffer.wrap("value".getBytes())), eq(now * 1000L));
179+
verify(statement).bind(eq(now * 1000L), eq(ByteBuffer.wrap("value".getBytes())), eq("key"));
180180
assertTrue(boundStatement.isPresent());
181181
assertSame(bound, boundStatement.get());
182182
assertEquals(1, preparedStatement.size());
183183
assertEquals(
184-
"INSERT INTO my_table (hgw, my_attribute) VALUES (?, ?) USING TIMESTAMP ?",
184+
"UPDATE my_table USING TIMESTAMP ? SET my_attribute=? WHERE hgw=?",
185185
preparedStatement.get(0));
186186
}
187187

@@ -202,18 +202,17 @@ public void testIngestWithTtl() {
202202
now,
203203
"value".getBytes());
204204
BoundStatement bound = mock(BoundStatement.class);
205-
when(statement.bind("key", ByteBuffer.wrap("value".getBytes()), now * 1000L)).thenReturn(bound);
205+
when(statement.bind(now * 1000L, ByteBuffer.wrap("value".getBytes()), "key")).thenReturn(bound);
206206
when(session.prepare((String) any())).thenReturn(statement);
207207
when(bound.set(eq(1), any(), eq(ByteBuffer.class))).thenReturn(bound);
208208

209209
Optional<BoundStatement> boundStatement = factory.getWriteStatement(ingest, session);
210-
verify(statement).bind(eq("key"), eq(ByteBuffer.wrap("value".getBytes())), eq(now * 1000L));
210+
verify(statement).bind(eq(now * 1000L), eq(ByteBuffer.wrap("value".getBytes())), eq("key"));
211211
assertTrue(boundStatement.isPresent());
212212
assertSame(bound, boundStatement.get());
213213
assertEquals(1, preparedStatement.size());
214214
assertEquals(
215-
"INSERT INTO my_table (hgw, my_attribute) VALUES (?, ?) USING TIMESTAMP ?"
216-
+ " AND TTL 86400",
215+
"UPDATE my_table USING TIMESTAMP ? USING TTL 86400 SET my_attribute=? WHERE hgw=?",
217216
preparedStatement.get(0));
218217
}
219218

@@ -230,20 +229,18 @@ public void testIngestWildcard() {
230229
now,
231230
"value".getBytes());
232231
BoundStatement bound = mock(BoundStatement.class);
233-
when(statement.bind("key", "1", ByteBuffer.wrap("value".getBytes()), now * 1000L))
232+
when(statement.bind(now * 1000L, ByteBuffer.wrap("value".getBytes()), "key", "1"))
234233
.thenReturn(bound);
235234
when(session.prepare((String) any())).thenReturn(statement);
236235

237236
Optional<BoundStatement> boundStatement = factory.getWriteStatement(ingest, session);
238237
verify(statement)
239-
.bind(
240-
eq("key"), eq("1"),
241-
eq(ByteBuffer.wrap("value".getBytes())), eq(now * 1000L));
238+
.bind(eq(now * 1000L), eq(ByteBuffer.wrap("value".getBytes())), eq("key"), eq("1"));
242239
assertTrue(boundStatement.isPresent());
243240
assertSame(bound, boundStatement.get());
244241
assertEquals(1, preparedStatement.size());
245242
assertEquals(
246-
"INSERT INTO my_table (hgw, device, my_col) VALUES (?, ?, ?) USING TIMESTAMP ?",
243+
"UPDATE my_table USING TIMESTAMP ? SET my_col=? WHERE hgw=? AND device=?",
247244
preparedStatement.get(0));
248245
}
249246

@@ -260,20 +257,18 @@ public void testIngestWildcardMultiDots() {
260257
now,
261258
"value".getBytes());
262259
BoundStatement bound = mock(BoundStatement.class);
263-
when(statement.bind("key", "1.2", ByteBuffer.wrap("value".getBytes()), now * 1000L))
260+
when(statement.bind(now * 1000L, ByteBuffer.wrap("value".getBytes()), "key", "1.2"))
264261
.thenReturn(bound);
265262
when(session.prepare((String) any())).thenReturn(statement);
266263

267264
Optional<BoundStatement> boundStatement = factory.getWriteStatement(ingest, session);
268265
verify(statement)
269-
.bind(
270-
eq("key"), eq("1.2"),
271-
eq(ByteBuffer.wrap("value".getBytes())), eq(now * 1000L));
266+
.bind(eq(now * 1000L), eq(ByteBuffer.wrap("value".getBytes())), eq("key"), eq("1.2"));
272267
assertTrue(boundStatement.isPresent());
273268
assertSame(bound, boundStatement.get());
274269
assertEquals(1, preparedStatement.size());
275270
assertEquals(
276-
"INSERT INTO my_table (hgw, device, my_col) VALUES (?, ?, ?) USING TIMESTAMP ?",
271+
"UPDATE my_table USING TIMESTAMP ? SET my_col=? WHERE hgw=? AND device=?",
277272
preparedStatement.get(0));
278273
}
279274

@@ -481,7 +476,7 @@ public void testV2SerializerIngest() {
481476
StreamElement ingest =
482477
StreamElement.upsert(entity, attr, 1001L, "key", "myAttribute", now, "value".getBytes());
483478
BoundStatement bound = mock(BoundStatement.class);
484-
when(statement.bind(eq("key"), any(), eq(now * 1000L)))
479+
when(statement.bind(eq(now * 1000L), any(), eq("key")))
485480
.thenAnswer(
486481
invocationOnMock -> {
487482
ByteBuffer bytes = invocationOnMock.getArgument(1);
@@ -508,7 +503,7 @@ public void testV2SerializerIngest() {
508503
assertSame(bound, boundStatement.get());
509504
assertEquals(1, preparedStatement.size());
510505
assertEquals(
511-
"INSERT INTO my_table (hgw, my_attribute) VALUES (?, ?) USING TIMESTAMP ?",
506+
"UPDATE my_table USING TIMESTAMP ? SET my_attribute=? WHERE hgw=?",
512507
preparedStatement.get(0));
513508
}
514509

@@ -525,10 +520,10 @@ public void testV2SerializerIngestWildcard() {
525520
StreamElement.upsert(
526521
entity, attrWildcard, 1001L, "key", "device.1", now, "value".getBytes());
527522
BoundStatement bound = mock(BoundStatement.class);
528-
when(statement.bind(eq("key"), eq("1"), any(), eq(now * 1000L)))
523+
when(statement.bind(eq(now * 1000L), any(), eq("key"), eq("1")))
529524
.thenAnswer(
530525
invocationOnMock -> {
531-
ByteBuffer bytes = invocationOnMock.getArgument(2);
526+
ByteBuffer bytes = invocationOnMock.getArgument(1);
532527
Cell cell = Cell.parseFrom(ByteString.copyFrom(bytes));
533528
assertEquals(1001L, cell.getSeqId());
534529
assertEquals("value", cell.getValue().toStringUtf8());
@@ -552,7 +547,7 @@ public void testV2SerializerIngestWildcard() {
552547
assertSame(boundStatement.get(), bound);
553548
assertEquals(1, preparedStatement.size());
554549
assertEquals(
555-
"INSERT INTO my_table (hgw, device, my_col) VALUES (?, ?, ?) USING TIMESTAMP ?",
550+
"UPDATE my_table USING TIMESTAMP ? SET my_col=? WHERE hgw=? AND device=?",
556551
preparedStatement.get(0));
557552
}
558553

0 commit comments

Comments
 (0)