Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGES.txt
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
0.3.0
-----
* Fix ByteBuffer flip() in StreamBuffer.copyBytes() causing data corruption (CASSANALYTICS-116)
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I believe the description is no longer accurate. Please update to reflect the actual fix.

* Fix race condition in DirectStreamSession#onSSTablesProduced and SortedSStableWriter#close (CASSANALYTICS-107)
* Address LZ4 vulnerability (CVE-2025-12183) (CASSANALYTICS-109)
* Add TimeRangeFilter to filter out SSTables outside given time window (CASSANALYTICS-102)
Expand Down
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We should avoid the changes in the analytics-sidecar-client and the other sidecar-copied subprojects at the best. Those files are supposed to be in sync with Sidecar.
I would revert the changes in this file and the test file. The VertxStreamBuffer and its test in another package.

Original file line number Diff line number Diff line change
Expand Up @@ -27,11 +27,15 @@
public interface StreamBuffer
{
/**
* Copies bytes from this {@link StreamBuffer} into the {@link ByteBuffer destination}
* Copies bytes from this {@link StreamBuffer} into the {@link ByteBuffer destination}.
* <p>
* This method writes {@code length} bytes starting at the destination buffer's current position
* and advances the position by {@code length}. The caller is responsible for calling
* {@link ByteBuffer#flip()} on the destination buffer before reading from it.
*
Comment on lines +30 to 35
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for adding the docs. Let's actually add the docs in org.apache.cassandra.spark.utils.streaming.StreamBuffer instead, not this file.

* @param sourceOffset the offset within the {@link StreamBuffer} to be read; must be non-negative and
* larger than the buffer length
* @param destination a {@link ByteBuffer} where the data will be copied
* @param destination a {@link ByteBuffer} where the data will be copied at its current position
* @param length the number of bytes to be copied from the {@link StreamBuffer}; must be non-negative and
* larger than the {@code buffer.length - sourceOffset}
*/
Expand Down Expand Up @@ -94,7 +98,6 @@ private ByteArrayWrapper(byte[] bytes)
public void copyBytes(int sourceOffset, ByteBuffer destination, int length)
{
destination.put(bytes, sourceOffset, length);
destination.flip();
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,8 +67,9 @@ void testFullCopyBytesToByteBuffer()
ByteBuffer destination = ByteBuffer.allocate(20);
streamBuffer.copyBytes(0, destination, 11);
assertThat(destination.hasArray()).isTrue();
assertThat(destination.position()).isEqualTo(0);
assertThat(destination.limit()).isEqualTo(11);
assertThat(destination.position()).isEqualTo(11);
assertThat(destination.limit()).isEqualTo(20);
destination.flip();
byte[] dst = new byte[destination.limit()];
destination.get(dst, 0, dst.length);
assertThat(new String(dst, StandardCharsets.UTF_8)).isEqualTo("Hello World");
Expand All @@ -81,8 +82,9 @@ void testPartialCopyBytesToByteBuffer()
ByteBuffer destination = ByteBuffer.allocate(20);
streamBuffer.copyBytes(0, destination, 5);
assertThat(destination.hasArray()).isTrue();
assertThat(destination.position()).isEqualTo(0);
assertThat(destination.limit()).isEqualTo(5);
assertThat(destination.position()).isEqualTo(5);
assertThat(destination.limit()).isEqualTo(20);
destination.flip();
byte[] dst = new byte[destination.limit()];
destination.get(dst, 0, dst.length);
assertThat(new String(dst, StandardCharsets.UTF_8)).isEqualTo("Hello");
Expand All @@ -98,4 +100,48 @@ void testGetByte()
assertThat(streamBuffer.getByte(i)).isEqualTo((byte) inputString.charAt(i));
}
}

@Test
void testMultipleChunkCopyToSameBuffer()
{
// Simulates BufferingInputStream filling a buffer with multiple chunks
// This validates the fix for the flip() bug where chunks were overwriting instead of appending

// Create source data with 4 distinct chunks: "AAAA", "BBBB", "CCCC", "DDDD"
String sourceData = "AAAABBBBCCCCDDDD";
StreamBuffer streamBuffer = StreamBuffer.wrap(sourceData.getBytes(StandardCharsets.UTF_8));

// Destination buffer to accumulate all chunks (16 bytes total)
ByteBuffer destination = ByteBuffer.allocate(16);

// Write chunks one at a time (simulating multi-chunk read)
streamBuffer.copyBytes(0, destination, 4); // Write "AAAA" at position 0
assertThat(destination.position()).isEqualTo(4); // Position should advance to 4

streamBuffer.copyBytes(4, destination, 4); // Write "BBBB" at position 4
assertThat(destination.position()).isEqualTo(8); // Position should advance to 8

streamBuffer.copyBytes(8, destination, 4); // Write "CCCC" at position 8
assertThat(destination.position()).isEqualTo(12); // Position should advance to 12

streamBuffer.copyBytes(12, destination, 4); // Write "DDDD" at position 12
assertThat(destination.position()).isEqualTo(16); // Position should advance to 16

// Buffer should be completely filled
assertThat(destination.remaining()).isEqualTo(0);

// Flip to read mode and verify all chunks are present (not overwritten)
destination.flip();
byte[] result = new byte[16];
destination.get(result);

String resultString = new String(result, StandardCharsets.UTF_8);
assertThat(resultString).isEqualTo("AAAABBBBCCCCDDDD");

// Verify each chunk individually
assertThat(resultString.substring(0, 4)).isEqualTo("AAAA");
assertThat(resultString.substring(4, 8)).isEqualTo("BBBB");
assertThat(resultString.substring(8, 12)).isEqualTo("CCCC");
assertThat(resultString.substring(12, 16)).isEqualTo("DDDD");
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,6 @@ public VertxStreamBuffer(Buffer buffer)
public void copyBytes(int sourceOffset, ByteBuffer destination, int length)
{
destination.put(buffer.getBytes(sourceOffset, sourceOffset + length));
destination.flip();
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,8 +41,9 @@ void testFullCopyBytesToByteBuffer()
ByteBuffer destination = ByteBuffer.allocate(20);
streamBuffer.copyBytes(0, destination, 11);
assertThat(destination.hasArray()).isTrue();
assertThat(destination.position()).isEqualTo(0);
assertThat(destination.limit()).isEqualTo(11);
assertThat(destination.position()).isEqualTo(11);
assertThat(destination.limit()).isEqualTo(20);
destination.flip();
byte[] dst = new byte[destination.limit()];
destination.get(dst, 0, dst.length);
assertThat(new String(dst, StandardCharsets.UTF_8)).isEqualTo("Hello World");
Expand All @@ -56,8 +57,9 @@ void testPartialCopyBytesToByteBuffer()
ByteBuffer destination = ByteBuffer.allocate(20);
streamBuffer.copyBytes(0, destination, 5);
assertThat(destination.hasArray()).isTrue();
assertThat(destination.position()).isEqualTo(0);
assertThat(destination.limit()).isEqualTo(5);
assertThat(destination.position()).isEqualTo(5);
assertThat(destination.limit()).isEqualTo(20);
destination.flip();
byte[] dst = new byte[destination.limit()];
destination.get(dst, 0, dst.length);
assertThat(new String(dst, StandardCharsets.UTF_8)).isEqualTo("Hello");
Expand Down Expand Up @@ -93,4 +95,48 @@ void testReadableBytes()
assertThat(streamBuffer.readableBytes()).isEqualTo(8);
streamBuffer.release(); // does nothing
}

@Test
void testMultipleChunkCopyToSameBuffer()
{
// Simulates BufferingInputStream filling a buffer with multiple chunks
// This validates the fix for the flip() bug where chunks were overwriting instead of appending

// Create source data with 4 distinct chunks: "AAAA", "BBBB", "CCCC", "DDDD"
Buffer buffer = new BufferImpl().appendString("AAAABBBBCCCCDDDD");
StreamBuffer streamBuffer = new VertxStreamBuffer(buffer);

// Destination buffer to accumulate all chunks (16 bytes total)
ByteBuffer destination = ByteBuffer.allocate(16);

// Write chunks one at a time (simulating multi-chunk read)
streamBuffer.copyBytes(0, destination, 4); // Write "AAAA" at position 0
assertThat(destination.position()).isEqualTo(4); // Position should advance to 4

streamBuffer.copyBytes(4, destination, 4); // Write "BBBB" at position 4
assertThat(destination.position()).isEqualTo(8); // Position should advance to 8

streamBuffer.copyBytes(8, destination, 4); // Write "CCCC" at position 8
assertThat(destination.position()).isEqualTo(12); // Position should advance to 12

streamBuffer.copyBytes(12, destination, 4); // Write "DDDD" at position 12
assertThat(destination.position()).isEqualTo(16); // Position should advance to 16

// Buffer should be completely filled
assertThat(destination.remaining()).isEqualTo(0);

// Flip to read mode and verify all chunks are present (not overwritten)
destination.flip();
byte[] result = new byte[16];
destination.get(result);

String resultString = new String(result, StandardCharsets.UTF_8);
assertThat(resultString).isEqualTo("AAAABBBBCCCCDDDD");

// Verify each chunk individually
assertThat(resultString.substring(0, 4)).isEqualTo("AAAA");
assertThat(resultString.substring(4, 8)).isEqualTo("BBBB");
assertThat(resultString.substring(8, 12)).isEqualTo("CCCC");
assertThat(resultString.substring(12, 16)).isEqualTo("DDDD");
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,7 @@ public BufferHolder rebuffer(long l)
BlockingStreamConsumer streamConsumer = new BlockingStreamConsumer();
source.request(offset, end, streamConsumer);
streamConsumer.getBytes(buffer);
buffer.flip();
return this;
}

Expand All @@ -127,6 +128,7 @@ public BufferHolder rebuffer(long l)

inputStream.read(buffer);
assert buffer.remaining() == 0;
buffer.flip();
}
catch (IOException e)
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -119,13 +119,12 @@ public void testCDCRebufferSequentialReading() throws IOException
assertThat(holder).isNotNull();
assertThat(rebuffer.offset()).isEqualTo(0L);

// Verify buffer was fully written (position at end, remaining = 0)
// Verify buffer is in read mode (flipped by rebuffer)
ByteBuffer buffer = holder.buffer();
assertThat(buffer.remaining()).isEqualTo(0);
assertThat(buffer.position()).isEqualTo(50);
assertThat(buffer.position()).isEqualTo(0);
assertThat(buffer.remaining()).isEqualTo(50);

// Flip to read mode and verify actual byte values
buffer.flip();
// Verify actual byte values (buffer already flipped, ready to read)
for (int i = 0; i < 50; i++)
{
assertThat(buffer.get()).isEqualTo((byte) i);
Expand All @@ -136,13 +135,12 @@ public void testCDCRebufferSequentialReading() throws IOException
assertThat(holder).isNotNull();
assertThat(rebuffer.offset()).isEqualTo(50L);

// Verify buffer was fully written (position at end, remaining = 0)
// Verify buffer is in read mode (flipped by rebuffer)
buffer = holder.buffer();
assertThat(buffer.remaining()).isEqualTo(0);
assertThat(buffer.position()).isEqualTo(50);
assertThat(buffer.position()).isEqualTo(0);
assertThat(buffer.remaining()).isEqualTo(50);

// Flip to read mode and verify actual byte values
buffer.flip();
// Verify actual byte values (buffer already flipped, ready to read)
for (int i = 0; i < 50; i++)
{
assertThat(buffer.get()).isEqualTo((byte) (50 + i));
Expand Down Expand Up @@ -195,13 +193,12 @@ public void testCDCRebufferBackwardSeek() throws IOException
assertThat(holder).isNotNull();
assertThat(rebuffer.offset()).isEqualTo(0L);

// Verify buffer is in write mode
// Verify buffer is in read mode (flipped by rebuffer)
ByteBuffer buffer = holder.buffer();
assertThat(buffer.remaining()).isEqualTo(0);
assertThat(buffer.position()).isEqualTo(50);
assertThat(buffer.position()).isEqualTo(0);
assertThat(buffer.remaining()).isEqualTo(50);

// TEST flips to verify byte values are correct
buffer.flip();
// Verify byte values are correct (buffer already flipped, ready to read)
for (int i = 0; i < 50; i++)
{
assertThat(buffer.get()).isEqualTo((byte) i);
Expand Down