packages/sdk/src/FlagResolverClient.ts (317 lines of code) (raw):

import { FlagEvaluation } from '.';
import { AccessiblePromise } from './AccessiblePromise';
import { Applier, FlagResolution } from './FlagResolution';
import { Telemetry, TraceConsumer } from './Telemetry';
import { CacheProvider } from './flag-cache';
import { Context } from './context';
import { FetchBuilder, InternalFetch, SimpleFetch, TimeUnit } from './fetch-util';
import {
  ResolveFlagsRequest,
  ResolveFlagsResponse,
  ApplyFlagsRequest,
  AppliedFlag,
} from './generated/confidence/flags/resolver/v1/api';
import { Sdk } from './generated/confidence/flags/resolver/v1/types';
import {
  LibraryTraces_Library,
  LibraryTraces_Trace_RequestTrace_Status as TraceStatus,
  LibraryTraces_TraceId,
  Monitoring,
} from './generated/confidence/telemetry/v1/telemetry';
import { Logger } from './logger';
import { WaitUntil } from './types';

/** Prefix prepended to a flag name when it is reported in an apply request. */
const FLAG_PREFIX = 'flags/';
/** HTTP status codes that the request pipelines in `withRequestLogic` reject on (and thereby retry). */
const retryCodes = new Set([408, 502, 503, 504]);

/**
 * Error that carries a {@link FlagEvaluation.ErrorCode} so that a failed resolve can be
 * mapped to a specific evaluation error code (e.g. `'TIMEOUT'`).
 */
export class ResolveError extends Error {
  constructor(public readonly code: FlagEvaluation.ErrorCode, message: string) {
    super(message);
  }
}

/**
 * An {@link AccessiblePromise} for an in-flight resolve. In addition to the promise API it
 * exposes the evaluation context it was started with and lets the caller abort the request.
 *
 * Promises derived through `then` / `catch` / `finally` are themselves `PendingResolution`s
 * sharing the same context and abort controller.
 */
export class PendingResolution<T = FlagResolution> extends AccessiblePromise<T> {
  #context: Context;
  #controller: AbortController;

  /** The abort signal controlled by {@link abort}. */
  get signal(): AbortSignal {
    return this.#controller.signal;
  }

  protected constructor(promise: PromiseLike<T>, context: Context, controller: AbortController, rejected?: boolean) {
    super(promise, rejected);
    this.#context = context;
    this.#controller = controller;
  }

  /**
   * Creates a derived promise that keeps this resolution's context and controller.
   * NOTE(review): presumably invoked by AccessiblePromise when chaining — confirm against the base class.
   */
  protected chain<S>(value: any, rejected?: boolean | undefined): PendingResolution<S> {
    return new PendingResolution(value, this.#context, this.#controller, rejected);
  }

  /** The evaluation context this resolution was started with. */
  get context(): Context {
    return this.#context;
  }

  /** Aborts the underlying request by aborting the shared controller. */
  abort(): void {
    this.#controller.abort();
  }

  then<TResult1 = T, TResult2 = never>(
    onfulfilled?: ((value: T) => TResult1 | PromiseLike<TResult1>) | null | undefined,
    onrejected?: ((reason: any) => TResult2 | PromiseLike<TResult2>) | null | undefined,
  ): PendingResolution<TResult1 | TResult2> {
    return super.then(onfulfilled, onrejected) as PendingResolution<TResult1 | TResult2>;
  }

  catch<TResult = never>(
    onrejected?: ((reason: any) => TResult | PromiseLike<TResult>) | null | undefined,
  ): PendingResolution<T | TResult> {
    return super.catch(onrejected) as PendingResolution<T | TResult>;
  }

  finally(onfinally?: (() => void) | null | undefined): PendingResolution<T> {
    return super.finally(onfinally) as PendingResolution<T>;
  }

  /**
   * Starts a resolution.
   *
   * @param context - evaluation context the resolution belongs to.
   * @param executor - called immediately with the signal of a fresh AbortController; its
   *   result becomes the value of the returned pending resolution.
   */
  static create(
    context: Context,
    executor: (signal: AbortSignal) => PromiseLike<FlagResolution>,
  ): PendingResolution<FlagResolution> {
    const controller = new AbortController();
    return new PendingResolution(executor(controller.signal), context, controller);
  }
}

/** A client capable of resolving flags for an evaluation context. */
export interface FlagResolverClient {
  resolve(context: Context, flags: string[]): PendingResolution;
}

/** Construction options for {@link FetchingFlagResolverClient}. */
export type FlagResolverClientOptions = {
  /** Fetch function that the internal request pipeline is built on top of. */
  fetchImplementation: SimpleFetch;
  clientSecret: string;
  sdk: Sdk;
  /** Delay passed to `setTimeout` before pending applies are flushed; `0` flushes synchronously. */
  applyDebounce: number;
  /** Delay passed to `setTimeout` after which a resolve is aborted with a `'TIMEOUT'` error. */
  resolveTimeout: number;
  /** `'client'` additionally installs the retry / rate-limit logic of `withRequestLogic`. */
  environment: 'client' | 'backend';
  /** Selects the regional resolver host when `resolveBaseUrl` is not given. */
  region?: 'eu' | 'us';
  /** Overrides the resolver host; `/v1` is appended to it. */
  resolveBaseUrl?: string;
  telemetry: Telemetry;
  logger: Logger;
  /** If provided, it is handed the promise of the next apply flush whenever a new flag is applied. */
  waitUntil?: WaitUntil;
  /** If provided, resolve responses are read through the cache it returns for the client secret. */
  cacheProvider?: CacheProvider;
};

/**
 * {@link FlagResolverClient} that resolves and applies flags over HTTP (JSON) against the
 * resolver API, recording the latency of each resolve as a telemetry trace.
 */
export class FetchingFlagResolverClient implements FlagResolverClient {
  private readonly fetchImplementation: InternalFetch;
  private readonly clientSecret: string;
  private readonly sdk: Sdk;
  private readonly applyDebounce: number;
  private readonly resolveTimeout: number;
  private readonly baseUrl: string;
  private readonly traceConsumer: TraceConsumer;
  private readonly waitUntil: WaitUntil | undefined;
  // Kept so that failures of the fire-and-forget apply requests can be reported.
  private readonly logger: Logger;
  private readonly cacheReadThrough: (
    context: Context,
    supplier: () => Promise<ResolveFlagsResponse>,
  ) => Promise<{ response: ResolveFlagsResponse; isFromCache: boolean }>;

  constructor({
    fetchImplementation,
    clientSecret,
    sdk,
    applyDebounce,
    resolveTimeout,
    // todo refactor to move out environment
    environment,
    region,
    resolveBaseUrl,
    telemetry,
    logger,
    waitUntil,
    cacheProvider,
  }: FlagResolverClientOptions) {
    this.traceConsumer = telemetry.registerLibraryTraces({
      library: LibraryTraces_Library.LIBRARY_CONFIDENCE,
      version: sdk.version,
      id: LibraryTraces_TraceId.TRACE_ID_RESOLVE_LATENCY,
    });
    const fetchBuilder = new FetchBuilder();
    withTelemetryData(fetchBuilder, telemetry);
    if (environment === 'client') {
      withRequestLogic(fetchBuilder, logger);
    }

    this.fetchImplementation = fetchBuilder.build(fetchImplementation);
    this.clientSecret = clientSecret;
    this.sdk = sdk;
    this.applyDebounce = applyDebounce;
    this.logger = logger;
    if (resolveBaseUrl) {
      this.baseUrl = `${resolveBaseUrl}/v1`;
    } else {
      this.baseUrl = region ? `https://resolver.${region}.confidence.dev/v1` : 'https://resolver.confidence.dev/v1';
    }
    this.resolveTimeout = resolveTimeout;
    this.waitUntil = waitUntil;
    if (cacheProvider) {
      this.cacheReadThrough = (context, supplier) => {
        const cache = cacheProvider(this.clientSecret);
        let isFromCache = true; // Default to true, will be set to false if supplier is called

        // Create a wrapper supplier that sets the flag when called
        const wrappedSupplier = async () => {
          isFromCache = false;
          return supplier();
        };

        return cache.get(context, wrappedSupplier).then(response => {
          return { response, isFromCache };
        });
      };
    } else {
      this.cacheReadThrough = (_context, supplier) =>
        supplier().then(response => ({ response, isFromCache: false }));
    }
  }

  /** Reports a resolve latency (rounded to whole milliseconds) and its outcome to telemetry. */
  private markLatency(latency: number, status: TraceStatus): void {
    this.traceConsumer({
      requestTrace: {
        millisecondDuration: Math.round(latency),
        status,
      },
    });
  }

  /**
   * Resolves flags for `context`.
   *
   * The returned resolution never rejects because of a failed request: failures (including
   * the resolve timeout and caller aborts) are turned into a `FlagResolution.failed` value
   * with error code `'TIMEOUT'` or `'GENERAL'`.
   */
  resolve(context: Context): PendingResolution {
    const request: ResolveFlagsRequest = {
      clientSecret: this.clientSecret,
      evaluationContext: context,
      apply: false,
      sdk: this.sdk,
      // NOTE(review): presumably an empty list makes the resolver return all flags — confirm.
      flags: [],
    };

    return PendingResolution.create(context, signal => {
      const signalWithTimeout = withTimeout(
        signal,
        this.resolveTimeout,
        new ResolveError('TIMEOUT', 'Resolve timeout'),
      );
      const start = performance.now();
      return this.cacheReadThrough(context, () => this.resolveFlagsJson(request, signalWithTimeout))
        .then(({ response, isFromCache }) => {
          const latency = performance.now() - start;
          this.markLatency(latency, isFromCache ? TraceStatus.STATUS_CACHED : TraceStatus.STATUS_SUCCESS);
          return FlagResolution.ready(context, response, this.createApplier(response.resolveToken));
        })
        .catch((error: unknown) => {
          const latency = performance.now() - start;
          const errorCode: FlagEvaluation.ErrorCode = error instanceof ResolveError ? error.code : 'GENERAL';
          this.markLatency(latency, errorCode === 'TIMEOUT' ? TraceStatus.STATUS_TIMEOUT : TraceStatus.STATUS_ERROR);
          // A rejection reason is not guaranteed to be an Error, so derive the message defensively.
          return FlagResolution.failed(context, errorCode, errorMessage(error));
        });
    });
  }

  /**
   * Creates the function used to report flags of one resolve response as applied.
   *
   * Each flag name is reported at most once per applier. Applied flags are collected and
   * sent in a single request once `applyDebounce` has elapsed without a new apply, or
   * immediately when `applyDebounce` is `0`.
   *
   * @param resolveToken - token of the resolve response the applied flags belong to.
   */
  createApplier(resolveToken: Uint8Array): Applier {
    const applied = new Set<string>();
    const pending: AppliedFlag[] = [];
    let [nextFlush, resolveNextFlush] = resolvablePromise();
    const flush = () => {
      // Swap in a fresh promise first, so applies arriving during this flush wait for the next one.
      const resolveCurrentFlush = resolveNextFlush;
      [nextFlush, resolveNextFlush] = resolvablePromise();
      timeoutId = 0;
      this.apply({
        flags: pending.splice(0, pending.length),
        clientSecret: this.clientSecret,
        resolveToken,
        sdk: this.sdk,
        sendTime: new Date(),
      })
        // Applying is fire-and-forget: report the failure rather than leaving an unhandled rejection.
        .catch((error: unknown) => {
          this.logger.error?.(`Confidence: failed to apply flags: ${errorMessage(error)}`);
        })
        .finally(resolveCurrentFlush);
    };
    let timeoutId = 0;
    return (flagName: string) => {
      if (applied.has(flagName)) return;
      this.waitUntil?.(nextFlush);
      applied.add(flagName);
      pending.push({
        flag: FLAG_PREFIX + flagName,
        applyTime: new Date(),
      });
      // Restart the debounce window on every newly applied flag.
      if (timeoutId) {
        clearTimeout(timeoutId);
      }
      if (this.applyDebounce === 0) {
        flush();
      } else {
        timeoutId = Number(setTimeout(flush, this.applyDebounce));
      }
    };
  }

  /**
   * POSTs an apply request to `flags:apply`.
   *
   * @throws Error with `"<status>: <statusText>"` when the response is not ok.
   */
  async apply(request: ApplyFlagsRequest): Promise<void> {
    const resp = await this.fetchImplementation(`${this.baseUrl}/flags:apply`, {
      method: 'POST',
      headers: {
        'Content-Type': 'application/json',
      },
      body: JSON.stringify(ApplyFlagsRequest.toJSON(request)),
    });
    if (!resp.ok) {
      throw new Error(`${resp.status}: ${resp.statusText}`);
    }
  }

  /**
   * POSTs a resolve request to `flags:resolve` and decodes the JSON response.
   *
   * @param signal - aborts the request when triggered.
   * @throws Error with `"<status>: <statusText>"` when the response is not ok.
   */
  async resolveFlagsJson(request: ResolveFlagsRequest, signal: AbortSignal): Promise<ResolveFlagsResponse> {
    const resp = await this.fetchImplementation(`${this.baseUrl}/flags:resolve`, {
      method: 'POST',
      headers: {
        'Content-Type': 'application/json',
      },
      body: JSON.stringify(ResolveFlagsRequest.toJSON(request)),
      signal,
    });
    if (!resp.ok) {
      throw new Error(`${resp.status}: ${resp.statusText}`);
    }
    return ResolveFlagsResponse.fromJSON(await resp.json());
  }

  // async resolveFlagsProto(request: ResolveFlagsRequest): Promise<ResolveFlagsResponse> {
  //   const resp = await this.fetchImplementation(
  //     new Request('https://resolver.confidence.dev/v1/flags:resolve', {
  //       method: 'POST',
  //       headers: {
  //         'Content-Type': 'application/x-protobuf',
  //       },
  //       body: ResolveFlagsRequest.encode(request).finish(),
  //     }),
  //   );
  //   if (!resp.ok) {
  //     throw new Error(`${resp.status}: ${resp.statusText}`);
  //   }
  //   return ResolveFlagsResponse.decode(new Uint8Array(await resp.arrayBuffer()));
  // }
}

/**
 * Makes every request built by `fetchBuilder` carry the current telemetry snapshot, base64
 * encoded in the `X-CONFIDENCE-TELEMETRY` header. The header is omitted when the snapshot
 * contains no library traces.
 */
export function withTelemetryData(fetchBuilder: FetchBuilder, telemetry: Telemetry) {
  fetchBuilder.modifyRequest(({ headers }) => {
    const monitoring = telemetry.getSnapshot();
    if (monitoring.libraryTraces.length > 0) {
      const bytes = Monitoring.encode(monitoring).finish();
      // Convert byte by byte: spreading the whole array into String.fromCharCode can exceed
      // the engine's argument limit for large payloads.
      const binary = Array.from(bytes, byte => String.fromCharCode(byte)).join('');
      const base64Message = btoa(binary);
      return { headers: { ...headers, ['X-CONFIDENCE-TELEMETRY']: base64Message } };
    }
    return {};
  });
}

/**
 * Routes `flags:resolve` and `flags:apply` requests of `fetchBuilder` through dedicated
 * pipelines that add error logging (resolve only), retries and rate limiting.
 */
export function withRequestLogic(fetchBuilder: FetchBuilder, logger: Logger) {
  const fetchResolve = new FetchBuilder()
    // infinite retries without delay until aborted by timeout
    .compose(next => async request => {
      try {
        const response = await next(request);
        return response;
      } catch (error) {
        logger.error?.(`Confidence: ${error}`);
        throw error;
      }
    })
    .rejectNotOk()
    .retry()
    .rejectOn(response => retryCodes.has(response.status))
    .rateLimit(1, { initialTokens: 3, maxTokens: 2 });

  const fetchApply = new FetchBuilder()
    .limitPending(1000)
    .timeout(30 * TimeUnit.MINUTE)
    .rejectNotOk()
    .retry({ delay: 5 * TimeUnit.SECOND, backoff: 2, maxDelay: 5 * TimeUnit.MINUTE, jitter: 0.2 })
    .rejectOn(response => retryCodes.has(response.status))
    .rateLimit(2)
    // update send-time before sending
    .modifyRequest(({ method, body }) => {
      if (method === 'POST' && body) {
        body = JSON.stringify({ ...JSON.parse(body), sendTime: new Date().toISOString() });
        return { body };
      }
      return {};
    });

  fetchBuilder
    .route(url => url.endsWith('flags:resolve'), fetchResolve)
    .route(url => url.endsWith('flags:apply'), fetchApply);
}

/**
 * Derives a signal that aborts when `signal` aborts (with `signal.reason`) or when
 * `timeout` milliseconds have elapsed (with `reason`), whichever happens first.
 */
function withTimeout(signal: AbortSignal, timeout: number, reason?: any): AbortSignal {
  const controller = new AbortController();
  // An already-aborted signal never fires 'abort' again, so propagate it right away.
  if (signal.aborted) {
    controller.abort(signal.reason);
    return controller.signal;
  }
  const timeoutId: NodeJS.Timeout | number = setTimeout(() => controller.abort(reason), timeout);
  // in Node setTimeout returns an object, with an unref function which will prevent the timeout from keeping the process alive
  if (typeof timeoutId === 'object') timeoutId.unref();
  signal.addEventListener(
    'abort',
    () => {
      clearTimeout(timeoutId);
      controller.abort(signal.reason);
    },
    { once: true },
  );
  return controller.signal;
}

/** Creates a promise together with the functions that resolve and reject it. */
function resolvablePromise<T = void>(): [
  promise: Promise<T>,
  resolve: (value: T) => void,
  reject: (reason: any) => void,
] {
  let resolve: (value: T) => void;
  let reject: (reason: any) => void;
  const promise = new Promise<T>((_resolve, _reject) => {
    resolve = _resolve;
    reject = _reject;
  });
  return [promise, resolve!, reject!];
}

/** Extracts a human readable message from an arbitrary thrown value or rejection reason. */
function errorMessage(error: unknown): string {
  if (typeof error === 'object' && error !== null && 'message' in error) {
    const { message } = error as { message: unknown };
    if (typeof message === 'string') return message;
  }
  return String(error);
}