private MessageEventCallback getMessageCallback()

in gateway-server/src/main/java/org/apache/knox/gateway/websockets/ProxyWebSocketAdapter.java [222:323]


  /**
   * Creates the callback that reacts to events arriving from the backend side of the
   * proxied websocket connection and relays them to the frontend.
   *
   * <p>Text and pong messages are forwarded through the frontend {@code RemoteEndpoint}
   * while {@code remoteLock} is held. The lock is always released, including when looking
   * up the remote endpoint fails.
   *
   * @return a new {@link MessageEventCallback} bound to this adapter
   */
  private MessageEventCallback getMessageCallback() {
    return new MessageEventCallback() {

      @Override
      public void doCallback(String message) {
        /* do nothing */
      }

      @Override
      public void onConnectionOpen(Object session) {
        /* do nothing */
      }

      /**
       * Closes the frontend session with the code and reason phrase reported by the
       * backend, then runs {@code cleanup()} whether or not the close call succeeded.
       */
      @Override
      public void onConnectionClose(final CloseReason reason) {
        try {
          frontendSession.close(reason.getCloseCode().getCode(),
              reason.getReasonPhrase());
        } finally {
          cleanup();
        }
      }

      @Override
      public void onError(Throwable cause) {
        cleanupOnError(cause);
      }

      /**
       * Forwards a text message received from the backend to the frontend.
       *
       * <p>When no remote endpoint is available the message is appended to
       * {@code messageBuffer}, unless the buffer already holds
       * {@code config.getWebsocketMaxWaitBufferCount()} entries, in which case a
       * {@code RuntimeIOException} is thrown. Otherwise previously buffered messages are
       * flushed first and the current message is sent afterwards.
       *
       * @throws RuntimeIOException if the buffer is full or sending fails with an
       *         {@link IOException}
       */
      @Override
      public void onMessageText(String message, Object session) {
        LOG.logMessage("[From Backend <---]" + message);
        remoteLock.lock();
        try {
          /*
           * Resolve the remote endpoint inside the try block so that the lock is
           * released by the finally clause even if the lookup itself throws.
           */
          final RemoteEndpoint remote = getRemote();
          if (remote == null) {
            LOG.debugLog("Remote endpoint is null");
            if (messageBuffer.size() >= config.getWebsocketMaxWaitBufferCount()) {
              throw new RuntimeIOException("Remote is null and message buffer is full. Cannot buffer anymore ");
            }
            LOG.debugLog("Buffering message: " + message);
            messageBuffer.add(message);
            return;
          }

          /* Proxy message to frontend */
          flushBufferedMessages(remote);

          LOG.debugLog("Sending current message [From Backend <---]: " + message);
          remote.sendString(message);
          /* In batch mode the endpoint is flushed explicitly after the send. */
          if (remote.getBatchMode() == BatchMode.ON) {
            remote.flush();
          }
        } catch (IOException e) {
          LOG.connectionFailed(e);
          throw new RuntimeIOException(e);
        } finally {
          remoteLock.unlock();
        }
      }

      /**
       * Binary messages are not proxied; this handler always throws.
       *
       * @throws UnsupportedOperationException always
       */
      @Override
      public void onMessageBinary(byte[] message, boolean last,
          Object session) {
        throw new UnsupportedOperationException(
            "Websocket support for binary messages is not supported at this time.");

      }

      /**
       * Relays the application data of a pong received from the backend to the frontend
       * by sending it as a ping. The message is dropped when no remote endpoint is
       * available.
       *
       * <p>NOTE(review): the log text says "PING" although this handler receives a pong;
       * confirm whether that wording is intentional.
       *
       * @throws RuntimeIOException if sending fails with an {@link IOException}
       */
      @Override
      public void onMessagePong(javax.websocket.PongMessage message, Object session) {
        LOG.logMessage("[From Backend <---]: PING");
        remoteLock.lock();
        try {
          /*
           * Resolve the remote endpoint inside the try block so that the lock is
           * released by the finally clause even if the lookup itself throws.
           */
          final RemoteEndpoint remote = getRemote();
          if (remote == null) {
            LOG.debugLog("Remote endpoint is null");
            return;
          }

          /* Proxy Ping message to frontend */
          flushBufferedMessages(remote);

          LOG.logMessage("Sending current PING [From Backend <---]: ");
          remote.sendPing(message.getApplicationData());
          /* In batch mode the endpoint is flushed explicitly after the send. */
          if (remote.getBatchMode() == BatchMode.ON) {
            remote.flush();
          }
        } catch (IOException e) {
          LOG.connectionFailed(e);
          throw new RuntimeIOException(e);
        } finally {
          remoteLock.unlock();
        }
      }

    };

  }