Future _performTarballUpload()

in app/lib/package/backend.dart [788:944]


  Future<PackageVersion> _performTarballUpload(
    User user,
    Future<void> Function(String name, String version) tarballUpload,
    UploadRestrictionStatus restriction,
    PackageSummary archive,
  ) async {
    final sw = Stopwatch()..start();
    final entities = await _createUploadEntities(db, user, archive);
    final newVersion = entities.packageVersion;
    final currentDartSdk = await getDartSdkVersion();

    Package? package;
    String? prevLatestStableVersion;
    String? prevLatestPrereleaseVersion;

    // Add the new package to the repository by storing the tarball and
    // inserting metadata to datastore (which happens atomically).
    final pv = await withRetryTransaction(db, (tx) async {
      _logger.info('Starting datastore transaction.');

      final tuple = (await tx.lookup([newVersion.key, newVersion.packageKey!]));
      final version = tuple[0] as PackageVersion?;
      package = tuple[1] as Package?;

      // If the version already exists, we fail.
      if (version != null) {
        _logger.info('Version ${version.version} of package '
            '${version.package} already exists, rolling transaction back.');
        throw PackageRejectedException.versionExists(
            version.package, version.version!);
      }

      // reserved package names for the Dart team
      if (package == null &&
          matchesReservedPackageName(newVersion.package) &&
          !user.email!.endsWith('@google.com')) {
        throw PackageRejectedException.nameReserved(newVersion.package);
      }

      // If the package does not exist, then we create a new package.
      prevLatestStableVersion = package?.latestVersion;
      prevLatestPrereleaseVersion = package?.latestPrereleaseVersion;
      if (package == null) {
        _logger.info('New package uploaded. [new-package-uploaded]');
        if (restriction == UploadRestrictionStatus.onlyUpdates) {
          throw PackageRejectedException.uploadRestricted();
        }
        package = Package.fromVersion(newVersion);
      } else if (!await packageBackend.isPackageAdmin(package!, user.userId)) {
        _logger.info('User ${user.userId} (${user.email}) is not an uploader '
            'for package ${package!.name}, rolling transaction back.');
        throw AuthorizationException.userCannotUploadNewVersion(
            user.email!, package!.name!);
      }

      if (package!.versionCount >= _maxVersionsPerPackage) {
        throw PackageRejectedException.maxVersionCountReached(
            newVersion.package, _maxVersionsPerPackage);
      }

      if (package!.isNotVisible) {
        throw PackageRejectedException.isWithheld();
      }

      if (package!.deletedVersions != null &&
          package!.deletedVersions!.contains(newVersion.version!)) {
        throw PackageRejectedException.versionDeleted(
            package!.name!, newVersion.version!);
      }

      // Store the publisher of the package at the time of the upload.
      newVersion.publisherId = package!.publisherId;

      // Keep the latest version in the package object up-to-date.
      package!.updateVersion(newVersion,
          dartSdkVersion: currentDartSdk.semanticVersion);
      package!.updated = clock.now().toUtc();
      package!.versionCount++;

      _logger.info(
        'Trying to upload tarball for ${package!.name} version ${newVersion.version} to cloud storage.',
      );
      // Apply update: Push to cloud storage
      await tarballUpload(package!.name!, newVersion.version!);

      final inserts = <Model>[
        package!,
        newVersion,
        entities.packageVersionInfo,
        ...entities.assets,
        AuditLogRecord.packagePublished(
          uploader: user,
          package: newVersion.package,
          version: newVersion.version!,
          created: newVersion.created!,
          publisherId: package!.publisherId,
        ),
      ];

      _logger.info('Trying to commit datastore changes.');
      tx.queueMutations(inserts: inserts);
      return newVersion;
    });
    _logger.info('Upload successful. [package-uploaded]');
    _logger.info('Upload transaction compelted in ${sw.elapsed}.');
    sw.reset();

    _logger.info('Invalidating cache for package ${newVersion.package}.');
    await purgePackageCache(newVersion.package);

    try {
      final uploaderEmails = package!.publisherId == null
          ? await accountBackend.getEmailsOfUserIds(package!.uploaders!)
          : await publisherBackend.getAdminMemberEmails(package!.publisherId!);

      // Notify uploaders via email that a new version has been published.
      final email = emailSender.sendMessage(createPackageUploadedEmail(
        packageName: newVersion.package,
        packageVersion: newVersion.version!,
        uploaderEmail: user.email!,
        authorizedUploaders:
            uploaderEmails.map((email) => EmailAddress(null, email)).toList(),
      ));

      final latestVersionChanged = prevLatestStableVersion != null &&
          package!.latestVersion != prevLatestStableVersion;
      final latestPrereleaseVersionChanged =
          prevLatestPrereleaseVersion != null &&
              package!.latestPrereleaseVersion != prevLatestPrereleaseVersion;
      // Let's not block the upload response on these. In case of a timeout, the
      // underlying operations still go ahead, but the `Future.wait` call below
      // is not blocked on it.
      await Future.wait([
        email,
        // Trigger analysis and dartdoc generation. Dependent packages can be left
        // out here, because the dependency graph's background polling will pick up
        // the new upload, and will trigger analysis for the dependent packages.
        jobBackend.triggerAnalysis(newVersion.package, newVersion.version),
        jobBackend.triggerDartdoc(newVersion.package, newVersion.version),
        // Trigger a new doc generation for the previous latest stable version
        // in order to update the dartdoc entry and the canonical-urls.
        if (latestVersionChanged)
          jobBackend.triggerDartdoc(newVersion.package, prevLatestStableVersion,
              shouldProcess: true),
        // Reset the priority of the previous pre-release version.
        if (latestPrereleaseVersionChanged)
          jobBackend.triggerDartdoc(
              newVersion.package, prevLatestPrereleaseVersion,
              shouldProcess: false),
      ]).timeout(Duration(seconds: 10));
    } catch (e, st) {
      final v = newVersion.qualifiedVersionKey;
      _logger.severe('Error post-processing package upload $v', e, st);
    }
    _logger.info('Post-upload tasks completed in ${sw.elapsed}.');
    return pv;
  }