public async checkMultipleSchemas()

in controlplane/src/core/repositories/SchemaCheckRepository.ts [594:1077]


  /**
   * Runs one schema check that covers several proposed subgraph schemas at once.
   *
   * Steps, in order:
   * 1. Creates the schema check record and, per proposed subgraph, a check-subgraph record that is
   *    linked to every federated graph returned by `fedGraphRepo.bySubgraphLabels` (contracts excluded).
   * 2. Validates every non-empty proposed SDL with `buildSchema` and diffs it against the stored SDL
   *    (an empty string is used for new subgraphs).
   * 3. When proposals are enabled and not skipped, matches every schema against approved proposals.
   * 4. Stores breaking / non-breaking changes and runs the lint check; the graph pruning check is only
   *    run for subgraphs that already exist.
   * 5. Composes the collected federated graphs with the proposed schemas and, for compositions without
   *    errors, inspects client traffic for the breaking changes of existing subgraphs.
   *
   * An invalid schema, a failed diff, or a proposal mismatch with a severity other than 'warn' marks all
   * remaining checks as skipped and returns early with a non-OK response code and empty result lists.
   *
   * @param organizationId - organization used for the retention feature lookup, pruning and traffic inspection.
   * @param subgraphs - proposed subgraphs; a subgraph flagged `isDeleted` is checked with an empty SDL.
   * @param namespace - namespace whose lint / graph pruning / proposal switches drive the check.
   * @param vcsContext - optional VCS context stored with the schema check.
   * @param chClient - optional ClickHouse client forwarded to the graph pruning check.
   * @param skipProposalMatchCheck - when true, the proposal match step is not run.
   * @returns the response status, the id of the created check and the aggregated changes, lint issues,
   * graph pruning issues and composition errors / warnings. The success path additionally carries
   * `operationUsageStats` and, if any subgraph mismatched at 'warn' severity, `proposalMatchMessage`.
   */
  public async checkMultipleSchemas({
    organizationId,
    subgraphs,
    namespace,
    orgRepo,
    subgraphRepo,
    fedGraphRepo,
    schemaLintRepo,
    schemaGraphPruningRepo,
    proposalRepo,
    composer,
    trafficInspector,
    logger,
    vcsContext,
    chClient,
    skipProposalMatchCheck,
  }: {
    organizationId: string;
    orgRepo: OrganizationRepository;
    subgraphRepo: SubgraphRepository;
    fedGraphRepo: FederatedGraphRepository;
    schemaLintRepo: SchemaLintRepository;
    schemaGraphPruningRepo: SchemaGraphPruningRepository;
    proposalRepo: ProposalRepository;
    trafficInspector: SchemaUsageTrafficInspector;
    composer: Composer;
    // proposal subgraphs do not contain feature subgraphs
    subgraphs: ProposalSubgraph[];
    namespace: NamespaceDTO;
    logger: FastifyBaseLogger;
    vcsContext?: VCSContext;
    chClient?: ClickHouseClient;
    skipProposalMatchCheck: boolean;
  }) {
    const breakingChanges: SchemaChange[] = [];
    const nonBreakingChanges: SchemaChange[] = [];
    const lintWarnings: LintIssue[] = [];
    const lintErrors: LintIssue[] = [];
    const graphPruneWarnings: GraphPruningIssue[] = [];
    const graphPruneErrors: GraphPruningIssue[] = [];
    const inspectedOperations: InspectorOperationResult[] = [];
    const compositionErrors: PlainMessage<CompositionError>[] = [];
    const compositionWarnings: PlainMessage<CompositionWarning>[] = [];

    const federatedGraphs: FederatedGraphDTO[] = [];
    const checkSubgraphs: Map<string, CheckSubgraph> = new Map();

    const changeRetention = await orgRepo.getFeature({
      organizationId,
      featureId: 'breaking-change-retention',
    });

    // Used as the traffic / pruning look-back in days; defaults to 7 when the feature sets no limit.
    const limit = changeRetention?.limit ?? 7;

    const schemaCheckID = await this.create({
      proposedSubgraphSchemaSDL: '',
      lintSkipped: !namespace.enableLinting,
      graphPruningSkipped: !namespace.enableGraphPruning,
      vcsContext,
    });

    // Marks every remaining stage of this check as skipped. `errorMessage` is only written when given,
    // so the update payload is the same as spelling the flags out at each call site.
    const markAllChecksSkipped = (errorMessage?: string) =>
      this.update({
        schemaCheckID,
        compositionSkipped: true,
        breakingChangesSkipped: true,
        trafficCheckSkipped: true,
        graphPruningSkipped: true,
        lintSkipped: true,
        ...(errorMessage === undefined ? {} : { errorMessage }),
      });

    // Phase 1: register every proposed subgraph, validate its SDL and compute the schema diff.
    for (const s of subgraphs) {
      const subgraph = await subgraphRepo.byName(s.name, namespace.name);
      const newSchemaSDL = s.isDeleted ? '' : s.schemaSDL;
      // NOTE(review): at this point `federatedGraphs` only holds graphs collected for the PREVIOUS
      // subgraphs (it is empty for the first one) — confirm this ordering is intended.
      const routerCompatibilityVersion = getFederatedGraphRouterCompatibilityVersion(federatedGraphs);

      const graphs = await fedGraphRepo.bySubgraphLabels({
        labels: subgraph?.labels || s.labels,
        namespaceId: namespace.id,
        excludeContracts: true,
      });

      const schemaCheckSubgraphId = await this.createSchemaCheckSubgraph({
        data: {
          schemaCheckId: schemaCheckID,
          subgraphId: subgraph?.id,
          subgraphName: s.name,
          proposedSubgraphSchemaSDL: newSchemaSDL,
          isDeleted: newSchemaSDL === '',
          isNew: !subgraph,
          namespaceId: namespace.id,
          labels: subgraph ? undefined : s.labels,
        },
      });

      for (const graph of graphs) {
        // if the check federated graph already exists, we don't need to create a new one
        const checkFederatedGraphId = await this.createCheckedFederatedGraph(schemaCheckID, graph.id, limit);
        await this.createSchemaCheckSubgraphFederatedGraphs({
          schemaCheckFederatedGraphId: checkFederatedGraphId,
          checkSubgraphIds: [schemaCheckSubgraphId],
        });
      }

      // Collect each federated graph once, even when several subgraphs belong to it.
      federatedGraphs.push(...graphs.filter((g) => !federatedGraphs.some((fg) => fg.id === g.id)));

      let newGraphQLSchema: GraphQLSchema | undefined;
      if (newSchemaSDL) {
        try {
          // Here we check if the schema is valid as a subgraph SDL
          const result = buildSchema(newSchemaSDL, true, routerCompatibilityVersion);
          if (!result.success) {
            await markAllChecksSkipped(`Invalid schema of subgraph '${s.name}'`);

            return {
              response: {
                code: EnumStatusCode.ERR_INVALID_SUBGRAPH_SCHEMA,
                details: result.errors.map((e) => e.toString()).join('\n'),
              },
              breakingChanges: [],
              nonBreakingChanges: [],
              compositionErrors: [],
              checkId: schemaCheckID,
              lintWarnings: [],
              lintErrors: [],
              graphPruneWarnings: [],
              graphPruneErrors: [],
              compositionWarnings: [],
            };
          }
          if (namespace.enableGraphPruning) {
            const parsedSchema = parse(newSchemaSDL);
            // this new GraphQL schema contains the location info
            newGraphQLSchema = buildASTSchema(parsedSchema, { assumeValid: true, assumeValidSDL: true });
          }
        } catch (e: unknown) {
          await markAllChecksSkipped(`Invalid schema of subgraph '${s.name}'`);

          return {
            response: {
              code: EnumStatusCode.ERR_INVALID_SUBGRAPH_SCHEMA,
              // Non-Error throwables are stringified instead of producing `undefined` details.
              details: e instanceof Error ? e.message : String(e),
            },
            breakingChanges: [],
            nonBreakingChanges: [],
            compositionErrors: [],
            checkId: schemaCheckID,
            lintWarnings: [],
            lintErrors: [],
            graphPruneWarnings: [],
            graphPruneErrors: [],
            compositionWarnings: [],
          };
        }
      }

      const schemaChanges = await getDiffBetweenGraphs(
        subgraph?.schemaSDL || '',
        newSchemaSDL,
        routerCompatibilityVersion,
      );
      if (schemaChanges.kind === 'failure') {
        logger.warn(`Error finding diff between graphs of the subgraph ${s.name}: ${schemaChanges.error}`);

        await markAllChecksSkipped(`Breaking change detection failed for the subgraph '${s.name}'`);

        return {
          response: {
            code: schemaChanges.errorCode,
            details: schemaChanges.errorMessage,
          },
          breakingChanges: [],
          nonBreakingChanges: [],
          compositionErrors: [],
          checkId: schemaCheckID,
          lintWarnings: [],
          lintErrors: [],
          graphPruneWarnings: [],
          graphPruneErrors: [],
          compositionWarnings: [],
        };
      }

      checkSubgraphs.set(s.name, {
        subgraph,
        newSchemaSDL,
        newGraphQLSchema,
        schemaChanges,
        inspectorChanges: [],
        storedBreakingChanges: [],
        checkSubgraphId: schemaCheckSubgraphId,
        routerCompatibilityVersion,
        labels: s.isNew ? s.labels : undefined,
      });
    }

    // The proposal configuration is per namespace, so it is fetched once instead of once per subgraph.
    const proposalConfig =
      namespace.enableProposals && !skipProposalMatchCheck
        ? await proposalRepo.getProposalConfig({ namespaceId: namespace.id })
        : undefined;

    // Phase 2: proposal matching, change persistence, lint and graph pruning per subgraph.
    let proposalMatchMessage: string | undefined;
    for (const [subgraphName, checkSubgraph] of checkSubgraphs.entries()) {
      const {
        subgraph,
        newSchemaSDL,
        newGraphQLSchema,
        schemaChanges,
        routerCompatibilityVersion,
        checkSubgraphId: schemaCheckSubgraphId,
      } = checkSubgraph;

      // currently matching only with the subgraph that is already present in the namespace
      if (proposalConfig) {
        const match = await proposalRepo.matchSchemaWithProposal({
          subgraphName,
          namespaceId: namespace.id,
          schemaSDL: newSchemaSDL,
          routerCompatibilityVersion,
          schemaCheckId: schemaCheckID,
          isDeleted: newSchemaSDL === '',
        });

        const warnOnly = proposalConfig.checkSeverityLevel === 'warn';
        // The stored value describes the whole check: once one subgraph has mismatched at 'warn'
        // severity, a later matching subgraph must not flip the result back to 'success'.
        const proposalMatch = !match
          ? warnOnly
            ? 'warn'
            : 'error'
          : proposalMatchMessage === undefined
            ? 'success'
            : 'warn';

        await this.update({
          schemaCheckID,
          proposalMatch,
        });

        if (!match) {
          if (warnOnly) {
            // Start from an empty string; appending to `undefined` would prefix the text "undefined".
            proposalMatchMessage =
              (proposalMatchMessage ?? '') +
              `The subgraph ${subgraphName}'s schema does not match to this subgraph's schema in any approved proposal.\n`;
          } else {
            await markAllChecksSkipped();

            return {
              response: {
                code: EnumStatusCode.ERR_SCHEMA_MISMATCH_WITH_APPROVED_PROPOSAL,
                details: `The subgraph ${subgraphName}'s schema does not match to this subgraph's schema in any approved proposal.`,
              },
              breakingChanges: [],
              nonBreakingChanges: [],
              compositionErrors: [],
              checkId: schemaCheckID,
              lintWarnings: [],
              lintErrors: [],
              graphPruneWarnings: [],
              graphPruneErrors: [],
              compositionWarnings: [],
              proposalMatchMessage: `The subgraph ${subgraphName}'s schema does not match to this subgraph's schema in any approved proposal.`,
            };
          }
        }
      }

      await this.createSchemaCheckChanges({
        changes: schemaChanges.nonBreakingChanges,
        schemaCheckID,
        schemaCheckSubgraphId,
      });

      const storedBreakingChanges = await this.createSchemaCheckChanges({
        changes: schemaChanges.breakingChanges,
        schemaCheckID,
        schemaCheckSubgraphId,
      });

      // For operations checks we only consider breaking changes
      const inspectorChanges: InspectorSchemaChange[] = trafficInspector.schemaChangesToInspectorChanges(
        schemaChanges.breakingChanges,
        storedBreakingChanges,
      );

      // Replacing the value of an existing key does not disturb the ongoing Map iteration.
      checkSubgraphs.set(subgraphName, {
        ...checkSubgraph,
        inspectorChanges,
        storedBreakingChanges,
        checkSubgraphId: schemaCheckSubgraphId,
      });

      const lintIssues: SchemaLintIssues = await schemaLintRepo.performSchemaLintCheck({
        schemaCheckID,
        newSchemaSDL,
        namespaceId: namespace.id,
        isLintingEnabled: namespace.enableLinting,
        schemaCheckSubgraphId,
      });

      // Graph pruning is only run for subgraphs that already exist; new ones keep the empty result.
      let graphPruningIssues: SchemaGraphPruningIssues = {
        warnings: [],
        errors: [],
      };
      if (subgraph) {
        graphPruningIssues = await schemaGraphPruningRepo.performSchemaGraphPruningCheck({
          newGraphQLSchema,
          schemaCheckID,
          subgraph,
          namespaceID: namespace.id,
          organizationID: organizationId,
          isGraphPruningEnabled: namespace.enableGraphPruning,
          schemaChanges,
          chClient,
          fedGraphRepo,
          subgraphRepo,
          rangeInDays: limit,
          schemaCheckSubgraphId,
        });
      }

      // Tag every collected item with the subgraph it belongs to before aggregating.
      breakingChanges.push(
        ...schemaChanges.breakingChanges.map(
          (c) =>
            new SchemaChange({
              ...c,
              subgraphName,
            }),
        ),
      );
      nonBreakingChanges.push(
        ...schemaChanges.nonBreakingChanges.map(
          (c) =>
            new SchemaChange({
              ...c,
              subgraphName,
            }),
        ),
      );
      lintErrors.push(
        ...lintIssues.errors.map(
          (e) =>
            new LintIssue({
              ...e,
              subgraphName,
            }),
        ),
      );
      lintWarnings.push(
        ...lintIssues.warnings.map(
          (w) =>
            new LintIssue({
              ...w,
              subgraphName,
            }),
        ),
      );
      graphPruneErrors.push(
        ...graphPruningIssues.errors.map(
          (e) =>
            new GraphPruningIssue({
              ...e,
              subgraphName,
            }),
        ),
      );
      graphPruneWarnings.push(
        ...graphPruningIssues.warnings.map(
          (w) =>
            new GraphPruningIssue({
              ...w,
              subgraphName,
            }),
        ),
      );
    }

    // Phase 3: composition of all affected federated graphs and traffic inspection.
    const { composedGraphs } = await composer.composeWithProposedSchemas({
      inputSubgraphs: checkSubgraphs,
      graphs: federatedGraphs,
    });

    await this.createSchemaCheckCompositions({
      schemaCheckID,
      compositions: composedGraphs,
    });

    let hasClientTraffic = false;

    for (const composition of composedGraphs) {
      for (const error of composition.errors) {
        compositionErrors.push({
          message: error.message,
          federatedGraphName: composition.name,
          namespace: composition.namespace,
          featureFlag: '',
        });
      }

      for (const warning of composition.warnings) {
        compositionWarnings.push({
          message: warning.message,
          federatedGraphName: composition.name,
          namespace: composition.namespace,
          featureFlag: '',
        });
      }

      /*
          We don't collect operation usage when
          1. the composition has errors
          2. the subgraph has no inspectable (breaking) changes
          3. the subgraph is new, because the inspector is only queried for existing subgraphs
          That means any breaking change is really breaking
          */
      for (const checkSubgraph of checkSubgraphs.values()) {
        if (composition.errors.length > 0 || checkSubgraph.inspectorChanges.length === 0) {
          continue;
        }

        let result: Map<string, InspectorOperationResult[]> = new Map();
        if (checkSubgraph.subgraph) {
          result = await trafficInspector.inspect(checkSubgraph.inspectorChanges, {
            daysToConsider: limit,
            federatedGraphId: composition.id,
            organizationId,
            subgraphId: checkSubgraph.subgraph.id,
          });
        }

        if (result.size === 0) {
          continue;
        }

        const overrideCheck = await this.checkClientTrafficAgainstOverrides({
          changes: checkSubgraph.storedBreakingChanges,
          inspectorResultsByChangeId: result,
          namespaceId: namespace.id,
        });

        // Accumulate across all compositions and subgraphs: one unsafe result must not be overwritten
        // by a later safe one.
        hasClientTraffic = hasClientTraffic || overrideCheck.hasUnsafeClientTraffic;

        // Store operation usage
        await this.createOperationUsage(overrideCheck.result, composition.id);

        // Collect all inspected operations for later aggregation
        for (const resultElement of overrideCheck.result.values()) {
          inspectedOperations.push(...resultElement);
        }
      }
    }

    // Update the overall schema check with the results
    await this.update({
      schemaCheckID,
      hasClientTraffic,
      hasBreakingChanges: breakingChanges.length > 0,
      hasLintErrors: lintErrors.length > 0,
      hasGraphPruningErrors: graphPruneErrors.length > 0,
    });

    return {
      response: {
        code: EnumStatusCode.OK,
      },
      breakingChanges,
      nonBreakingChanges,
      compositionErrors,
      checkId: schemaCheckID,
      lintWarnings,
      lintErrors,
      graphPruneWarnings,
      graphPruneErrors,
      compositionWarnings,
      operationUsageStats: collectOperationUsageStats(inspectedOperations),
      proposalMatchMessage,
    };
  }