in app_dart/lib/src/request_handling/subscription_handler.dart [60:146]
Future<void> service(
HttpRequest request, {
Future<void> Function(HttpStatusException)? onError,
}) async {
AuthenticatedContext authContext;
final AuthenticationProvider _authProvider = authProvider ?? PubsubAuthenticationProvider(config);
try {
authContext = await _authProvider.authenticate(request);
} on Unauthenticated catch (error) {
final HttpResponse response = request.response;
response
..statusCode = HttpStatus.unauthorized
..write(error.message);
await response.flush();
await response.close();
return;
}
List<int> body;
try {
body = await request.expand<int>((List<int> chunk) => chunk).toList();
} catch (error) {
final HttpResponse response = request.response;
response
..statusCode = HttpStatus.internalServerError
..write('$error');
await response.flush();
await response.close();
return;
}
PushMessageEnvelope? envelope;
if (body.isNotEmpty) {
try {
final Map<String, dynamic> json = jsonDecode(utf8.decode(body)) as Map<String, dynamic>;
envelope = PushMessageEnvelope.fromJson(json);
} catch (error) {
final HttpResponse response = request.response;
response
..statusCode = HttpStatus.internalServerError
..write('$error');
await response.flush();
await response.close();
return;
}
}
if (envelope == null) {
throw const BadRequestException('Failed to get message');
}
log.finer(envelope.toJson());
final String messageId = envelope.message!.messageId!;
final Uint8List? messageLock = await cache.getOrCreate(topicName, messageId);
if (messageLock != null) {
// No-op - There's already a write lock for this message
final HttpResponse response = request.response
..statusCode = HttpStatus.ok
..write('$messageId was already processed');
await response.flush();
await response.close();
return;
}
// Create a write lock in the cache to ensure requests are only processed once
final Uint8List lockValue = Uint8List.fromList('l'.codeUnits);
await cache.set(
topicName,
messageId,
lockValue,
ttl: const Duration(days: 1),
);
await runZoned<Future<void>>(
() async => await super.service(
request,
onError: (_) async {
await cache.purge(topicName, messageId);
},
),
zoneValues: <RequestKey<dynamic>, Object?>{
PubSubKey.message: envelope.message,
ApiKey.authContext: authContext,
},
);
}