in gateway-server/src/main/java/org/apache/knox/gateway/websockets/ProxyWebSocketAdapter.java [222:323]
private MessageEventCallback getMessageCallback() {
return new MessageEventCallback() {
@Override
public void doCallback(String message) {
/* do nothing */
}
@Override
public void onConnectionOpen(Object session) {
/* do nothing */
}
@Override
public void onConnectionClose(final CloseReason reason) {
try {
frontendSession.close(reason.getCloseCode().getCode(),
reason.getReasonPhrase());
} finally {
cleanup();
}
}
@Override
public void onError(Throwable cause) {
cleanupOnError(cause);
}
@Override
public void onMessageText(String message, Object session) {
LOG.logMessage("[From Backend <---]" + message);
remoteLock.lock();
final RemoteEndpoint remote = getRemote();
try {
if (remote == null) {
LOG.debugLog("Remote endpoint is null");
if (messageBuffer.size() >= config.getWebsocketMaxWaitBufferCount()) {
throw new RuntimeIOException("Remote is null and message buffer is full. Cannot buffer anymore ");
}
LOG.debugLog("Buffering message: " + message);
messageBuffer.add(message);
return;
}
/* Proxy message to frontend */
flushBufferedMessages(remote);
LOG.debugLog("Sending current message [From Backend <---]: " + message);
remote.sendString(message);
if (remote.getBatchMode() == BatchMode.ON) {
remote.flush();
}
} catch (IOException e) {
LOG.connectionFailed(e);
throw new RuntimeIOException(e);
}
finally
{
remoteLock.unlock();
}
}
@Override
public void onMessageBinary(byte[] message, boolean last,
Object session) {
throw new UnsupportedOperationException(
"Websocket support for binary messages is not supported at this time.");
}
@Override
public void onMessagePong(javax.websocket.PongMessage message, Object session) {
LOG.logMessage("[From Backend <---]: PING");
remoteLock.lock();
final RemoteEndpoint remote = getRemote();
try {
if (remote == null) {
LOG.debugLog("Remote endpoint is null");
return;
}
/* Proxy Ping message to frontend */
flushBufferedMessages(remote);
LOG.logMessage("Sending current PING [From Backend <---]: ");
remote.sendPing(message.getApplicationData());
if (remote.getBatchMode() == BatchMode.ON) {
remote.flush();
}
} catch (IOException e) {
LOG.connectionFailed(e);
throw new RuntimeIOException(e);
}
finally
{
remoteLock.unlock();
}
}
};
}