public StreamObserver call()

in oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/remote/RemoteServiceHandler.java [100:170]


    /**
     * Opens the receiving side of a streaming remote call and returns the observer that consumes
     * every incoming {@link RemoteMessage}.
     *
     * <p>On the first invocation the {@link IWorkerInstanceGetter} service is looked up from the
     * core module and cached in {@code workerInstanceGetter}. For each message the returned
     * observer:
     * <ol>
     *   <li>increments {@code remoteInCounter} and times the handling with {@code remoteInHistogram};</li>
     *   <li>looks up the target worker by the message's next-worker name;</li>
     *   <li>creates a new instance of the worker's stream data class and deserializes the remote
     *       data into it;</li>
     *   <li>hands the stream data to the worker.</li>
     * </ol>
     * If instantiation or deserialization fails, {@code remoteInErrorCounter} is incremented, the
     * failure is logged and the message is dropped. If no worker is registered under the requested
     * name, {@code remoteInTargetNotFoundCounter} is incremented and a warning is logged.
     *
     * @param responseObserver observer that receives a single {@link Empty} followed by completion
     *                         once the sender completes its stream
     * @return the observer that handles the incoming message stream
     */
    public StreamObserver<RemoteMessage> call(StreamObserver<Empty> responseObserver) {
        // Lazy, double-checked initialization of the worker lookup service.
        // NOTE(review): safe publication requires the workerInstanceGetter field to be volatile;
        // its declaration is not visible here -- confirm.
        if (Objects.isNull(workerInstanceGetter)) {
            synchronized (RemoteServiceHandler.class) {
                if (Objects.isNull(workerInstanceGetter)) {
                    workerInstanceGetter = moduleDefineHolder.find(CoreModule.NAME)
                                                             .provider()
                                                             .getService(IWorkerInstanceGetter.class);
                }
            }
        }

        return new StreamObserver<RemoteMessage>() {
            @Override
            public void onNext(RemoteMessage message) {
                remoteInCounter.inc();
                HistogramMetrics.Timer timer = remoteInHistogram.createTimer();
                try {
                    String nextWorkerName = message.getNextWorkerName();
                    RemoteData remoteData = message.getRemoteData();

                    RemoteHandleWorker handleWorker = workerInstanceGetter.get(nextWorkerName);
                    if (handleWorker != null) {
                        AbstractWorker nextWorker = handleWorker.getWorker();
                        StreamData streamData;
                        try {
                            // Class#newInstance() is deprecated since Java 9; invoke the no-arg
                            // constructor explicitly instead.
                            streamData = handleWorker.getStreamDataClass().getDeclaredConstructor().newInstance();
                        } catch (Throwable t) {
                            // Constructor failures arrive wrapped in InvocationTargetException;
                            // unwrap so the log still reports the constructor's own exception.
                            Throwable failure = t;
                            if (t instanceof java.lang.reflect.InvocationTargetException && t.getCause() != null) {
                                failure = t.getCause();
                            }
                            remoteInErrorCounter.inc();
                            LOGGER.error(failure.getMessage(), failure);
                            return;
                        }
                        try {
                            streamData.deserialize(remoteData);
                        } catch (Throwable t) {
                            remoteInErrorCounter.inc();
                            LOGGER.error("Can't deserialize data {}, this data is discarded.", message, t);
                            return;
                        }
                        nextWorker.in(streamData);
                    } else {
                        remoteInTargetNotFoundCounter.inc();
                        LOGGER.warn(
                            "Data is discarded due to worker not found. Check OAL/MAL script, make sure they are aligned in the whole cluster. The data is {}",
                            message
                        );
                    }

                } finally {
                    // Runs on every exit path, including the early returns above.
                    timer.finish();
                }
            }

            @Override
            public void onError(Throwable throwable) {
                Status status = Status.fromThrowable(throwable);
                // Errors that map to the CANCELLED status code are logged at debug level only.
                if (Status.CANCELLED.getCode() == status.getCode()) {
                    if (LOGGER.isDebugEnabled()) {
                        LOGGER.debug(throwable.getMessage(), throwable);
                    }
                    return;
                }
                LOGGER.error(throwable.getMessage(), throwable);
            }

            @Override
            public void onCompleted() {
                // Acknowledge the finished inbound stream with an empty response, then close.
                responseObserver.onNext(Empty.newBuilder().build());
                responseObserver.onCompleted();
            }
        };
    }