Future service()

in app_dart/lib/src/request_handling/subscription_handler.dart [60:146]


  Future<void> service(
    HttpRequest request, {
    Future<void> Function(HttpStatusException)? onError,
  }) async {
    AuthenticatedContext authContext;
    final AuthenticationProvider _authProvider = authProvider ?? PubsubAuthenticationProvider(config);
    try {
      authContext = await _authProvider.authenticate(request);
    } on Unauthenticated catch (error) {
      final HttpResponse response = request.response;
      response
        ..statusCode = HttpStatus.unauthorized
        ..write(error.message);
      await response.flush();
      await response.close();
      return;
    }

    List<int> body;
    try {
      body = await request.expand<int>((List<int> chunk) => chunk).toList();
    } catch (error) {
      final HttpResponse response = request.response;
      response
        ..statusCode = HttpStatus.internalServerError
        ..write('$error');
      await response.flush();
      await response.close();
      return;
    }

    PushMessageEnvelope? envelope;
    if (body.isNotEmpty) {
      try {
        final Map<String, dynamic> json = jsonDecode(utf8.decode(body)) as Map<String, dynamic>;
        envelope = PushMessageEnvelope.fromJson(json);
      } catch (error) {
        final HttpResponse response = request.response;
        response
          ..statusCode = HttpStatus.internalServerError
          ..write('$error');
        await response.flush();
        await response.close();
        return;
      }
    }

    if (envelope == null) {
      throw const BadRequestException('Failed to get message');
    }
    log.finer(envelope.toJson());

    final String messageId = envelope.message!.messageId!;

    final Uint8List? messageLock = await cache.getOrCreate(topicName, messageId);
    if (messageLock != null) {
      // No-op - There's already a write lock for this message
      final HttpResponse response = request.response
        ..statusCode = HttpStatus.ok
        ..write('$messageId was already processed');
      await response.flush();
      await response.close();
      return;
    }

    // Create a write lock in the cache to ensure requests are only processed once
    final Uint8List lockValue = Uint8List.fromList('l'.codeUnits);
    await cache.set(
      topicName,
      messageId,
      lockValue,
      ttl: const Duration(days: 1),
    );

    await runZoned<Future<void>>(
      () async => await super.service(
        request,
        onError: (_) async {
          await cache.purge(topicName, messageId);
        },
      ),
      zoneValues: <RequestKey<dynamic>, Object?>{
        PubSubKey.message: envelope.message,
        ApiKey.authContext: authContext,
      },
    );
  }