in httpcore5-h2/src/main/java/org/apache/hc/core5/http2/impl/nio/AbstractH2StreamMultiplexer.java [728:1024]
private void consumeFrame(final RawFrame frame) throws HttpException, IOException {
final FrameType frameType = FrameType.valueOf(frame.getType());
final int streamId = frame.getStreamId();
if (continuation != null && frameType != FrameType.CONTINUATION) {
throw new H2ConnectionException(H2Error.PROTOCOL_ERROR, "CONTINUATION frame expected");
}
switch (frameType) {
case DATA: {
final H2Stream stream = getValidStream(streamId);
try {
consumeDataFrame(frame, stream);
} catch (final H2StreamResetException ex) {
stream.localReset(ex);
} catch (final HttpStreamResetException ex) {
stream.localReset(ex, ex.getCause() != null ? H2Error.INTERNAL_ERROR : H2Error.CANCEL);
}
if (stream.isTerminated()) {
streamMap.remove(streamId);
stream.releaseResources();
requestSessionOutput();
}
}
break;
case HEADERS: {
if (streamId == 0) {
throw new H2ConnectionException(H2Error.PROTOCOL_ERROR, "Illegal stream id: " + streamId);
}
H2Stream stream = streamMap.get(streamId);
if (stream == null) {
acceptHeaderFrame();
if (idGenerator.isSameSide(streamId)) {
throw new H2ConnectionException(H2Error.PROTOCOL_ERROR, "Illegal stream id: " + streamId);
}
if (goAwayReceived ) {
throw new H2ConnectionException(H2Error.PROTOCOL_ERROR, "GOAWAY received");
}
updateLastStreamId(streamId);
final H2StreamChannelImpl channel = new H2StreamChannelImpl(
streamId, false, initInputWinSize, initOutputWinSize);
final H2StreamHandler streamHandler;
if (connState.compareTo(ConnectionHandshake.ACTIVE) <= 0) {
streamHandler = createRemotelyInitiatedStream(channel, httpProcessor, connMetrics, null);
} else {
streamHandler = NoopH2StreamHandler.INSTANCE;
channel.setLocalEndStream();
}
stream = new H2Stream(channel, streamHandler, true);
if (stream.isOutputReady()) {
stream.produceOutput();
}
streamMap.put(streamId, stream);
}
try {
consumeHeaderFrame(frame, stream);
if (stream.isOutputReady()) {
stream.produceOutput();
}
} catch (final H2StreamResetException ex) {
stream.localReset(ex);
} catch (final HttpStreamResetException ex) {
stream.localReset(ex, ex.getCause() != null ? H2Error.INTERNAL_ERROR : H2Error.CANCEL);
} catch (final HttpException ex) {
stream.handle(ex);
}
if (stream.isTerminated()) {
streamMap.remove(streamId);
stream.releaseResources();
requestSessionOutput();
}
}
break;
case CONTINUATION: {
if (continuation == null) {
throw new H2ConnectionException(H2Error.PROTOCOL_ERROR, "Unexpected CONTINUATION frame");
}
if (streamId != continuation.streamId) {
throw new H2ConnectionException(H2Error.PROTOCOL_ERROR, "Unexpected CONTINUATION stream id: " + streamId);
}
final H2Stream stream = getValidStream(streamId);
try {
consumeContinuationFrame(frame, stream);
} catch (final H2StreamResetException ex) {
stream.localReset(ex);
} catch (final HttpStreamResetException ex) {
stream.localReset(ex, ex.getCause() != null ? H2Error.INTERNAL_ERROR : H2Error.CANCEL);
}
if (stream.isTerminated()) {
streamMap.remove(streamId);
stream.releaseResources();
requestSessionOutput();
}
}
break;
case WINDOW_UPDATE: {
final ByteBuffer payload = frame.getPayload();
if (payload == null || payload.remaining() != 4) {
throw new H2ConnectionException(H2Error.FRAME_SIZE_ERROR, "Invalid WINDOW_UPDATE frame payload");
}
final int delta = payload.getInt();
if (delta <= 0) {
throw new H2ConnectionException(H2Error.PROTOCOL_ERROR, "Invalid WINDOW_UPDATE delta");
}
if (streamId == 0) {
try {
updateOutputWindow(0, connOutputWindow, delta);
} catch (final ArithmeticException ex) {
throw new H2ConnectionException(H2Error.FLOW_CONTROL_ERROR, ex.getMessage());
}
} else {
final H2Stream stream = streamMap.get(streamId);
if (stream != null) {
try {
updateOutputWindow(streamId, stream.getOutputWindow(), delta);
} catch (final ArithmeticException ex) {
throw new H2ConnectionException(H2Error.FLOW_CONTROL_ERROR, ex.getMessage());
}
}
}
ioSession.setEvent(SelectionKey.OP_WRITE);
}
break;
case RST_STREAM: {
if (streamId == 0) {
throw new H2ConnectionException(H2Error.PROTOCOL_ERROR, "Illegal stream id: " + streamId);
}
final H2Stream stream = streamMap.get(streamId);
if (stream == null) {
if (streamId > lastStreamId.get()) {
throw new H2ConnectionException(H2Error.PROTOCOL_ERROR, "Unexpected stream id: " + streamId);
}
} else {
final ByteBuffer payload = frame.getPayload();
if (payload == null || payload.remaining() != 4) {
throw new H2ConnectionException(H2Error.FRAME_SIZE_ERROR, "Invalid RST_STREAM frame payload");
}
final int errorCode = payload.getInt();
stream.reset(new H2StreamResetException(errorCode, "Stream reset (" + errorCode + ")"));
streamMap.remove(streamId);
stream.releaseResources();
requestSessionOutput();
}
}
break;
case PING: {
if (streamId != 0) {
throw new H2ConnectionException(H2Error.PROTOCOL_ERROR, "Illegal stream id");
}
final ByteBuffer ping = frame.getPayloadContent();
if (ping == null || ping.remaining() != 8) {
throw new H2ConnectionException(H2Error.FRAME_SIZE_ERROR, "Invalid PING frame payload");
}
if (frame.isFlagSet(FrameFlag.ACK)) {
final AsyncPingHandler pingHandler = pingHandlers.poll();
if (pingHandler != null) {
pingHandler.consumeResponse(ping);
}
} else {
final ByteBuffer pong = ByteBuffer.allocate(ping.remaining());
pong.put(ping);
pong.flip();
final RawFrame response = frameFactory.createPingAck(pong);
commitFrame(response);
}
}
break;
case SETTINGS: {
if (streamId != 0) {
throw new H2ConnectionException(H2Error.PROTOCOL_ERROR, "Illegal stream id");
}
if (frame.isFlagSet(FrameFlag.ACK)) {
if (localSettingState == SettingsHandshake.TRANSMITTED) {
localSettingState = SettingsHandshake.ACKED;
ioSession.setEvent(SelectionKey.OP_WRITE);
applyLocalSettings();
}
} else {
final ByteBuffer payload = frame.getPayload();
if (payload != null) {
if ((payload.remaining() % 6) != 0) {
throw new H2ConnectionException(H2Error.FRAME_SIZE_ERROR, "Invalid SETTINGS payload");
}
consumeSettingsFrame(payload);
remoteSettingState = SettingsHandshake.TRANSMITTED;
}
// Send ACK
final RawFrame response = frameFactory.createSettingsAck();
commitFrame(response);
remoteSettingState = SettingsHandshake.ACKED;
}
}
break;
case PRIORITY:
// Stream priority not supported
break;
case PUSH_PROMISE: {
acceptPushFrame();
if (goAwayReceived ) {
throw new H2ConnectionException(H2Error.PROTOCOL_ERROR, "GOAWAY received");
}
if (!localConfig.isPushEnabled()) {
throw new H2ConnectionException(H2Error.PROTOCOL_ERROR, "Push is disabled");
}
final H2Stream stream = getValidStream(streamId);
if (stream.isRemoteClosed()) {
stream.localReset(new H2StreamResetException(H2Error.STREAM_CLOSED, "Stream closed"));
break;
}
final ByteBuffer payload = frame.getPayloadContent();
if (payload == null || payload.remaining() < 4) {
throw new H2ConnectionException(H2Error.FRAME_SIZE_ERROR, "Invalid PUSH_PROMISE payload");
}
final int promisedStreamId = payload.getInt();
if (promisedStreamId == 0 || idGenerator.isSameSide(promisedStreamId)) {
throw new H2ConnectionException(H2Error.PROTOCOL_ERROR, "Illegal promised stream id: " + promisedStreamId);
}
if (streamMap.get(promisedStreamId) != null) {
throw new H2ConnectionException(H2Error.PROTOCOL_ERROR, "Unexpected promised stream id: " + promisedStreamId);
}
updateLastStreamId(promisedStreamId);
final H2StreamChannelImpl channel = new H2StreamChannelImpl(
promisedStreamId, false, initInputWinSize, initOutputWinSize);
final H2StreamHandler streamHandler;
if (connState.compareTo(ConnectionHandshake.ACTIVE) <= 0) {
streamHandler = createRemotelyInitiatedStream(channel, httpProcessor, connMetrics,
stream.getPushHandlerFactory());
} else {
streamHandler = NoopH2StreamHandler.INSTANCE;
channel.setLocalEndStream();
}
final H2Stream promisedStream = new H2Stream(channel, streamHandler, true);
streamMap.put(promisedStreamId, promisedStream);
try {
consumePushPromiseFrame(frame, payload, promisedStream);
} catch (final H2StreamResetException ex) {
promisedStream.localReset(ex);
} catch (final HttpStreamResetException ex) {
promisedStream.localReset(ex, ex.getCause() != null ? H2Error.INTERNAL_ERROR : H2Error.NO_ERROR);
}
}
break;
case GOAWAY: {
if (streamId != 0) {
throw new H2ConnectionException(H2Error.PROTOCOL_ERROR, "Illegal stream id");
}
final ByteBuffer payload = frame.getPayload();
if (payload == null || payload.remaining() < 8) {
throw new H2ConnectionException(H2Error.FRAME_SIZE_ERROR, "Invalid GOAWAY payload");
}
final int processedLocalStreamId = payload.getInt();
final int errorCode = payload.getInt();
goAwayReceived = true;
if (errorCode == H2Error.NO_ERROR.getCode()) {
if (connState.compareTo(ConnectionHandshake.ACTIVE) <= 0) {
for (final Iterator<Map.Entry<Integer, H2Stream>> it = streamMap.entrySet().iterator(); it.hasNext(); ) {
final Map.Entry<Integer, H2Stream> entry = it.next();
final int activeStreamId = entry.getKey();
if (!idGenerator.isSameSide(activeStreamId) && activeStreamId > processedLocalStreamId) {
final H2Stream stream = entry.getValue();
stream.cancel();
it.remove();
}
}
}
connState = streamMap.isEmpty() ? ConnectionHandshake.SHUTDOWN : ConnectionHandshake.GRACEFUL_SHUTDOWN;
} else {
for (final Iterator<Map.Entry<Integer, H2Stream>> it = streamMap.entrySet().iterator(); it.hasNext(); ) {
final Map.Entry<Integer, H2Stream> entry = it.next();
final H2Stream stream = entry.getValue();
stream.reset(new H2StreamResetException(errorCode, "Connection terminated by the peer (" + errorCode + ")"));
}
streamMap.clear();
connState = ConnectionHandshake.SHUTDOWN;
}
}
ioSession.setEvent(SelectionKey.OP_WRITE);
break;
}
}