in apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/asyncprofiler/AsyncProfilerDataSender.java [90:155]
public void sendData(AsyncProfilerTask task, File dumpFile) throws IOException, InterruptedException {
if (status != GRPCChannelStatus.CONNECTED) {
return;
}
try (FileInputStream fileInputStream = new FileInputStream(dumpFile)) {
long fileSize = Files.size(dumpFile.toPath());
int size = Math.toIntExact(fileSize);
final GRPCStreamServiceStatus status = new GRPCStreamServiceStatus(false);
StreamObserver<AsyncProfilerData> dataStreamObserver = asyncProfilerTaskStub.withDeadlineAfter(
GRPC_UPSTREAM_TIMEOUT, TimeUnit.SECONDS
).collect(new ClientResponseObserver<AsyncProfilerData, AsyncProfilerCollectionResponse>() {
ClientCallStreamObserver<AsyncProfilerData> requestStream;
final byte[] buf = new byte[DATA_CHUNK_SIZE];
@Override
public void beforeStart(ClientCallStreamObserver<AsyncProfilerData> requestStream) {
this.requestStream = requestStream;
}
@Override
public void onNext(AsyncProfilerCollectionResponse value) {
if (AsyncProfilingStatus.TERMINATED_BY_OVERSIZE.equals(value.getType())) {
LOGGER.warn("JFR is too large to be received by the oap server");
} else {
try {
int bytesRead;
while ((bytesRead = fileInputStream.read(buf)) != -1) {
AsyncProfilerData asyncProfilerData = AsyncProfilerData.newBuilder()
.setContent(ByteString.copyFrom(buf, 0, bytesRead))
.build();
requestStream.onNext(asyncProfilerData);
}
} catch (IOException e) {
LOGGER.error("Failed to read JFR file and failed to upload to oap", e);
}
}
requestStream.onCompleted();
}
@Override
public void onError(Throwable t) {
status.finished();
LOGGER.error(t, "Send async profiler task data to collector fail with a grpc internal exception.");
ServiceManager.INSTANCE.findService(GRPCChannelManager.class).reportError(t);
}
@Override
public void onCompleted() {
status.finished();
}
});
AsyncProfilerMetaData metaData = AsyncProfilerMetaData.newBuilder()
.setService(Config.Agent.SERVICE_NAME)
.setServiceInstance(Config.Agent.INSTANCE_NAME)
.setType(AsyncProfilingStatus.PROFILING_SUCCESS)
.setContentSize(size)
.setTaskId(task.getTaskId())
.build();
AsyncProfilerData asyncProfilerData = AsyncProfilerData.newBuilder().setMetaData(metaData).build();
dataStreamObserver.onNext(asyncProfilerData);
status.wait4Finish();
}
}