Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -135,55 +135,90 @@ public void testProcessEmptyEvents() {

@Test
public void testProcessBatchTooLargeWithTwoEvents() {
when(event.hasPersistentActorBytes()).thenReturn(true);
when(event.rowKey()).thenReturn(new String[]{"key1", "key2"});
when(event.persistentActorId()).thenReturn("actorId");
when(event.persistentActorBytes()).thenReturn(ByteBuffer.wrap(new byte[1024]));
// Create two separate events to verify they're both processed
PersistentActorUpdateEvent event1 = mock(PersistentActorUpdateEvent.class);
PersistentActorUpdateEvent event2 = mock(PersistentActorUpdateEvent.class);

BoundStatement boundStatement1 = mock(BoundStatement.class);
BoundStatement boundStatement2 = mock(BoundStatement.class);
when(insertStatement.bind(any(), any(), any(), any())).thenReturn(boundStatement1).thenReturn(boundStatement2);
when(event1.hasPersistentActorBytes()).thenReturn(true);
when(event1.rowKey()).thenReturn(new String[]{"key1", "key2"});
when(event1.persistentActorId()).thenReturn("actorId");
when(event1.persistentActorBytes()).thenReturn(ByteBuffer.wrap(new byte[1024]));

when(event2.hasPersistentActorBytes()).thenReturn(true);
when(event2.rowKey()).thenReturn(new String[]{"key1", "key2"});
when(event2.persistentActorId()).thenReturn("actorId");
when(event2.persistentActorBytes()).thenReturn(ByteBuffer.wrap(new byte[1024]));

BoundStatement boundStatement = mock(BoundStatement.class);
when(insertStatement.bind(any(), any(), any(), any())).thenReturn(boundStatement);

doThrow(new InvalidQueryException(cassandraNode, "Batch too large"))
.when(cqlSession)
.execute(any(BatchStatement.class));

processor.process(List.of(event, event));
processor.process(List.of(event1, event2));

// Verify that after batch fails, events are processed individually
verify(cqlSession, times(1)).execute(any(BatchStatement.class));
verify(insertStatement, times(4)).bind("key1", "key2", "actorId", ByteBuffer.wrap(new byte[1024]));
verify(cqlSession, times(2)).execute(any(BoundStatement.class));

}

@Test
public void testProcessBatchTooLargeWithThreeEvents() {
when(event.hasPersistentActorBytes()).thenReturn(true);
when(event.rowKey()).thenReturn(new String[]{"key1", "key2"});
when(event.persistentActorId()).thenReturn("actorId");
when(event.persistentActorBytes()).thenReturn(ByteBuffer.wrap(new byte[1024]));
// Create three separate events with listeners to verify they're all processed
PersistentActorUpdateEvent event1 = mock(PersistentActorUpdateEvent.class);
PersistentActorUpdateEvent event2 = mock(PersistentActorUpdateEvent.class);
PersistentActorUpdateEvent event3 = mock(PersistentActorUpdateEvent.class);

when(event1.hasPersistentActorBytes()).thenReturn(true);
when(event1.rowKey()).thenReturn(new String[]{"key1", "key2"});
when(event1.persistentActorId()).thenReturn("actorId");
when(event1.persistentActorBytes()).thenReturn(ByteBuffer.wrap(new byte[1024]));

when(event2.hasPersistentActorBytes()).thenReturn(true);
when(event2.rowKey()).thenReturn(new String[]{"key1", "key2"});
when(event2.persistentActorId()).thenReturn("actorId");
when(event2.persistentActorBytes()).thenReturn(ByteBuffer.wrap(new byte[1024]));

when(event3.hasPersistentActorBytes()).thenReturn(true);
when(event3.rowKey()).thenReturn(new String[]{"key1", "key2"});
when(event3.persistentActorId()).thenReturn("actorId");
when(event3.persistentActorBytes()).thenReturn(ByteBuffer.wrap(new byte[1024]));

BoundStatement boundStatement1 = mock(BoundStatement.class);
BoundStatement boundStatement2 = mock(BoundStatement.class);
when(insertStatement.bind(any(), any(), any(), any())).thenReturn(boundStatement1).thenReturn(boundStatement2);
BoundStatement boundStatement = mock(BoundStatement.class);
when(insertStatement.bind(any(), any(), any(), any())).thenReturn(boundStatement);

when(cqlSession.execute(any(BatchStatement.class)))
.thenThrow(new InvalidQueryException(cassandraNode, "Batch too large"))
.thenReturn(null);

processor.process(List.of(event, event, event));
processor.process(List.of(event1, event2, event3));

// Verify that the batch was retried with smaller sizes after the initial failure
verify(cqlSession, times(2)).execute(any(BatchStatement.class));
verify(insertStatement, times(6)).bind("key1", "key2", "actorId", ByteBuffer.wrap(new byte[1024]));
verify(cqlSession, times(1)).execute(any(BoundStatement.class));
}

@Test
public void testProcessBatchTooLargeWithNineEvents() {
when(event.hasPersistentActorBytes()).thenReturn(true);
when(event.rowKey()).thenReturn(new String[]{"key1", "key2"});
when(event.persistentActorId()).thenReturn("actorId");
when(event.persistentActorBytes()).thenReturn(ByteBuffer.wrap(new byte[1024]));
// Create nine separate events to verify they're all processed
PersistentActorUpdateEvent event1 = mock(PersistentActorUpdateEvent.class);
PersistentActorUpdateEvent event2 = mock(PersistentActorUpdateEvent.class);
PersistentActorUpdateEvent event3 = mock(PersistentActorUpdateEvent.class);
PersistentActorUpdateEvent event4 = mock(PersistentActorUpdateEvent.class);
PersistentActorUpdateEvent event5 = mock(PersistentActorUpdateEvent.class);
PersistentActorUpdateEvent event6 = mock(PersistentActorUpdateEvent.class);
PersistentActorUpdateEvent event7 = mock(PersistentActorUpdateEvent.class);
PersistentActorUpdateEvent event8 = mock(PersistentActorUpdateEvent.class);
PersistentActorUpdateEvent event9 = mock(PersistentActorUpdateEvent.class);

for (PersistentActorUpdateEvent e : List.of(event1, event2, event3, event4, event5, event6, event7, event8, event9)) {
when(e.hasPersistentActorBytes()).thenReturn(true);
when(e.rowKey()).thenReturn(new String[]{"key1", "key2"});
when(e.persistentActorId()).thenReturn("actorId");
when(e.persistentActorBytes()).thenReturn(ByteBuffer.wrap(new byte[1024]));
}

BoundStatement boundStatement = mock(BoundStatement.class);
when(insertStatement.bind(any(), any(), any(), any())).thenReturn(boundStatement);
Expand All @@ -193,11 +228,10 @@ public void testProcessBatchTooLargeWithNineEvents() {
.thenReturn(null)
.thenReturn(null);

processor.process(List.of(event, event, event, event, event, event, event, event, event));
processor.process(List.of(event1, event2, event3, event4, event5, event6, event7, event8, event9));

// Verify that the batch execution was attempted and retried with smaller batches
// The retry mechanism should split the batch and eventually succeed
verify(cqlSession, atLeast(2)).execute(any(BatchStatement.class));
// Verify that the batch was retried with smaller sizes
verify(cqlSession, times(3)).execute(any(BatchStatement.class));
}

}
Expand Down