controlplane/src/core/build-server.ts (444 lines of code) (raw):

import Fastify, { FastifyBaseLogger } from 'fastify'; import { S3Client } from '@aws-sdk/client-s3'; import { fastifyConnectPlugin } from '@connectrpc/connect-fastify'; import { cors, createContextValues } from '@connectrpc/connect'; import fastifyCors from '@fastify/cors'; import { pino, stdTimeFunctions, LoggerOptions } from 'pino'; import { compressionBrotli, compressionGzip } from '@connectrpc/connect-node'; import fastifyGracefulShutdown from 'fastify-graceful-shutdown'; import { App } from 'octokit'; import { Worker } from 'bullmq'; import routes from './routes.js'; import fastifyHealth from './plugins/health.js'; import fastifyMetrics, { MetricsPluginOptions } from './plugins/metrics.js'; import fastifyDatabase from './plugins/database.js'; import fastifyClickHouse from './plugins/clickhouse.js'; import fastifyRedis from './plugins/redis.js'; import AuthController from './controllers/auth.js'; import ScimController from './controllers/scim.js'; import GitHubWebhookController from './controllers/github.js'; import StripeWebhookController from './controllers/stripe.js'; import { pkceCodeVerifierCookieName, userSessionCookieName } from './crypto/jwt.js'; import ApiKeyAuthenticator from './services/ApiKeyAuthenticator.js'; import WebSessionAuthenticator from './services/WebSessionAuthenticator.js'; import { Authentication } from './services/Authentication.js'; import { OrganizationRepository } from './repositories/OrganizationRepository.js'; import GraphApiTokenAuthenticator from './services/GraphApiTokenAuthenticator.js'; import AuthUtils from './auth-utils.js'; import Keycloak from './services/Keycloak.js'; import { PlatformWebhookService } from './webhooks/PlatformWebhookService.js'; import AccessTokenAuthenticator from './services/AccessTokenAuthenticator.js'; import { GitHubRepository } from './repositories/GitHubRepository.js'; import { S3BlobStorage } from './blobstorage/index.js'; import Mailer from './services/Mailer.js'; import { OrganizationInvitationRepository } from './repositories/OrganizationInvitationRepository.js'; import { Authorization } from './services/Authorization.js'; import { BillingRepository } from './repositories/BillingRepository.js'; import { BillingService } from './services/BillingService.js'; import { UserRepository } from './repositories/UserRepository.js'; import { AIGraphReadmeQueue, createAIGraphReadmeWorker } from './workers/AIGraphReadmeWorker.js'; import { fastifyLoggerId, createS3ClientConfig, extractS3BucketName } from './util.js'; import { ApiKeyRepository } from './repositories/ApiKeyRepository.js'; import { createDeleteOrganizationWorker, DeleteOrganizationQueue } from './workers/DeleteOrganizationWorker.js'; import { createDeactivateOrganizationWorker, DeactivateOrganizationQueue, } from './workers/DeactivateOrganizationWorker.js'; import { createDeleteUserWorker, DeleteUserQueue } from './workers/DeleteUserQueue.js'; import { createReactivateOrganizationWorker, ReactivateOrganizationQueue, } from './workers/ReactivateOrganizationWorker.js'; export interface BuildConfig { logger: LoggerOptions; database: { url: string; tls?: { cert?: string; // e.g. string or '/path/to/my/client-cert.pem' ca?: string; // e.g. string or '/path/to/my/server-ca.pem' key?: string; // e.g. string or '/path/to/my/client-key.pem' }; }; prometheus?: MetricsOptions; openaiAPIKey?: string; allowedOrigins?: string[]; debugSQL?: boolean; production?: boolean; clickhouseDsn?: string; keycloak: { loginRealm: string; realm: string; clientId: string; adminUser: string; adminPassword: string; apiUrl: string; frontendUrl: string; }; auth: { webBaseUrl: string; secureCookie?: boolean; webErrorPath: string; secret: string; redirectUri: string; }; webhook?: { url?: string; key?: string; }; githubApp?: { webhookSecret?: string; clientId?: string; clientSecret?: string; id?: string; privateKey?: string; }; slack: { clientID?: string; clientSecret?: string }; cdnBaseUrl: string; s3Storage: { url: string; endpoint?: string; region?: string; username?: string; password?: string; forcePathStyle?: boolean; }; mailer: { smtpEnabled: boolean; smtpHost?: string; smtpPort?: number; smtpUsername?: string; smtpPassword?: string; smtpSecure: boolean; smtpRequireTls: boolean; }; admissionWebhook: { secret: string; }; stripe?: { secret?: string; webhookSecret?: string; defaultPlanId?: string; }; redis: { host: string; port: number; password?: string; tls?: { cert?: string; // e.g. string or '/path/to/my/client-cert.pem' ca?: string; // e.g. string or '/path/to/my/server-ca.pem' key?: string; // e.g. string or '/path/to/my/client-key.pem' }; }; } export interface MetricsOptions { enabled?: boolean; path?: string; host?: string; port?: number; } const developmentLoggerOpts: LoggerOptions = { transport: { target: 'pino-pretty', options: { singleLine: true, translateTime: 'HH:MM:ss Z', ignore: 'pid,hostname', }, }, }; export default async function build(opts: BuildConfig) { opts.logger = { timestamp: stdTimeFunctions.isoTime, formatters: { level: (label) => { return { level: label, }; }, }, ...opts.logger, }; const logger = pino(opts.production ? opts.logger : { ...developmentLoggerOpts, ...opts.logger }); const fastify = Fastify({ logger, // The maximum amount of time in *milliseconds* in which a plugin can load pluginTimeout: 10_000, // 10s }); /** * Plugin registration */ await fastify.register(fastifyHealth); if (opts.prometheus?.enabled) { await fastify.register(fastifyMetrics, { path: opts.prometheus.path, }); await fastify.metricsServer.listen({ host: opts.prometheus.host, port: opts.prometheus.port, }); } await fastify.register(fastifyDatabase, { databaseConnectionUrl: opts.database.url, gracefulTimeoutSec: 15, tls: opts.database.tls, debugSQL: opts.debugSQL, }); await fastify.register(fastifyCors, { // Produce an error if allowedOrigins is undefined origin: opts.allowedOrigins || [], methods: [...cors.allowedMethods], allowedHeaders: [...cors.allowedHeaders, 'cosmo-org-slug', 'user-agent'], exposedHeaders: [...cors.exposedHeaders, 'Trailer-Response-Id'], credentials: true, // Let browsers cache CORS information to reduce the number of // preflight requests. Modern Chrome caps the value at 2h. maxAge: 2 * 60 * 60, }); if (opts.clickhouseDsn) { await fastify.register(fastifyClickHouse, { dsn: opts.clickhouseDsn, logger, }); } else { logger.warn('ClickHouse connection not configured'); } const authUtils = new AuthUtils(fastify.db, { jwtSecret: opts.auth.secret, session: { cookieName: userSessionCookieName, }, oauth: { clientID: opts.keycloak.clientId, redirectUri: opts.auth.redirectUri, openIdApiBaseUrl: opts.keycloak.apiUrl + '/realms/' + opts.keycloak.realm, openIdFrontendUrl: opts.keycloak.frontendUrl + '/realms/' + opts.keycloak.realm, logoutRedirectUri: opts.auth.webBaseUrl + '/login', }, pkce: { cookieName: pkceCodeVerifierCookieName, }, webBaseUrl: opts.auth.webBaseUrl, webErrorPath: opts.auth.webErrorPath, }); const organizationRepository = new OrganizationRepository(logger, fastify.db, opts.stripe?.defaultPlanId); const orgInvitationRepository = new OrganizationInvitationRepository(logger, fastify.db, opts.stripe?.defaultPlanId); const apiKeyAuth = new ApiKeyAuthenticator(fastify.db, organizationRepository); const userRepo = new UserRepository(logger, fastify.db); const apiKeyRepository = new ApiKeyRepository(fastify.db); const webAuth = new WebSessionAuthenticator(opts.auth.secret, userRepo); const graphKeyAuth = new GraphApiTokenAuthenticator(opts.auth.secret); const accessTokenAuth = new AccessTokenAuthenticator(organizationRepository, authUtils); const authenticator = new Authentication(webAuth, apiKeyAuth, accessTokenAuth, graphKeyAuth, organizationRepository); const authorizer = new Authorization(logger, opts.stripe?.defaultPlanId); const keycloakClient = new Keycloak({ apiUrl: opts.keycloak.apiUrl, realm: opts.keycloak.loginRealm, clientId: opts.keycloak.clientId, adminUser: opts.keycloak.adminUser, adminPassword: opts.keycloak.adminPassword, }); let mailerClient: Mailer | undefined; if (opts.mailer.smtpEnabled) { const { smtpHost, smtpPort, smtpSecure, smtpRequireTls, smtpUsername, smtpPassword } = opts.mailer; const isSmtpHostSet = smtpHost && smtpPort; const isSmtpAuthSet = smtpUsername && smtpPassword; if (!isSmtpHostSet) { throw new Error(`smtp host or port not set properly! Please ensure to do so!`); } if (!isSmtpAuthSet) { throw new Error(`smtp username and host not set properly!`); } mailerClient = new Mailer({ smtpHost, smtpPort, smtpSecure, smtpRequireTls, smtpUsername, smtpPassword, }); try { const verified = await mailerClient.verifyConnection(); if (verified) { logger.info('Email client ready to send emails'); } else { logger.error('Email client failed to verify connection'); } } catch (error) { logger.error(error, 'Email client could not verify connection'); } } const bullWorkers: Worker[] = []; await fastify.register(fastifyRedis, { host: opts.redis.host, port: opts.redis.port, password: opts.redis.password, tls: opts.redis.tls, }); if (!opts.s3Storage || !opts.s3Storage.url) { throw new Error('S3 storage URL is required'); } const bucketName = extractS3BucketName(opts.s3Storage); const s3Config = createS3ClientConfig(bucketName, opts.s3Storage); const s3Client = new S3Client(s3Config); const blobStorage = new S3BlobStorage(s3Client, bucketName); const platformWebhooks = new PlatformWebhookService(opts.webhook?.url, opts.webhook?.key, logger); const readmeQueue = new AIGraphReadmeQueue(logger, fastify.redisForQueue); if (opts.openaiAPIKey) { bullWorkers.push( createAIGraphReadmeWorker({ redisConnection: fastify.redisForWorker, db: fastify.db, logger, openAiApiKey: opts.openaiAPIKey, }), ); } const deleteOrganizationQueue = new DeleteOrganizationQueue(logger, fastify.redisForQueue); bullWorkers.push( createDeleteOrganizationWorker({ redisConnection: fastify.redisForWorker, db: fastify.db, logger, keycloakClient, keycloakRealm: opts.keycloak.realm, blobStorage, }), ); const deactivateOrganizationQueue = new DeactivateOrganizationQueue(logger, fastify.redisForQueue); bullWorkers.push( createDeactivateOrganizationWorker({ redisConnection: fastify.redisForWorker, db: fastify.db, logger, keycloakClient, keycloakRealm: opts.keycloak.realm, deleteOrganizationQueue, }), ); const reactivateOrganizationQueue = new ReactivateOrganizationQueue(logger, fastify.redisForQueue); bullWorkers.push( createReactivateOrganizationWorker({ redisConnection: fastify.redisForWorker, db: fastify.db, logger, deleteOrganizationQueue, }), ); const deleteUserQueue = new DeleteUserQueue(logger, fastify.redisForQueue); bullWorkers.push( createDeleteUserWorker({ redisConnection: fastify.redisForWorker, db: fastify.db, logger, keycloakClient, keycloakRealm: opts.keycloak.realm, blobStorage, platformWebhooks, }), ); // required to verify webhook payloads await fastify.register(import('fastify-raw-body'), { field: 'rawBody', global: false, encoding: 'utf8', }); let githubApp: App | undefined; if (opts.githubApp?.clientId) { githubApp = new App({ appId: opts.githubApp?.id ?? '', privateKey: Buffer.from(opts.githubApp?.privateKey ?? '', 'base64').toString(), oauth: { clientId: opts.githubApp?.clientId ?? '', clientSecret: opts.githubApp?.clientSecret ?? '', }, }); const githubRepository = new GitHubRepository(fastify.db, githubApp); await fastify.register(GitHubWebhookController, { prefix: '/webhook/github', githubRepository, webhookSecret: opts.githubApp?.webhookSecret ?? '', logger, }); } if (opts.stripe?.secret && opts.stripe?.webhookSecret) { const billingRepo = new BillingRepository(fastify.db); const billingService = new BillingService(fastify.db, billingRepo); await fastify.register(StripeWebhookController, { prefix: '/webhook/stripe', billingService, webhookSecret: opts.stripe.webhookSecret, logger, }); } /** * Controllers registration */ await fastify.register(AuthController, { organizationRepository, orgInvitationRepository, webAuth, authUtils, prefix: '/v1/auth', db: fastify.db, jwtSecret: opts.auth.secret, session: { cookieName: userSessionCookieName, }, pkce: { cookieName: pkceCodeVerifierCookieName, }, webBaseUrl: opts.auth.webBaseUrl, keycloakClient, keycloakRealm: opts.keycloak.realm, platformWebhooks, defaultBillingPlanId: opts.stripe?.defaultPlanId, }); await fastify.register(ScimController, { organizationRepository, userRepository: userRepo, apiKeyRepository, authenticator: apiKeyAuth, prefix: '/scim/v2', db: fastify.db, keycloakClient, keycloakRealm: opts.keycloak.realm, }); // Must be registered after custom fastify routes // Because it registers an all-catch route for connect handlers await fastify.register(fastifyConnectPlugin, { routes: routes({ db: fastify.db, logger, jwtSecret: opts.auth.secret, keycloakRealm: opts.keycloak.realm, keycloakApiUrl: opts.keycloak.apiUrl, chClient: fastify.ch, authenticator, authorizer, keycloakClient, platformWebhooks, githubApp, webBaseUrl: opts.auth.webBaseUrl, slack: opts.slack, blobStorage, mailerClient, billingDefaultPlanId: opts.stripe?.defaultPlanId, openaiApiKey: opts.openaiAPIKey, queues: { readmeQueue, deleteOrganizationQueue, deactivateOrganizationQueue, reactivateOrganizationQueue, deleteUserQueue, }, stripeSecretKey: opts.stripe?.secret, admissionWebhookJWTSecret: opts.admissionWebhook.secret, cdnBaseUrl: opts.cdnBaseUrl, }), contextValues(req) { return createContextValues().set<FastifyBaseLogger>({ id: fastifyLoggerId, defaultValue: req.log }, req.log); }, logLevel: opts.logger.level as pino.LevelWithSilent, // Avoid compression for small requests compressMinBytes: 1024, maxTimeoutMs: 80_000, shutdownTimeoutMs: 30_000, // The default limit is the maximum supported value of ~4GiB // We go with 32MiB to avoid allocating too much memory for large requests writeMaxBytes: 32 * 1024 * 1024, acceptCompression: [compressionBrotli, compressionGzip], }); await fastify.register(fastifyGracefulShutdown, { timeout: 60_000, }); fastify.gracefulShutdown(async () => { fastify.log.debug('Shutting down bull workers'); await Promise.all(bullWorkers.map((worker) => worker.close())); fastify.log.debug('Bull workers shut down'); }); return fastify; }