in e2e-test/trace/src/integrationTest/java/com/amazon/dataprepper/integration/trace/EndToEndServiceMapTest.java [63:123]
public void testPipelineEndToEnd() throws IOException, InterruptedException {
// Send test trace group 1
final ExportTraceServiceRequest exportTraceServiceRequest11 = getExportTraceServiceRequest(
getResourceSpansBatch(TEST_TRACEID_1, TEST_TRACE_1_BATCH_1)
);
final ExportTraceServiceRequest exportTraceServiceRequest12 = getExportTraceServiceRequest(
getResourceSpansBatch(TEST_TRACEID_1, TEST_TRACE_1_BATCH_2)
);
sendExportTraceServiceRequestToSource(DATA_PREPPER_PORT_1, exportTraceServiceRequest11);
sendExportTraceServiceRequestToSource(DATA_PREPPER_PORT_2, exportTraceServiceRequest12);
//Verify data in OpenSearch backend
final List<EndToEndTestSpan> testDataSet1 = Stream.of(TEST_TRACE_1_BATCH_1, TEST_TRACE_1_BATCH_2)
.flatMap(Collection::stream).collect(Collectors.toList());
final List<Map<String, Object>> possibleEdges = getPossibleEdges(TEST_TRACEID_1, testDataSet1);
final ConnectionConfiguration.Builder builder = new ConnectionConfiguration.Builder(
Collections.singletonList("https://127.0.0.1:9200"));
builder.withUsername("admin");
builder.withPassword("admin");
final RestHighLevelClient restHighLevelClient = builder.build().createClient();
// Wait for service map prepper by 2 * window_duration
Thread.sleep(6000);
await().atMost(20, TimeUnit.SECONDS).untilAsserted(
() -> {
final List<Map<String, Object>> foundSources = getSourcesFromIndex(restHighLevelClient, SERVICE_MAP_INDEX_NAME);
foundSources.forEach(source -> source.remove("hashId"));
Assert.assertEquals(8, foundSources.size());
Assert.assertTrue(foundSources.containsAll(possibleEdges) && possibleEdges.containsAll(foundSources));
}
);
// Resend the same batch of spans (No new edges should be created)
sendExportTraceServiceRequestToSource(DATA_PREPPER_PORT_1, exportTraceServiceRequest11);
sendExportTraceServiceRequestToSource(DATA_PREPPER_PORT_2, exportTraceServiceRequest12);
// Send test trace group 2
final ExportTraceServiceRequest exportTraceServiceRequest21 = getExportTraceServiceRequest(
getResourceSpansBatch(TEST_TRACEID_2, TEST_TRACE_2_BATCH_1)
);
final ExportTraceServiceRequest exportTraceServiceRequest22 = getExportTraceServiceRequest(
getResourceSpansBatch(TEST_TRACEID_2, TEST_TRACE_2_BATCH_2)
);
sendExportTraceServiceRequestToSource(DATA_PREPPER_PORT_1, exportTraceServiceRequest21);
sendExportTraceServiceRequestToSource(DATA_PREPPER_PORT_2, exportTraceServiceRequest22);
final List<EndToEndTestSpan> testDataSet2 = Stream.of(TEST_TRACE_2_BATCH_1, TEST_TRACE_2_BATCH_2)
.flatMap(Collection::stream).collect(Collectors.toList());
possibleEdges.addAll(getPossibleEdges(TEST_TRACEID_2, testDataSet2));
// Wait for service map prepper by 2 * window_duration
Thread.sleep(6000);
await().atMost(20, TimeUnit.SECONDS).untilAsserted(
() -> {
final List<Map<String, Object>> foundSources = getSourcesFromIndex(restHighLevelClient, SERVICE_MAP_INDEX_NAME);
foundSources.forEach(source -> source.remove("hashId"));
Assert.assertEquals(12, foundSources.size());
Assert.assertTrue(foundSources.containsAll(possibleEdges) && possibleEdges.containsAll(foundSources));
}
);
}