diff --git a/main/backplane-cassandra4/src/test/java/org/elasticsoftware/elasticactors/cassandra4/state/PersistentActorUpdateEventProcessorTest.java b/main/backplane-cassandra4/src/test/java/org/elasticsoftware/elasticactors/cassandra4/state/PersistentActorUpdateEventProcessorTest.java index bd05ddc5..15da6e4e 100644 --- a/main/backplane-cassandra4/src/test/java/org/elasticsoftware/elasticactors/cassandra4/state/PersistentActorUpdateEventProcessorTest.java +++ b/main/backplane-cassandra4/src/test/java/org/elasticsoftware/elasticactors/cassandra4/state/PersistentActorUpdateEventProcessorTest.java @@ -135,55 +135,90 @@ public void testProcessEmptyEvents() { @Test public void testProcessBatchTooLargeWithTwoEvents() { - when(event.hasPersistentActorBytes()).thenReturn(true); - when(event.rowKey()).thenReturn(new String[]{"key1", "key2"}); - when(event.persistentActorId()).thenReturn("actorId"); - when(event.persistentActorBytes()).thenReturn(ByteBuffer.wrap(new byte[1024])); + // Create two separate events to verify they're both processed + PersistentActorUpdateEvent event1 = mock(PersistentActorUpdateEvent.class); + PersistentActorUpdateEvent event2 = mock(PersistentActorUpdateEvent.class); - BoundStatement boundStatement1 = mock(BoundStatement.class); - BoundStatement boundStatement2 = mock(BoundStatement.class); - when(insertStatement.bind(any(), any(), any(), any())).thenReturn(boundStatement1).thenReturn(boundStatement2); + when(event1.hasPersistentActorBytes()).thenReturn(true); + when(event1.rowKey()).thenReturn(new String[]{"key1", "key2"}); + when(event1.persistentActorId()).thenReturn("actorId"); + when(event1.persistentActorBytes()).thenReturn(ByteBuffer.wrap(new byte[1024])); + + when(event2.hasPersistentActorBytes()).thenReturn(true); + when(event2.rowKey()).thenReturn(new String[]{"key1", "key2"}); + when(event2.persistentActorId()).thenReturn("actorId"); + when(event2.persistentActorBytes()).thenReturn(ByteBuffer.wrap(new byte[1024])); + + BoundStatement boundStatement = mock(BoundStatement.class); + when(insertStatement.bind(any(), any(), any(), any())).thenReturn(boundStatement); doThrow(new InvalidQueryException(cassandraNode, "Batch too large")) .when(cqlSession) .execute(any(BatchStatement.class)); - processor.process(List.of(event, event)); + processor.process(List.of(event1, event2)); + // Verify that after batch fails, events are processed individually verify(cqlSession, times(1)).execute(any(BatchStatement.class)); - verify(insertStatement, times(4)).bind("key1", "key2", "actorId", ByteBuffer.wrap(new byte[1024])); verify(cqlSession, times(2)).execute(any(BoundStatement.class)); } @Test public void testProcessBatchTooLargeWithThreeEvents() { - when(event.hasPersistentActorBytes()).thenReturn(true); - when(event.rowKey()).thenReturn(new String[]{"key1", "key2"}); - when(event.persistentActorId()).thenReturn("actorId"); - when(event.persistentActorBytes()).thenReturn(ByteBuffer.wrap(new byte[1024])); + // Create three separate events with listeners to verify they're all processed + PersistentActorUpdateEvent event1 = mock(PersistentActorUpdateEvent.class); + PersistentActorUpdateEvent event2 = mock(PersistentActorUpdateEvent.class); + PersistentActorUpdateEvent event3 = mock(PersistentActorUpdateEvent.class); + + when(event1.hasPersistentActorBytes()).thenReturn(true); + when(event1.rowKey()).thenReturn(new String[]{"key1", "key2"}); + when(event1.persistentActorId()).thenReturn("actorId"); + when(event1.persistentActorBytes()).thenReturn(ByteBuffer.wrap(new byte[1024])); + + when(event2.hasPersistentActorBytes()).thenReturn(true); + when(event2.rowKey()).thenReturn(new String[]{"key1", "key2"}); + when(event2.persistentActorId()).thenReturn("actorId"); + when(event2.persistentActorBytes()).thenReturn(ByteBuffer.wrap(new byte[1024])); + + when(event3.hasPersistentActorBytes()).thenReturn(true); + when(event3.rowKey()).thenReturn(new String[]{"key1", "key2"}); + when(event3.persistentActorId()).thenReturn("actorId"); + when(event3.persistentActorBytes()).thenReturn(ByteBuffer.wrap(new byte[1024])); - BoundStatement boundStatement1 = mock(BoundStatement.class); - BoundStatement boundStatement2 = mock(BoundStatement.class); - when(insertStatement.bind(any(), any(), any(), any())).thenReturn(boundStatement1).thenReturn(boundStatement2); + BoundStatement boundStatement = mock(BoundStatement.class); + when(insertStatement.bind(any(), any(), any(), any())).thenReturn(boundStatement); when(cqlSession.execute(any(BatchStatement.class))) .thenThrow(new InvalidQueryException(cassandraNode, "Batch too large")) .thenReturn(null); - processor.process(List.of(event, event, event)); + processor.process(List.of(event1, event2, event3)); + // Verify that the batch was retried with smaller sizes after the initial failure verify(cqlSession, times(2)).execute(any(BatchStatement.class)); - verify(insertStatement, times(6)).bind("key1", "key2", "actorId", ByteBuffer.wrap(new byte[1024])); verify(cqlSession, times(1)).execute(any(BoundStatement.class)); } @Test public void testProcessBatchTooLargeWithNineEvents() { - when(event.hasPersistentActorBytes()).thenReturn(true); - when(event.rowKey()).thenReturn(new String[]{"key1", "key2"}); - when(event.persistentActorId()).thenReturn("actorId"); - when(event.persistentActorBytes()).thenReturn(ByteBuffer.wrap(new byte[1024])); + // Create nine separate events to verify they're all processed + PersistentActorUpdateEvent event1 = mock(PersistentActorUpdateEvent.class); + PersistentActorUpdateEvent event2 = mock(PersistentActorUpdateEvent.class); + PersistentActorUpdateEvent event3 = mock(PersistentActorUpdateEvent.class); + PersistentActorUpdateEvent event4 = mock(PersistentActorUpdateEvent.class); + PersistentActorUpdateEvent event5 = mock(PersistentActorUpdateEvent.class); + PersistentActorUpdateEvent event6 = mock(PersistentActorUpdateEvent.class); + PersistentActorUpdateEvent event7 = mock(PersistentActorUpdateEvent.class); + PersistentActorUpdateEvent event8 = mock(PersistentActorUpdateEvent.class); + PersistentActorUpdateEvent event9 = mock(PersistentActorUpdateEvent.class); + + for (PersistentActorUpdateEvent e : List.of(event1, event2, event3, event4, event5, event6, event7, event8, event9)) { + when(e.hasPersistentActorBytes()).thenReturn(true); + when(e.rowKey()).thenReturn(new String[]{"key1", "key2"}); + when(e.persistentActorId()).thenReturn("actorId"); + when(e.persistentActorBytes()).thenReturn(ByteBuffer.wrap(new byte[1024])); + } BoundStatement boundStatement = mock(BoundStatement.class); when(insertStatement.bind(any(), any(), any(), any())).thenReturn(boundStatement); @@ -193,11 +228,10 @@ public void testProcessBatchTooLargeWithNineEvents() { .thenReturn(null) .thenReturn(null); - processor.process(List.of(event, event, event, event, event, event, event, event, event)); + processor.process(List.of(event1, event2, event3, event4, event5, event6, event7, event8, event9)); - // Verify that the batch execution was attempted and retried with smaller batches - // The retry mechanism should split the batch and eventually succeed - verify(cqlSession, atLeast(2)).execute(any(BatchStatement.class)); + // Verify that the batch was retried with smaller sizes + verify(cqlSession, times(3)).execute(any(BatchStatement.class)); } }