public void testPipelineEndToEnd()

in e2e-test/trace/src/integrationTest/java/com/amazon/dataprepper/integration/trace/EndToEndServiceMapTest.java [63:123]


    public void testPipelineEndToEnd() throws IOException, InterruptedException {
        // Send test trace group 1
        final ExportTraceServiceRequest exportTraceServiceRequest11 = getExportTraceServiceRequest(
                getResourceSpansBatch(TEST_TRACEID_1, TEST_TRACE_1_BATCH_1)
        );
        final ExportTraceServiceRequest exportTraceServiceRequest12 = getExportTraceServiceRequest(
                getResourceSpansBatch(TEST_TRACEID_1, TEST_TRACE_1_BATCH_2)
        );

        sendExportTraceServiceRequestToSource(DATA_PREPPER_PORT_1, exportTraceServiceRequest11);
        sendExportTraceServiceRequestToSource(DATA_PREPPER_PORT_2, exportTraceServiceRequest12);

        //Verify data in OpenSearch backend
        final List<EndToEndTestSpan> testDataSet1 = Stream.of(TEST_TRACE_1_BATCH_1, TEST_TRACE_1_BATCH_2)
                .flatMap(Collection::stream).collect(Collectors.toList());
        final List<Map<String, Object>> possibleEdges = getPossibleEdges(TEST_TRACEID_1, testDataSet1);
        final ConnectionConfiguration.Builder builder = new ConnectionConfiguration.Builder(
                Collections.singletonList("https://127.0.0.1:9200"));
        builder.withUsername("admin");
        builder.withPassword("admin");
        final RestHighLevelClient restHighLevelClient = builder.build().createClient();

        // Wait for service map prepper by 2 * window_duration
        Thread.sleep(6000);
        await().atMost(20, TimeUnit.SECONDS).untilAsserted(
                () -> {
                    final List<Map<String, Object>> foundSources = getSourcesFromIndex(restHighLevelClient, SERVICE_MAP_INDEX_NAME);
                    foundSources.forEach(source -> source.remove("hashId"));
                    Assert.assertEquals(8, foundSources.size());
                    Assert.assertTrue(foundSources.containsAll(possibleEdges) && possibleEdges.containsAll(foundSources));
                }
        );

        // Resend the same batch of spans (No new edges should be created)
        sendExportTraceServiceRequestToSource(DATA_PREPPER_PORT_1, exportTraceServiceRequest11);
        sendExportTraceServiceRequestToSource(DATA_PREPPER_PORT_2, exportTraceServiceRequest12);
        // Send test trace group 2
        final ExportTraceServiceRequest exportTraceServiceRequest21 = getExportTraceServiceRequest(
                getResourceSpansBatch(TEST_TRACEID_2, TEST_TRACE_2_BATCH_1)
        );
        final ExportTraceServiceRequest exportTraceServiceRequest22 = getExportTraceServiceRequest(
                getResourceSpansBatch(TEST_TRACEID_2, TEST_TRACE_2_BATCH_2)
        );

        sendExportTraceServiceRequestToSource(DATA_PREPPER_PORT_1, exportTraceServiceRequest21);
        sendExportTraceServiceRequestToSource(DATA_PREPPER_PORT_2, exportTraceServiceRequest22);

        final List<EndToEndTestSpan> testDataSet2 = Stream.of(TEST_TRACE_2_BATCH_1, TEST_TRACE_2_BATCH_2)
                .flatMap(Collection::stream).collect(Collectors.toList());
        possibleEdges.addAll(getPossibleEdges(TEST_TRACEID_2, testDataSet2));
        // Wait for service map prepper by 2 * window_duration
        Thread.sleep(6000);
        await().atMost(20, TimeUnit.SECONDS).untilAsserted(
                () -> {
                    final List<Map<String, Object>> foundSources = getSourcesFromIndex(restHighLevelClient, SERVICE_MAP_INDEX_NAME);
                    foundSources.forEach(source -> source.remove("hashId"));
                    Assert.assertEquals(12, foundSources.size());
                    Assert.assertTrue(foundSources.containsAll(possibleEdges) && possibleEdges.containsAll(foundSources));
                }
        );
    }