function ready()

in packages/dubbo-node/src/http2-session-manager.ts [510:711]


function ready(
  conn: http2.ClientHttp2Session,
  options: Required<Http2SessionOptions>
): StateReady {
  // the last time we were sure that the connection is alive, via a PING
  // response, or via received response bytes
  let lastAliveAt = Date.now();
  // how many streams are currently open on this session
  let streamCount = 0;
  // timer for the keep-alive interval
  let pingIntervalId: ReturnType<typeof setTimeout> | undefined;
  // timer for waiting for a PING response
  let pingTimeoutId: ReturnType<typeof setTimeout> | undefined;
  // keep track of GOAWAY with ENHANCE_YOUR_CALM and with debug data too_many_pings
  let receivedGoAwayEnhanceYourCalmTooManyPings = false;
  // timer for closing connections without open streams, must be initialized
  let idleTimeoutId: ReturnType<typeof setTimeout> | undefined;
  resetIdleTimeout();

  const state: StateReady = {
    t: "ready",
    conn,
    streamCount() {
      return streamCount;
    },
    requiresVerify(): boolean {
      const elapsedMs = Date.now() - lastAliveAt;
      return elapsedMs > options.pingIntervalMs;
    },
    onClose: undefined,
    onError: undefined,
    registerRequest(stream: http2.ClientHttp2Stream): void {
      streamCount++;
      if (streamCount == 1) {
        resetPingInterval(); // reset to ping with the appropriate interval for "open"
        stopIdleTimeout();
      }
      stream.once("response", () => {
        lastAliveAt = Date.now();
        resetPingInterval();
      });
      stream.once("close", () => {
        streamCount--;
        if (streamCount == 0) {
          resetPingInterval(); // reset to ping with the appropriate interval for "idle"
          resetIdleTimeout();
        }
      });
    },
    responseByteRead(stream: http2.ClientHttp2Stream) {
      if (stream.session !== conn) {
        return;
      }
      if (conn.closed || conn.destroyed) {
        return;
      }
      if (streamCount <= 0) {
        return;
      }
      lastAliveAt = Date.now();
      resetPingInterval();
    },
    ping() {
      return new Promise<boolean>((resolve) => {
        commonPing(() => resolve(true));
        conn.once("error", () => resolve(false));
      });
    },
    abort(reason) {
      if (!conn.destroyed) {
        conn.once("error", () => {
          // conn.destroy() may raise an error after onExitState() was called
          // and our error listeners are removed.
          // We attach this one to swallow uncaught exceptions.
        });
        conn.destroy(reason, http2.constants.NGHTTP2_CANCEL);
      }
    },
    onExitState() {
      cleanup();
      this.onError = undefined;
      this.onClose = undefined;
    },
  };

  // start or restart the ping interval
  function resetPingInterval() {
    stopPingInterval();
    if (streamCount > 0 || options.pingIdleConnection) {
      pingIntervalId = safeSetTimeout(onPingInterval, options.pingIntervalMs);
    }
  }

  function stopPingInterval() {
    clearTimeout(pingIntervalId);
    clearTimeout(pingTimeoutId);
  }

  function onPingInterval() {
    commonPing(resetPingInterval);
  }

  function commonPing(onSuccess: () => void) {
    clearTimeout(pingTimeoutId);
    pingTimeoutId = safeSetTimeout(() => {
      conn.destroy(
        new ConnectError("PING timed out", Code.Unavailable),
        http2.constants.NGHTTP2_CANCEL
      );
    }, options.pingTimeoutMs);
    conn.ping((err, duration) => {
      clearTimeout(pingTimeoutId);
      if (err !== null) {
        // We will receive an ERR_HTTP2_PING_CANCEL here if we destroy the
        // connection with a pending ping.
        // We might also see other errors, but they should be picked up by the
        // "error" event listener.
        return;
      }
      if (duration > options.pingTimeoutMs) {
        // setTimeout is not precise, and HTTP/2 pings take less than 1ms in
        // tests.
        conn.destroy(
          new ConnectError("PING timed out", Code.Unavailable),
          http2.constants.NGHTTP2_CANCEL
        );
        return;
      }
      lastAliveAt = Date.now();
      onSuccess();
    });
  }

  function stopIdleTimeout() {
    clearTimeout(idleTimeoutId);
  }

  function resetIdleTimeout() {
    idleTimeoutId = safeSetTimeout(
      onIdleTimeout,
      options.idleConnectionTimeoutMs
    );
  }

  function onIdleTimeout() {
    conn.close();
    onClose(); // trigger a state change right away so we are not open to races
  }

  function onGoaway(
    errorCode: number,
    lastStreamID: number,
    opaqueData: Buffer
  ) {
    const tooManyPingsAscii = Buffer.from("too_many_pings", "ascii");
    if (
      errorCode === http2.constants.NGHTTP2_ENHANCE_YOUR_CALM &&
      opaqueData.equals(tooManyPingsAscii)
    ) {
      // double pingIntervalMs, following the last paragraph of https://github.com/grpc/proposal/blob/0ba0c1905050525f9b0aee46f3f23c8e1e515489/A8-client-side-keepalive.md#basic-keepalive
      options.pingIntervalMs = options.pingIntervalMs * 2;
      receivedGoAwayEnhanceYourCalmTooManyPings = true;
    }
  }

  function onClose() {
    cleanup();
    state.onClose?.();
  }

  function onError(err: Error) {
    cleanup();
    if (receivedGoAwayEnhanceYourCalmTooManyPings) {
      // We cannot prevent node from destroying session and streams with its own
      // error that does not carry debug data, but at least we can wrap the error
      // we surface on the manager.
      const ce = new ConnectError(
        `http/2 connection closed with error code ENHANCE_YOUR_CALM (0x${http2.constants.NGHTTP2_ENHANCE_YOUR_CALM.toString(
          16
        )}), too_many_pings, doubled the interval`,
        Code.ResourceExhausted
      );
      state.onError?.(ce);
    } else {
      state.onError?.(connectErrorFromNodeReason(err));
    }
  }

  function cleanup() {
    stopPingInterval();
    stopIdleTimeout();
    conn.off("error", onError);
    conn.off("close", onClose);
    conn.off("goaway", onGoaway);
  }

  conn.on("error", onError);
  conn.on("close", onClose);
  conn.on("goaway", onGoaway);

  return state;
}