in packages/dubbo-node/src/http2-session-manager.ts [510:711]
function ready(
conn: http2.ClientHttp2Session,
options: Required<Http2SessionOptions>
): StateReady {
// the last time we were sure that the connection is alive, via a PING
// response, or via received response bytes
let lastAliveAt = Date.now();
// how many streams are currently open on this session
let streamCount = 0;
// timer for the keep-alive interval
let pingIntervalId: ReturnType<typeof setTimeout> | undefined;
// timer for waiting for a PING response
let pingTimeoutId: ReturnType<typeof setTimeout> | undefined;
// keep track of GOAWAY with ENHANCE_YOUR_CALM and with debug data too_many_pings
let receivedGoAwayEnhanceYourCalmTooManyPings = false;
// timer for closing connections without open streams, must be initialized
let idleTimeoutId: ReturnType<typeof setTimeout> | undefined;
resetIdleTimeout();
const state: StateReady = {
t: "ready",
conn,
streamCount() {
return streamCount;
},
requiresVerify(): boolean {
const elapsedMs = Date.now() - lastAliveAt;
return elapsedMs > options.pingIntervalMs;
},
onClose: undefined,
onError: undefined,
registerRequest(stream: http2.ClientHttp2Stream): void {
streamCount++;
if (streamCount == 1) {
resetPingInterval(); // reset to ping with the appropriate interval for "open"
stopIdleTimeout();
}
stream.once("response", () => {
lastAliveAt = Date.now();
resetPingInterval();
});
stream.once("close", () => {
streamCount--;
if (streamCount == 0) {
resetPingInterval(); // reset to ping with the appropriate interval for "idle"
resetIdleTimeout();
}
});
},
responseByteRead(stream: http2.ClientHttp2Stream) {
if (stream.session !== conn) {
return;
}
if (conn.closed || conn.destroyed) {
return;
}
if (streamCount <= 0) {
return;
}
lastAliveAt = Date.now();
resetPingInterval();
},
ping() {
return new Promise<boolean>((resolve) => {
commonPing(() => resolve(true));
conn.once("error", () => resolve(false));
});
},
abort(reason) {
if (!conn.destroyed) {
conn.once("error", () => {
// conn.destroy() may raise an error after onExitState() was called
// and our error listeners are removed.
// We attach this one to swallow uncaught exceptions.
});
conn.destroy(reason, http2.constants.NGHTTP2_CANCEL);
}
},
onExitState() {
cleanup();
this.onError = undefined;
this.onClose = undefined;
},
};
// start or restart the ping interval
function resetPingInterval() {
stopPingInterval();
if (streamCount > 0 || options.pingIdleConnection) {
pingIntervalId = safeSetTimeout(onPingInterval, options.pingIntervalMs);
}
}
function stopPingInterval() {
clearTimeout(pingIntervalId);
clearTimeout(pingTimeoutId);
}
function onPingInterval() {
commonPing(resetPingInterval);
}
function commonPing(onSuccess: () => void) {
clearTimeout(pingTimeoutId);
pingTimeoutId = safeSetTimeout(() => {
conn.destroy(
new ConnectError("PING timed out", Code.Unavailable),
http2.constants.NGHTTP2_CANCEL
);
}, options.pingTimeoutMs);
conn.ping((err, duration) => {
clearTimeout(pingTimeoutId);
if (err !== null) {
// We will receive an ERR_HTTP2_PING_CANCEL here if we destroy the
// connection with a pending ping.
// We might also see other errors, but they should be picked up by the
// "error" event listener.
return;
}
if (duration > options.pingTimeoutMs) {
// setTimeout is not precise, and HTTP/2 pings take less than 1ms in
// tests.
conn.destroy(
new ConnectError("PING timed out", Code.Unavailable),
http2.constants.NGHTTP2_CANCEL
);
return;
}
lastAliveAt = Date.now();
onSuccess();
});
}
function stopIdleTimeout() {
clearTimeout(idleTimeoutId);
}
function resetIdleTimeout() {
idleTimeoutId = safeSetTimeout(
onIdleTimeout,
options.idleConnectionTimeoutMs
);
}
function onIdleTimeout() {
conn.close();
onClose(); // trigger a state change right away so we are not open to races
}
function onGoaway(
errorCode: number,
lastStreamID: number,
opaqueData: Buffer
) {
const tooManyPingsAscii = Buffer.from("too_many_pings", "ascii");
if (
errorCode === http2.constants.NGHTTP2_ENHANCE_YOUR_CALM &&
opaqueData.equals(tooManyPingsAscii)
) {
// double pingIntervalMs, following the last paragraph of https://github.com/grpc/proposal/blob/0ba0c1905050525f9b0aee46f3f23c8e1e515489/A8-client-side-keepalive.md#basic-keepalive
options.pingIntervalMs = options.pingIntervalMs * 2;
receivedGoAwayEnhanceYourCalmTooManyPings = true;
}
}
function onClose() {
cleanup();
state.onClose?.();
}
function onError(err: Error) {
cleanup();
if (receivedGoAwayEnhanceYourCalmTooManyPings) {
// We cannot prevent node from destroying session and streams with its own
// error that does not carry debug data, but at least we can wrap the error
// we surface on the manager.
const ce = new ConnectError(
`http/2 connection closed with error code ENHANCE_YOUR_CALM (0x${http2.constants.NGHTTP2_ENHANCE_YOUR_CALM.toString(
16
)}), too_many_pings, doubled the interval`,
Code.ResourceExhausted
);
state.onError?.(ce);
} else {
state.onError?.(connectErrorFromNodeReason(err));
}
}
function cleanup() {
stopPingInterval();
stopIdleTimeout();
conn.off("error", onError);
conn.off("close", onClose);
conn.off("goaway", onGoaway);
}
conn.on("error", onError);
conn.on("close", onClose);
conn.on("goaway", onGoaway);
return state;
}