in e2e-test/trace/src/integrationTest/java/com/amazon/dataprepper/integration/trace/EndToEndRawSpanTest.java [86:143]
public void testPipelineEndToEnd() throws InterruptedException {
    // End-to-end check: sends four span batches to the trace source on two ports, then polls
    // the backend index until every expected document shows up (or the wait times out).

    // Send data to otel trace source
    final ExportTraceServiceRequest exportTraceServiceRequestTrace1BatchWithRoot = getExportTraceServiceRequest(
            getResourceSpansBatch(TEST_SPAN_SET_1_WITH_ROOT_SPAN)
    );
    final ExportTraceServiceRequest exportTraceServiceRequestTrace1BatchNoRoot = getExportTraceServiceRequest(
            getResourceSpansBatch(TEST_SPAN_SET_1_WITHOUT_ROOT_SPAN)
    );
    final ExportTraceServiceRequest exportTraceServiceRequestTrace2BatchWithRoot = getExportTraceServiceRequest(
            getResourceSpansBatch(TEST_SPAN_SET_2_WITH_ROOT_SPAN)
    );
    final ExportTraceServiceRequest exportTraceServiceRequestTrace2BatchNoRoot = getExportTraceServiceRequest(
            getResourceSpansBatch(TEST_SPAN_SET_2_WITHOUT_ROOT_SPAN)
    );

    // Each trace is deliberately split across the two ports: its with-root batch goes to one
    // port and its without-root batch goes to the other.
    sendExportTraceServiceRequestToSource(DATA_PREPPER_PORT_1, exportTraceServiceRequestTrace1BatchWithRoot);
    sendExportTraceServiceRequestToSource(DATA_PREPPER_PORT_1, exportTraceServiceRequestTrace2BatchNoRoot);
    sendExportTraceServiceRequestToSource(DATA_PREPPER_PORT_2, exportTraceServiceRequestTrace2BatchWithRoot);
    sendExportTraceServiceRequestToSource(DATA_PREPPER_PORT_2, exportTraceServiceRequestTrace1BatchNoRoot);

    // Verify data in OpenSearch backend
    final List<Map<String, Object>> expectedDocuments = getExpectedDocuments(
            exportTraceServiceRequestTrace1BatchWithRoot, exportTraceServiceRequestTrace1BatchNoRoot,
            exportTraceServiceRequestTrace2BatchWithRoot, exportTraceServiceRequestTrace2BatchNoRoot);
    final ConnectionConfiguration.Builder builder = new ConnectionConfiguration.Builder(
            Collections.singletonList("https://127.0.0.1:9200"));
    builder.withUsername("admin");
    builder.withPassword("admin");
    // NOTE(review): this client is never closed in this method; consider closing it once the
    // assertions finish if the client type supports it - TODO confirm.
    final RestHighLevelClient restHighLevelClient = builder.build().createClient();

    // Wait for otel-trace-raw-prepper by at least trace_flush_interval
    Thread.sleep(6000);

    // Wait for data to flow through the pipeline and be indexed by the backend.
    // untilAsserted only retries on AssertionError, so every failure mode inside the lambda
    // is expressed as an assertion rather than an incidental runtime exception.
    await().atMost(10, TimeUnit.SECONDS).untilAsserted(
            () -> {
                refreshIndices(restHighLevelClient);
                final SearchRequest searchRequest = new SearchRequest(INDEX_NAME);
                searchRequest.source(
                        SearchSourceBuilder.searchSource().size(100)
                );
                final SearchResponse searchResponse = restHighLevelClient.search(searchRequest, RequestOptions.DEFAULT);
                final List<Map<String, Object>> foundSources = getSourcesFromSearchHits(searchResponse.getHits());
                Assert.assertEquals(expectedDocuments.size(), foundSources.size());
                /*
                 * Our raw trace prepper adds more fields than the object that was actually sent.
                 * These are defaults from the proto, so the assertion only checks that all the
                 * expected fields exist in the indexed document.
                 *
                 * TODO: Can we do better?
                 */
                expectedDocuments.forEach(expectedDoc -> {
                    final Object expectedSpanId = expectedDoc.get("spanId");
                    Assert.assertNotNull(expectedSpanId);
                    // Call equals on the expected id so an indexed document without a spanId
                    // field is simply skipped instead of raising a NullPointerException.
                    final boolean matched = foundSources.stream()
                            .filter(found -> expectedSpanId.equals(found.get("spanId")))
                            .findFirst()
                            .map(found -> found.entrySet().containsAll(expectedDoc.entrySet()))
                            // No document with this spanId yet: fail the assertion so the wait retries.
                            .orElse(false);
                    Assert.assertTrue(matched);
                });
            }
    );
}