in src/main/java/com/microsoft/azure/proton/transport/ws/impl/WebSocketImpl.java [298:413]
private void processInput() throws TransportException {
switch (webSocketState) {
case PN_WS_CONNECTING:
inputBuffer.mark();
if (webSocketHandler.validateUpgradeReply(inputBuffer)) {
webSocketState = WebSocketState.PN_WS_CONNECTED_FLOW;
} else {
// Input data was incomplete. Reset buffer position and wait for another call after more data arrives.
inputBuffer.reset();
TRACE_LOGGER.warn("Websocket connecting response incomplete");
}
inputBuffer.compact();
break;
case PN_WS_CONNECTED_FLOW:
case PN_WS_CONNECTED_PONG:
if (inputBuffer.remaining() > 0) {
boolean readComplete = false;
while (!readComplete) {
switch (frameReadState) {
//State 1: Init_Read
case INIT_READ:
//Reset the bytes read count
bytesRead = 0;
//Determine how much to grab from the input buffer and only take that
readInputBuffer();
frameReadState = tempBuffer.position() < 2
? WebSocketFrameReadState.CHUNK_READ
: WebSocketFrameReadState.HEADER_READ;
readComplete = frameReadState == WebSocketFrameReadState.CHUNK_READ;
break;
//State 2: Chunk_Read
case CHUNK_READ:
//Determine how much to grab from the input buffer and only take that
readInputBuffer();
frameReadState = tempBuffer.position() < 2 ? frameReadState : WebSocketFrameReadState.HEADER_READ;
readComplete = frameReadState == WebSocketFrameReadState.CHUNK_READ;
break;
//State 3: Header_Read
case HEADER_READ:
//Determine how much to grab from the input buffer and only take that
readInputBuffer();
tempBuffer.flip();
WebSocketHandler.WebsocketTuple unwrapResult = unwrapBuffer(tempBuffer);
lastType = unwrapResult.getType();
lastLength = unwrapResult.getLength();
frameReadState = lastType == WEB_SOCKET_MESSAGE_TYPE_HEADER_CHUNK
? WebSocketFrameReadState.CHUNK_READ
: WebSocketFrameReadState.CONTINUED_FRAME_READ;
readComplete = frameReadState == WebSocketFrameReadState.CHUNK_READ
|| tempBuffer.position() == tempBuffer.limit();
if (frameReadState == WebSocketFrameReadState.CONTINUED_FRAME_READ) {
tempBuffer.compact();
} else {
//Unflip the buffer to continue writing to it
tempBuffer.position(tempBuffer.limit());
tempBuffer.limit(tempBuffer.capacity());
}
break;
//State 4: Continued_Frame_Read (Similar to Chunk_Read but reading until
// we've read the number of bytes specified when unwrapping the buffer)
case CONTINUED_FRAME_READ:
//Determine how much to grab from the input buffer and only take that
readInputBuffer();
tempBuffer.flip();
final byte[] data;
if (tempBuffer.remaining() >= lastLength - bytesRead) {
data = new byte[(int) (lastLength - bytesRead)];
tempBuffer.get(data, 0, (int) (lastLength - bytesRead));
wsInputBuffer.put(data);
bytesRead += lastLength - bytesRead;
} else {
//Otherwise the remaining bytes is < the rest that we need
data = new byte[tempBuffer.remaining()];
tempBuffer.get(data);
wsInputBuffer.put(data);
bytesRead += data.length;
}
//Send whatever we have
sendToUnderlyingInput();
frameReadState = bytesRead
== lastLength ? WebSocketFrameReadState.INIT_READ : WebSocketFrameReadState.CONTINUED_FRAME_READ;
readComplete = tempBuffer.remaining() == 0;
tempBuffer.compact();
break;
//State 5: Read_Error
case READ_ERROR:
break;
default:
assert false : String.format("unexpected value for WebSocketFrameReadState: %s", frameReadState);
}
}
}
inputBuffer.compact();
break;
case PN_WS_NOT_STARTED:
case PN_WS_CLOSED:
case PN_WS_FAILED:
default:
break;
}
}