void testStreamSSTableComponentWithRetries()

in client/src/testFixtures/java/org/apache/cassandra/sidecar/client/SidecarClientTest.java [1228:1316]


    void testStreamSSTableComponentWithRetries(boolean useLegacyApi) throws Exception
    {
        try (MockWebServer server = new MockWebServer())
        {
            CountDownLatch latch = new CountDownLatch(1);
            List<byte[]> receivedBytes = new ArrayList<>();
            StreamConsumer mockStreamConsumer = new StreamConsumer()
            {
                @Override
                public void onRead(StreamBuffer buffer)
                {
                    assertThat(buffer.readableBytes()).isGreaterThan(0);
                    byte[] dst = new byte[buffer.readableBytes()];
                    buffer.copyBytes(0, dst, 0, buffer.readableBytes());
                    receivedBytes.add(dst);
                }

                @Override
                public void onComplete()
                {
                    latch.countDown();
                }

                @Override
                public void onError(Throwable throwable)
                {
                }
            };

            SidecarInstanceImpl sidecarInstance = RequestExecutorTest.newSidecarInstance(server);
            MockResponse response =
            new MockResponse().setResponseCode(PARTIAL_CONTENT.code())
                              .setHeader(HttpHeaderNames.CONTENT_TYPE.toString(),
                                         HttpHeaderValues.APPLICATION_OCTET_STREAM)
                              .setHeader(HttpHeaderNames.ACCEPT_RANGES.toString(), "bytes")
                              .setHeader(HttpHeaderNames.CONTENT_RANGE.toString(), "bytes 10-20/80")
                              .setBody("TOC.txt\nSt");
            server.enqueue(new MockResponse().setResponseCode(INTERNAL_SERVER_ERROR.code())
                                             .setBody("{\"error\":\"some error\"}"));
            server.enqueue(new MockResponse().setResponseCode(INTERNAL_SERVER_ERROR.code())
                                             .setBody("{\"error\":\"some error\"}"));
            server.enqueue(response);

            String expectedPath;
            if (useLegacyApi)
            {
                client.streamSSTableComponent(sidecarInstance,
                                              "cycling",
                                              "cyclist_name",
                                              "2023.04.12",
                                              "nb-1-big-TOC.txt",
                                              HttpRange.of(10, 20),
                                              mockStreamConsumer);
                expectedPath = ApiEndpointsV1.COMPONENTS_ROUTE
                               .replaceAll(KEYSPACE_PATH_PARAM, "cycling")
                               .replaceAll(ApiEndpointsV1.TABLE_PATH_PARAM, "cyclist_name")
                               .replaceAll(ApiEndpointsV1.SNAPSHOT_PATH_PARAM, "2023.04.12")
                               .replaceAll(ApiEndpointsV1.COMPONENT_PATH_PARAM, "nb-1-big-TOC.txt");
            }
            else
            {
                ListSnapshotFilesResponse.FileInfo fileInfo = new ListSnapshotFilesResponse.FileInfo(2023,
                                                                                                     server.getHostName(),
                                                                                                     server.getPort(), 0,
                                                                                                     "2023.04.12",
                                                                                                     "cycling",
                                                                                                     "cyclist_name-1234",
                                                                                                     "nb-1-big-TOC.txt");
                client.streamSSTableComponent(sidecarInstance, fileInfo, HttpRange.of(10, 20), mockStreamConsumer);
                expectedPath = fileInfo.componentDownloadUrl();
            }
            assertThat(latch.await(1, TimeUnit.MINUTES)).isTrue();

            server.takeRequest(); // first 500
            server.takeRequest(); // second 500
            RecordedRequest request3 = server.takeRequest();
            assertThat(request3.getPath()).isEqualTo(expectedPath);
            assertThat(request3.getHeader("User-Agent")).isEqualTo("cassandra-sidecar-test/0.0.1");
            assertThat(request3.getHeader("range")).isEqualTo("bytes=10-20");

            byte[] bytes = receivedBytes.stream()
                                        .collect(ByteArrayOutputStream::new,
                                                 (outputStream, src) -> outputStream.write(src, 0, src.length),
                                                 (outputStream, src) -> {
                                                 })
                                        .toByteArray();
            assertThat(new String(bytes, StandardCharsets.UTF_8)).isEqualTo("TOC.txt\nSt");
        }
    }