controlplane/src/core/bufservices/cache-warmer/getCacheWarmerOperations.ts (122 lines of code) (raw):
import { PlainMessage } from '@bufbuild/protobuf';
import { HandlerContext } from '@connectrpc/connect';
import { EnumStatusCode } from '@wundergraph/cosmo-connect/dist/common/common_pb';
import {
GetCacheWarmerOperationsRequest,
GetCacheWarmerOperationsResponse,
} from '@wundergraph/cosmo-connect/dist/platform/v1/platform_pb';
import { CacheWarmerRepository } from '../../../core/repositories/CacheWarmerRepository.js';
import { FederatedGraphRepository } from '../../../core/repositories/FederatedGraphRepository.js';
import { DefaultNamespace, NamespaceRepository } from '../../../core/repositories/NamespaceRepository.js';
import type { RouterOptions } from '../../routes.js';
import { enrichLogger, getLogger, handleError } from '../../util.js';
import { OrganizationRepository } from '../../../core/repositories/OrganizationRepository.js';
export function getCacheWarmerOperations(
opts: RouterOptions,
req: GetCacheWarmerOperationsRequest,
ctx: HandlerContext,
): Promise<PlainMessage<GetCacheWarmerOperationsResponse>> {
let logger = getLogger(ctx, opts.logger);
return handleError<PlainMessage<GetCacheWarmerOperationsResponse>>(ctx, logger, async () => {
const authContext = await opts.authenticator.authenticate(ctx.requestHeader);
logger = enrichLogger(ctx, logger, authContext);
req.namespace = req.namespace || DefaultNamespace;
const fedGraphRepo = new FederatedGraphRepository(logger, opts.db, authContext.organizationId);
const namespaceRepo = new NamespaceRepository(opts.db, authContext.organizationId);
const organizationRepo = new OrganizationRepository(logger, opts.db);
const cacheWarmerFeature = await organizationRepo.getFeature({
organizationId: authContext.organizationId,
featureId: 'cache-warmer',
});
if (!cacheWarmerFeature?.enabled) {
return {
response: {
code: EnumStatusCode.ERR_UPGRADE_PLAN,
details: `Upgrade to a enterprise plan to enable cache warmer`,
},
operations: [],
totalCount: 0,
isCacheWarmerEnabled: false,
};
}
const federatedGraph = await fedGraphRepo.byName(req.federatedGraphName, req.namespace, {
supportsFederation: true,
});
if (!federatedGraph) {
return {
response: {
code: EnumStatusCode.ERR_NOT_FOUND,
details: `Federated graph '${req.federatedGraphName}' not found`,
},
operations: [],
totalCount: 0,
isCacheWarmerEnabled: false,
};
}
const namespace = await namespaceRepo.byName(req.namespace);
if (!namespace) {
return {
response: {
code: EnumStatusCode.ERR_NOT_FOUND,
details: `Namespace '${req.namespace}' not found`,
},
operations: [],
totalCount: 0,
isCacheWarmerEnabled: false,
};
}
if (!namespace.enableCacheWarmer) {
return {
response: {
code: EnumStatusCode.ERR_NOT_FOUND,
details: `Cache Warmer is not enabled for namespace '${req.namespace}'`,
},
operations: [],
totalCount: 0,
isCacheWarmerEnabled: false,
};
}
if (!opts.chClient) {
return {
response: {
code: EnumStatusCode.ERR,
details: `ClickHouse client is not available`,
},
operations: [],
totalCount: 0,
isCacheWarmerEnabled: false,
};
}
const cacheWarmerRepo = new CacheWarmerRepository(opts.chClient, opts.db);
const operations = await cacheWarmerRepo.getCacheWarmerOperations({
organizationId: authContext.organizationId,
federatedGraphId: federatedGraph.id,
limit: req.limit,
offset: req.offset,
});
const operationsCount = await cacheWarmerRepo.getCacheWarmerOperationsCount({
organizationId: authContext.organizationId,
federatedGraphId: federatedGraph.id,
});
return {
response: {
code: EnumStatusCode.OK,
},
operations: operations.map((op) => ({
id: op.id,
operationContent: op.operationContent || '',
operationName: op.operationName || '',
operationPersistedId: op.operationPersistedID || '',
operationHash: op.operationHash || '',
clientName: op.clientName || '',
clientVersion: op.clientVersion || '',
planningTime: op.planningTime || 0,
isManuallyAdded: op.isManuallyAdded,
createdAt: op.createdAt.toISOString() || '',
createdBy: op.createdBy || '',
})),
totalCount: operationsCount,
isCacheWarmerEnabled: namespace.enableCacheWarmer,
};
});
}