in client/src/testFixtures/java/org/apache/cassandra/sidecar/client/SidecarClientTest.java [1628:1674]
/**
 * Verifies that {@code client.streamCdcSegments} hands the body of a ranged response to the
 * supplied {@link StreamConsumer}.
 *
 * <p>The mock server replies with the 12 bytes {@code "Test Content"}, advertised as bytes
 * {@code 0-11} of a 1024-byte blob. The consumer collects every chunk it is given; once the
 * stream has terminated, the chunks are concatenated and compared with the expected text.
 *
 * @throws InterruptedException if the test thread is interrupted while waiting for the stream to end
 */
public void testStreamCdcSegments() throws InterruptedException
{
    // mock reading the first 12 bytes, i.e. "Test Content" from a large blob (1024).
    MockResponse response = new MockResponse();
    response.setResponseCode(200)
            .setHeader(HttpHeaderNames.CONTENT_TYPE.toString(),
                       HttpHeaderValues.APPLICATION_OCTET_STREAM)
            .setHeader(HttpHeaderNames.ACCEPT_RANGES.toString(), "bytes")
            .setHeader(HttpHeaderNames.CONTENT_RANGE.toString(), "bytes 0-11/1024")
            .setBody("Test Content");
    enqueue(response);

    SidecarInstance instance = instances.get(0);
    CountDownLatch latch = new CountDownLatch(1);
    List<byte[]> receivedBytes = new ArrayList<>();
    // Holds the error reported by the consumer, if any, so it can be asserted on the test thread.
    // Fully qualified because this method cannot see whether the file already imports the type.
    java.util.concurrent.atomic.AtomicReference<Throwable> streamFailure =
        new java.util.concurrent.atomic.AtomicReference<>();

    StreamConsumer mockStreamConsumer = new StreamConsumer()
    {
        @Override
        public void onRead(StreamBuffer buffer)
        {
            // Only copy here; assertions are made on the test thread after the latch is released,
            // because an AssertionError thrown from a callback does not directly fail the test.
            int readable = buffer.readableBytes();
            byte[] dst = new byte[readable];
            buffer.copyBytes(0, dst, 0, readable);
            receivedBytes.add(dst);
        }

        @Override
        public void onComplete()
        {
            latch.countDown();
        }

        @Override
        public void onError(Throwable throwable)
        {
            // Record the failure before releasing the latch so the test thread observes it.
            streamFailure.set(throwable);
            latch.countDown();
        }
    };

    client.streamCdcSegments(instance, "testSegment", HttpRange.of(0, 11), mockStreamConsumer);

    // Bounded wait: a stream that never terminates must fail the test instead of hanging the build.
    boolean terminated = latch.await(30, java.util.concurrent.TimeUnit.SECONDS);
    assertThat(terminated).as("stream should complete or fail within 30 seconds").isTrue();
    assertThat(streamFailure.get()).as("stream should complete without error").isNull();

    ByteArrayOutputStream baos = new ByteArrayOutputStream();
    for (byte[] bytes : receivedBytes)
    {
        // Every chunk delivered to onRead is expected to carry at least one byte.
        assertThat(bytes.length).isGreaterThan(0);
        baos.write(bytes, 0, bytes.length);
    }
    assertThat(new String(baos.toByteArray(), StandardCharsets.UTF_8)).isEqualTo("Test Content");
}