public void testStreamCdcSegments()

in client/src/testFixtures/java/org/apache/cassandra/sidecar/client/SidecarClientTest.java [1628:1674]


    public void testStreamCdcSegments() throws InterruptedException
    {
        // Purpose: enqueue a ranged (bytes 0-11 of 1024) octet-stream response, stream it through
        // client.streamCdcSegments into a StreamConsumer, and verify that the concatenated chunks
        // handed to the consumer equal the mocked body.

        MockResponse response = new MockResponse();
        // mock reading the first 12 bytes, i.e. "Test Content" from a large blob (1024).
        response.setResponseCode(200)
                .setHeader(HttpHeaderNames.CONTENT_TYPE.toString(),
                           HttpHeaderValues.APPLICATION_OCTET_STREAM)
                .setHeader(HttpHeaderNames.ACCEPT_RANGES.toString(), "bytes")
                .setHeader(HttpHeaderNames.CONTENT_RANGE.toString(), "bytes 0-11/1024")
                .setBody("Test Content");
        enqueue(response);

        SidecarInstance instance = instances.get(0);
        CountDownLatch latch = new CountDownLatch(1);
        List<byte[]> receivedBytes = new ArrayList<>();
        // Errors reported through onError are recorded here so the test can fail with the real cause.
        // Writes happen before latch.countDown(), so they are visible once await returns true.
        List<Throwable> streamErrors = new ArrayList<>();
        StreamConsumer mockStreamConsumer = new StreamConsumer()
        {
            @Override
            public void onRead(StreamBuffer buffer)
            {
                int readable = buffer.readableBytes();
                assertThat(readable).isGreaterThan(0);
                // Copy the chunk out of the buffer; the assertions below run after streaming ends.
                byte[] dst = new byte[readable];
                buffer.copyBytes(0, dst, 0, readable);
                receivedBytes.add(dst);
            }

            @Override
            public void onComplete()
            {
                latch.countDown();
            }

            @Override
            public void onError(Throwable throwable)
            {
                // Record before releasing the latch so the waiting thread sees the failure.
                streamErrors.add(throwable);
                latch.countDown();
            }
        };
        client.streamCdcSegments(instance, "testSegment", HttpRange.of(0, 11), mockStreamConsumer);

        // Bounded wait: a consumer that is never completed must fail the test rather than hang it.
        // Fully qualified because this block cannot confirm that TimeUnit is imported by the file.
        boolean finished = latch.await(30, java.util.concurrent.TimeUnit.SECONDS);
        assertThat(finished).as("stream did not complete or fail within 30 seconds").isTrue();
        assertThat(streamErrors).as("stream consumer reported an error").isEmpty();

        ByteArrayOutputStream baos = new ByteArrayOutputStream();
        for (byte[] bytes : receivedBytes)
        {
            baos.write(bytes, 0, bytes.length);
        }
        assertThat(new String(baos.toByteArray(), StandardCharsets.UTF_8)).isEqualTo("Test Content");
    }