in oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/remote/RemoteServiceHandler.java [100:170]
/**
 * Opens the receiving side of a remote stream: every incoming {@link RemoteMessage} is
 * turned back into a {@link StreamData} instance and handed to the worker named in the message.
 *
 * <p>Messages that cannot be handled (unknown worker, stream data class that cannot be
 * instantiated, payload that cannot be deserialized) are counted, logged and discarded;
 * the stream itself keeps running.
 *
 * @param responseObserver observer that receives one {@code Empty} message followed by
 *                         completion once the incoming stream completes
 * @return the observer consuming the incoming messages
 */
public StreamObserver<RemoteMessage> call(StreamObserver<Empty> responseObserver) {
    // Lazy, double-checked resolution of the worker lookup service, guarded by the class lock.
    // NOTE(review): this is only safe if the workerInstanceGetter field is volatile - confirm
    // at the field declaration, which is outside this block.
    if (Objects.isNull(workerInstanceGetter)) {
        synchronized (RemoteServiceHandler.class) {
            if (Objects.isNull(workerInstanceGetter)) {
                workerInstanceGetter = moduleDefineHolder.find(CoreModule.NAME)
                                                         .provider()
                                                         .getService(IWorkerInstanceGetter.class);
            }
        }
    }

    return new StreamObserver<RemoteMessage>() {
        @Override
        public void onNext(RemoteMessage message) {
            remoteInCounter.inc();
            HistogramMetrics.Timer timer = remoteInHistogram.createTimer();
            try {
                String nextWorkerName = message.getNextWorkerName();
                RemoteData remoteData = message.getRemoteData();

                RemoteHandleWorker handleWorker = workerInstanceGetter.get(nextWorkerName);
                if (handleWorker == null) {
                    remoteInTargetNotFoundCounter.inc();
                    LOGGER.warn(
                        "Data is discarded due to worker not found. Check OAL/MAL script, make sure they are aligned in the whole cluster. The data is {}",
                        message
                    );
                    return;
                }

                AbstractWorker nextWorker = handleWorker.getWorker();

                StreamData streamData;
                try {
                    // Class#newInstance() is deprecated since Java 9 because it rethrows checked
                    // constructor exceptions undeclared; the constructor-based call is the
                    // documented replacement and reports them as InvocationTargetException.
                    streamData = handleWorker.getStreamDataClass().getDeclaredConstructor().newInstance();
                } catch (Throwable t) {
                    remoteInErrorCounter.inc();
                    // A fixed message is used on purpose: the wrapping reflective exceptions
                    // frequently carry a null message, the cause is in the stack trace.
                    LOGGER.error(
                        "Can't create stream data instance for worker {}, this data is discarded.",
                        nextWorkerName, t
                    );
                    return;
                }

                try {
                    streamData.deserialize(remoteData);
                } catch (Throwable t) {
                    remoteInErrorCounter.inc();
                    LOGGER.error("Can't deserialize data {}, this data is discarded.", message, t);
                    return;
                }

                // NOTE(review): an exception thrown by the worker is not caught here, so it
                // leaves onNext after the timer is finished - confirm this is intended.
                nextWorker.in(streamData);
            } finally {
                // Runs on every exit path, including the early returns above.
                timer.finish();
            }
        }

        @Override
        public void onError(Throwable throwable) {
            Status status = Status.fromThrowable(throwable);
            // Streams ending with status code CANCELLED are only reported at debug level.
            if (Status.CANCELLED.getCode() == status.getCode()) {
                if (LOGGER.isDebugEnabled()) {
                    LOGGER.debug(throwable.getMessage(), throwable);
                }
                return;
            }
            LOGGER.error(throwable.getMessage(), throwable);
        }

        @Override
        public void onCompleted() {
            // Acknowledge the finished incoming stream with a single empty reply, then close.
            responseObserver.onNext(Empty.newBuilder().build());
            responseObserver.onCompleted();
        }
    };
}