export function publishPersistedOperations()

in controlplane/src/core/bufservices/persisted-operation/publishPersistedOperations.ts [19:208]


export function publishPersistedOperations(
  opts: RouterOptions,
  req: PublishPersistedOperationsRequest,
  ctx: HandlerContext,
): Promise<PlainMessage<PublishPersistedOperationsResponse>> {
  /**
   * Receives a federated graph name and a list of persisted operation contents.
   * First, it validates that the graph exists and all the operations are valid,
   * then it stores them. Additionally, if the provided client name for registering
   * the operations has never been seen before, we create an entry in the database
   * with it.
   */
  let logger = getLogger(ctx, opts.logger);

  return handleError<PlainMessage<PublishPersistedOperationsResponse>>(ctx, logger, async () => {
    req.namespace = req.namespace || DefaultNamespace;

    const authContext = await opts.authenticator.authenticate(ctx.requestHeader);
    logger = enrichLogger(ctx, logger, authContext);

    if (!authContext.hasWriteAccess) {
      return {
        response: {
          code: EnumStatusCode.ERR,
          details: `The user doesnt have the permissions to perform this operation`,
        },
        operations: [],
      };
    }
    const userId = authContext.userId;
    if (!userId) {
      return {
        response: {
          code: EnumStatusCode.ERROR_NOT_AUTHENTICATED,
          details: `User not found in the authentication context`,
        },
        operations: [],
      };
    }
    const organizationId = authContext.organizationId;
    const federatedGraphRepo = new FederatedGraphRepository(logger, opts.db, organizationId);

    // Validate everything before we update any data
    const federatedGraph = await federatedGraphRepo.byName(req.fedGraphName, req.namespace);
    if (federatedGraph === undefined) {
      return {
        response: {
          code: EnumStatusCode.ERR_NOT_FOUND,
          details: `Federated graph '${req.fedGraphName}' does not exist`,
        },
        operations: [],
      };
    }

    const schema = await federatedGraphRepo.getLatestValidSchemaVersion({
      targetId: federatedGraph.targetId,
    });
    if (!schema?.schema) {
      return {
        response: {
          code: EnumStatusCode.ERR_NOT_FOUND,
          details: `Schema for '${req.fedGraphName}' does not exist`,
        },
        operations: [],
      };
    }
    const graphAST = parse(schema.schema);
    const graphSchema = graphQLBuildASTSchema(graphAST);
    for (const operation of req.operations) {
      const contents = operation.contents;
      let opAST: DocumentNode;
      try {
        opAST = parse(operation.contents);
      } catch (e: any) {
        return {
          response: {
            code: EnumStatusCode.ERR,
            details: `Operation ${operation.id} (${contents}) is not valid: ${e}`,
          },
          operations: [],
        };
      }
      const errors = validate(graphSchema, opAST, undefined, { maxErrors: 1 });
      if (errors.length > 0) {
        const errorDetails = errors.map((e) => `${e.toString()}`).join(', ');
        return {
          response: {
            code: EnumStatusCode.ERR,
            details: `Operation ${operation.id} ("${contents}") is not valid: ${errorDetails}`,
          },
          operations: [],
        };
      }
    }
    const operationsRepo = new OperationsRepository(opts.db, federatedGraph.id);
    let clientId: string;
    try {
      clientId = await operationsRepo.registerClient(req.clientName, userId);
    } catch (e: any) {
      const message = e instanceof Error ? e.message : e.toString();
      return {
        response: {
          code: EnumStatusCode.ERR,
          details: `Could not register client "${req.clientName}": ${message}`,
        },
        operations: [],
      };
    }
    const operations: PublishedOperation[] = [];
    const updatedOperations: UpdatedPersistedOperation[] = [];
    // Retrieve the operations that have already been published
    const operationsResult = await operationsRepo.getPersistedOperations(clientId);
    const operationsByOperationId = new Map(
      operationsResult.map((op) => [op.operationId, { hash: op.hash, operationNames: op.operationNames }]),
    );
    for (const operation of req.operations) {
      const operationId = operation.id;
      const operationHash = crypto.createHash('sha256').update(operation.contents).digest('hex');
      const prev = operationsByOperationId.get(operationId);
      if (prev !== undefined && prev.hash !== operationHash) {
        // We're trying to update an operation with the same ID but different hash
        operations.push(
          new PublishedOperation({
            id: operationId,
            hash: prev.hash,
            status: PublishedOperationStatus.CONFLICT,
            operationNames: prev.operationNames,
          }),
        );
        continue;
      }
      const operationNames = extractOperationNames(operation.contents);
      operationsByOperationId.set(operationId, { hash: operationHash, operationNames });
      const path = `${organizationId}/${federatedGraph.id}/operations/${req.clientName}/${operationId}.json`;
      updatedOperations.push({
        operationId,
        hash: operationHash,
        filePath: path,
        contents: operation.contents,
        operationNames,
      });

      // New operation
      let status: PublishedOperationStatus;
      if (prev === undefined) {
        const data: PublishedOperationData = {
          version: 1,
          body: operation.contents,
        };
        try {
          await opts.blobStorage.putObject({
            key: path,
            body: Buffer.from(JSON.stringify(data), 'utf8'),
            contentType: 'application/json; charset=utf-8',
          });
        } catch (e) {
          logger.error(e, `Could not store operation contents for ${operationId} at ${path}`);
          return {
            response: {
              code: EnumStatusCode.ERR,
              details: `Could not store operation contents for ${operationId} at ${path}`,
            },
            operations: [],
          };
        }

        status = PublishedOperationStatus.CREATED;
      } else {
        status = PublishedOperationStatus.UP_TO_DATE;
      }
      operations.push(
        new PublishedOperation({
          id: operationId,
          hash: operationHash,
          status,
          operationNames,
        }),
      );
    }

    await operationsRepo.updatePersistedOperations(clientId, userId, updatedOperations);

    return {
      response: {
        code: EnumStatusCode.OK,
      },
      operations,
    };
  });
}