export function publishFederatedSubgraph()

in controlplane/src/core/bufservices/subgraph/publishFederatedSubgraph.ts [30:456]


export function publishFederatedSubgraph(
  opts: RouterOptions,
  req: PublishFederatedSubgraphRequest,
  ctx: HandlerContext,
): Promise<PlainMessage<PublishFederatedSubgraphResponse>> {
  let logger = getLogger(ctx, opts.logger);

  return handleError<PlainMessage<PublishFederatedSubgraphResponse>>(ctx, logger, async () => {
    const authContext = await opts.authenticator.authenticate(ctx.requestHeader);
    logger = enrichLogger(ctx, logger, authContext);

    const orgWebhooks = new OrganizationWebhookService(
      opts.db,
      authContext.organizationId,
      opts.logger,
      opts.billingDefaultPlanId,
    );
    const auditLogRepo = new AuditLogRepository(opts.db);
    const fedGraphRepo = new FederatedGraphRepository(logger, opts.db, authContext.organizationId);
    const namespaceRepo = new NamespaceRepository(opts.db, authContext.organizationId);
    const subgraphRepo = new SubgraphRepository(logger, opts.db, authContext.organizationId);
    const schemaGraphPruningRepo = new SchemaGraphPruningRepository(opts.db);

    req.namespace = req.namespace || DefaultNamespace;

    if (!authContext.hasWriteAccess) {
      return {
        response: {
          code: EnumStatusCode.ERR,
          details: `The user doesnt have the permissions to perform this operation`,
        },
        compositionErrors: [],
        deploymentErrors: [],
        compositionWarnings: [],
      };
    }

    const subgraphSchemaSDL = req.schema;
    const namespace = await namespaceRepo.byName(req.namespace);
    if (!namespace) {
      return {
        response: {
          code: EnumStatusCode.ERR_NOT_FOUND,
          details: `Could not find namespace ${req.namespace}`,
        },
        compositionErrors: [],
        deploymentErrors: [],
        compositionWarnings: [],
      };
    }
    let subgraph = await subgraphRepo.byName(req.name, req.namespace);
    let isEventDrivenGraph = false;
    let isV2Graph: boolean | undefined;

    try {
      const federatedGraphs = subgraph
        ? await fedGraphRepo.bySubgraphLabels({ labels: subgraph.labels, namespaceId: namespace.id })
        : [];
      /*
       * If there are any federated graphs for which the subgraph is a constituent, the subgraph will be validated
       * against the first router compatibility version encountered.
       * If no federated graphs have yet been created, the subgraph will be validated against the latest router
       * compatibility version.
       */
      // Here we check if the schema is valid as a subgraph SDL
      const result = buildSchema(subgraphSchemaSDL, true, getFederatedGraphRouterCompatibilityVersion(federatedGraphs));
      if (!result.success) {
        return {
          response: {
            code: EnumStatusCode.ERR_INVALID_SUBGRAPH_SCHEMA,
            details: result.errors.map((e) => e.toString()).join('\n'),
          },
          compositionErrors: [],
          deploymentErrors: [],
          compositionWarnings: [],
        };
      }
      isEventDrivenGraph = result.isEventDrivenGraph || false;
      isV2Graph = result.isVersionTwo;
    } catch (e: any) {
      return {
        response: {
          code: EnumStatusCode.ERR_INVALID_SUBGRAPH_SCHEMA,
          details: e.message,
        },
        compositionErrors: [],
        deploymentErrors: [],
        compositionWarnings: [],
      };
    }

    const routingUrl = req.routingUrl || '';
    let baseSubgraphID = '';

    /* If the subgraph exists, validate that no parameters were included.
     * Otherwise, validate the input and create the subgraph.
     */
    if (subgraph) {
      // check whether the user is authorized to perform the action
      await opts.authorizer.authorize({
        db: opts.db,
        graph: {
          targetId: subgraph.targetId,
          targetType: 'subgraph',
        },
        headers: ctx.requestHeader,
        authContext,
      });
      /* The subgraph already exists, so the database flag and the normalization result should match.
       * If he flags do not match, the database is the source of truth, so return an appropriate error.
       * */
      if (subgraph.isEventDrivenGraph !== isEventDrivenGraph) {
        return {
          response: {
            code: EnumStatusCode.ERR,
            details: isEventDrivenGraph
              ? 'The subgraph was originally created as a regular subgraph.' +
                ' A regular subgraph cannot be retroactively changed into an Event-Driven Graph (EDG).' +
                ' Please create a new Event-Driven subgraph with the --edg flag.'
              : 'The subgraph was originally created as an Event-Driven Graph (EDG).' +
                ' An EDG cannot be retroactively changed into a regular subgraph.' +
                ' Please create a new regular subgraph.',
          },
          compositionErrors: [],
          deploymentErrors: [],
          compositionWarnings: [],
        };
      }
    } else {
      if (req.isFeatureSubgraph) {
        if (req.baseSubgraphName) {
          const baseSubgraph = await subgraphRepo.byName(req.baseSubgraphName, req.namespace);
          if (!baseSubgraph) {
            return {
              response: {
                code: EnumStatusCode.ERR,
                details: `Base subgraph "${req.baseSubgraphName}" does not exist in the namespace "${req.namespace}".`,
              },
              compositionErrors: [],
              deploymentErrors: [],
              compositionWarnings: [],
            };
          }
          baseSubgraphID = baseSubgraph.id;
        } else {
          return {
            response: {
              code: EnumStatusCode.ERR_NOT_FOUND,
              details: `Feature Subgraph ${req.name} not found. If intended to create and publish, please pass the name of the base subgraph with --subgraph option.`,
            },
            compositionErrors: [],
            deploymentErrors: [],
            compositionWarnings: [],
          };
        }
      }

      // Labels are not required but should be valid if included.
      if (!isValidLabels(req.labels)) {
        return {
          response: {
            code: EnumStatusCode.ERR_INVALID_LABELS,
            details: `One or more labels were found to be invalid`,
          },
          compositionErrors: [],
          deploymentErrors: [],
          compositionWarnings: [],
        };
      }

      if (isEventDrivenGraph) {
        if (req.routingUrl !== undefined) {
          return {
            response: {
              code: EnumStatusCode.ERR,
              details: `An Event-Driven Graph must not define a routing URL`,
            },
            compositionErrors: [],
            deploymentErrors: [],
            compositionWarnings: [],
          };
        }
        if (req.subscriptionUrl !== undefined) {
          return {
            response: {
              code: EnumStatusCode.ERR,
              details: `An Event-Driven Graph must not define a subscription URL`,
            },
            compositionErrors: [],
            deploymentErrors: [],
            compositionWarnings: [],
          };
        }
        if (req.subscriptionProtocol !== undefined) {
          return {
            response: {
              code: EnumStatusCode.ERR,
              details: `An Event-Driven Graph must not define a subscription protocol`,
            },
            compositionErrors: [],
            deploymentErrors: [],
            compositionWarnings: [],
          };
        }
        if (req.websocketSubprotocol !== undefined) {
          return {
            response: {
              code: EnumStatusCode.ERR,
              details: `An Event-Driven Graph must not define a websocket subprotocol.`,
            },
            compositionErrors: [],
            deploymentErrors: [],
            compositionWarnings: [],
          };
        }
      } else {
        if (!isValidUrl(routingUrl)) {
          return {
            response: {
              code: EnumStatusCode.ERR,
              details: routingUrl
                ? `Routing URL "${routingUrl}" is not a valid URL.`
                : req.isFeatureSubgraph
                  ? `A valid, non-empty routing URL is required to create and publish a feature subgraph.`
                  : `A valid, non-empty routing URL is required to create and publish a non-Event-Driven subgraph.`,
            },
            compositionErrors: [],
            deploymentErrors: [],
            compositionWarnings: [],
          };
        }

        if (req.subscriptionUrl && !isValidUrl(req.subscriptionUrl)) {
          return {
            response: {
              code: EnumStatusCode.ERR,
              details: `Subscription URL "${req.subscriptionUrl}" is not a valid URL`,
            },
            compositionErrors: [],
            deploymentErrors: [],
            compositionWarnings: [],
          };
        }
      }

      if (!isValidGraphName(req.name)) {
        return {
          response: {
            code: EnumStatusCode.ERR_INVALID_NAME,
            details: `The name of the subgraph is invalid. Name should start and end with an alphanumeric character. Only '.', '_', '@', '/', and '-' are allowed as separators in between and must be between 1 and 100 characters in length.`,
          },
          compositionErrors: [],
          deploymentErrors: [],
          compositionWarnings: [],
        };
      }

      // Create the subgraph if it doesn't exist
      subgraph = await subgraphRepo.create({
        name: req.name,
        namespace: req.namespace,
        namespaceId: namespace.id,
        createdBy: authContext.userId,
        labels: req.labels,
        isEventDrivenGraph,
        routingUrl,
        subscriptionUrl: req.subscriptionUrl,
        subscriptionProtocol:
          req.subscriptionProtocol === undefined ? undefined : formatSubscriptionProtocol(req.subscriptionProtocol),
        websocketSubprotocol:
          req.websocketSubprotocol === undefined ? undefined : formatWebsocketSubprotocol(req.websocketSubprotocol),
        featureSubgraphOptions:
          req.isFeatureSubgraph && baseSubgraphID !== ''
            ? {
                isFeatureSubgraph: req.isFeatureSubgraph || false,
                baseSubgraphID,
              }
            : undefined,
      });

      if (!subgraph) {
        throw new Error(`Subgraph '${req.name}' could not be created`);
      }

      await auditLogRepo.addAuditLog({
        organizationId: authContext.organizationId,
        auditAction: 'subgraph.created',
        action: 'created',
        actorId: authContext.userId,
        auditableType: 'subgraph',
        auditableDisplayName: subgraph.name,
        actorDisplayName: authContext.userDisplayName,
        apiKeyName: authContext.apiKeyName,
        actorType: authContext.auth === 'api_key' ? 'api_key' : 'user',
        targetNamespaceId: subgraph.namespaceId,
        targetNamespaceDisplayName: subgraph.namespace,
      });
    }

    const { compositionErrors, updatedFederatedGraphs, deploymentErrors, subgraphChanged, compositionWarnings } =
      await subgraphRepo.update(
        {
          targetId: subgraph.targetId,
          labels: subgraph.labels,
          unsetLabels: false,
          schemaSDL: subgraphSchemaSDL,
          updatedBy: authContext.userId,
          namespaceId: namespace.id,
          isV2Graph,
        },
        opts.blobStorage,
        {
          cdnBaseUrl: opts.cdnBaseUrl,
          webhookJWTSecret: opts.admissionWebhookJWTSecret,
        },
        opts.chClient!,
      );

    for (const graph of updatedFederatedGraphs) {
      const hasErrors =
        compositionErrors.some((error) => error.federatedGraphName === graph.name) ||
        deploymentErrors.some((error) => error.federatedGraphName === graph.name);
      orgWebhooks.send(
        {
          eventName: OrganizationEventName.FEDERATED_GRAPH_SCHEMA_UPDATED,
          payload: {
            federated_graph: {
              id: graph.id,
              name: graph.name,
              namespace: graph.namespace,
            },
            organization: {
              id: authContext.organizationId,
              slug: authContext.organizationSlug,
            },
            errors: hasErrors,
            actor_id: authContext.userId,
          },
        },
        authContext.userId,
      );
    }

    await auditLogRepo.addAuditLog({
      organizationId: authContext.organizationId,
      auditAction: subgraph.isFeatureSubgraph ? 'feature_subgraph.updated' : 'subgraph.updated',
      action: 'updated',
      actorId: authContext.userId,
      auditableType: subgraph.isFeatureSubgraph ? 'feature_subgraph' : 'subgraph',
      auditableDisplayName: subgraph.name,
      actorDisplayName: authContext.userDisplayName,
      apiKeyName: authContext.apiKeyName,
      actorType: authContext.auth === 'api_key' ? 'api_key' : 'user',
      targetNamespaceId: subgraph.namespaceId,
      targetNamespaceDisplayName: subgraph.namespace,
    });

    if (namespace.enableGraphPruning) {
      const graphPruningConfigs = await schemaGraphPruningRepo.getNamespaceGraphPruningConfig(namespace.id);
      await subgraphRepo.handleSubgraphFieldGracePeriods({
        subgraphId: subgraph.id,
        namespaceId: subgraph.namespaceId,
        schemaSDL: subgraph.schemaSDL,
        newSchemaSDL: req.schema,
        graphPruningConfigs,
      });
    }

    if (
      opts.openaiApiKey &&
      // Avoid calling OpenAI API if the schema is too big.
      // Best effort approach. This way of counting tokens is not accurate.
      subgraph.schemaSDL.length <= 10_000
    ) {
      const orgRepo = new OrganizationRepository(logger, opts.db, opts.billingDefaultPlanId);
      const feature = await orgRepo.getFeature({
        organizationId: authContext.organizationId,
        featureId: 'ai',
      });

      if (feature?.enabled) {
        try {
          await opts.queues.readmeQueue.addJob({
            organizationId: authContext.organizationId,
            targetId: subgraph.targetId,
            type: 'subgraph',
          });
        } catch (e) {
          logger.error(e, `Error adding job to subgraph readme queue`);
          // Swallow error because this is not critical
        }
      }
    }

    if (compositionErrors.length > 0) {
      return {
        response: {
          code: EnumStatusCode.ERR_SUBGRAPH_COMPOSITION_FAILED,
        },
        compositionErrors,
        compositionWarnings,
        deploymentErrors: [],
      };
    }

    if (deploymentErrors.length > 0) {
      return {
        response: {
          code: EnumStatusCode.ERR_DEPLOYMENT_FAILED,
        },
        compositionErrors: [],
        deploymentErrors,
        compositionWarnings,
      };
    }

    return {
      response: {
        code: EnumStatusCode.OK,
      },
      compositionErrors: [],
      deploymentErrors: [],
      hasChanged: subgraphChanged,
      compositionWarnings,
    };
  });
}