Future startIsolates()

in app/lib/service/entrypoint/_isolate.dart [58:268]


/// Starts the frontend and worker isolates of the service and keeps them
/// running by restarting them when they fail or exit.
///
/// Everything runs inside [withServices]. When the process is not running
/// locally, a stamp file in the system temp directory is used to detect (and
/// warn on `stderr` about) a restart of the process.
///
/// * [logger] receives the lifecycle and error messages of the isolates.
/// * [frontendEntryPoint], when non-null, is spawned `envConfig.frontendCount`
///   times. A frontend isolate is restarted when it exits, and when it
///   reports an error outside of the restart protection window.
/// * [workerEntryPoint], when non-null, is spawned `envConfig.workerCount`
///   times. A worker isolate is restarted one minute after it reports an
///   error or exits.
/// * [deadWorkerTimeout], when non-null, enables the auto-kill timer: after
///   the first alive message of a worker, the worker is killed if no further
///   alive message arrives within the timeout plus a random jitter.
///
/// Failures while starting the initial isolates are logged with
/// `logger.shout` and rethrown.
Future startIsolates({
  required Logger logger,
  Future<void> Function(FrontendEntryMessage message)? frontendEntryPoint,
  Future<void> Function(WorkerEntryMessage message)? workerEntryPoint,
  Duration? deadWorkerTimeout,
}) async {
  await withServices(() async {
    if (!envConfig.isRunningLocally) {
      // The existence of this file may indicate an issue with the service health.
      // Checking it only in AppEngine environment.
      final stampFile =
          File(p.join(Directory.systemTemp.path, 'pub-dev-started.stamp'));
      if (stampFile.existsSync()) {
        stderr.writeln('[warning-service-restarted]: '
            '${stampFile.path} already exists, indicating that this process has been restarted.');
      } else {
        stampFile.createSync(recursive: true);
      }
    }
    _setupServiceIsolate();

    // Number of frontend isolates started so far; also used as the index of
    // the next isolate and to lengthen the pause before each restart.
    var frontendStarted = 0;

    /// The duration while errors won't cause frontend isolates to restart.
    var restartProtectionOffset = Duration.zero;
    var lastStarted = clock.now();

    // Number of worker isolates started so far; also the index of the next one.
    var workerStarted = 0;

    // Ports of the frontend isolates that want to receive worker stats.
    final statConsumerPorts = <SendPort>[];

    /// Spawns a frontend isolate, waits for its protocol message and wires up
    /// the error and exit handlers that restart it.
    Future<void> startFrontendIsolate() async {
      frontendStarted++;
      final frontendIndex = frontendStarted;
      logger.info('About to start frontend isolate #$frontendIndex...');
      final errorReceivePort = ReceivePort();
      final exitReceivePort = ReceivePort();
      final protocolReceivePort = ReceivePort();
      await Isolate.spawn(
        _wrapper,
        [
          frontendEntryPoint,
          FrontendEntryMessage(
            frontendIndex: frontendIndex,
            protocolSendPort: protocolReceivePort.sendPort,
          ),
        ],
        onError: errorReceivePort.sendPort,
        onExit: exitReceivePort.sendPort,
        errorsAreFatal: true,
      );
      // NOTE(review): the error/exit listeners are attached only after this
      // message arrives, so an isolate that dies before sending it appears to
      // leave this await pending - confirm whether a startup timeout is needed.
      final protocolMessage =
          await protocolReceivePort.first as FrontendProtocolMessage;
      // Copy to a local so that the nullable field is promoted.
      final statsConsumerPort = protocolMessage.statsConsumerPort;
      if (statsConsumerPort != null) {
        statConsumerPorts.add(statsConsumerPort);
      }
      logger.info('Frontend isolate #$frontendIndex started.');
      lastStarted = clock.now();

      StreamSubscription? errorSubscription;
      StreamSubscription? exitSubscription;

      /// Unregisters the stats port and releases the subscriptions and ports
      /// that belong to this isolate.
      Future<void> close() async {
        if (statsConsumerPort != null) {
          statConsumerPorts.remove(statsConsumerPort);
        }
        await errorSubscription?.cancel();
        await exitSubscription?.cancel();
        errorReceivePort.close();
        exitReceivePort.close();
        protocolReceivePort.close();
      }

      Future<void> restart() async {
        await close();
        // Restart the isolate after a pause, increasing the pause duration at
        // each restart.
        //
        // NOTE: As this wait period increases, the service may miss /liveness_check
        //       requests, and eventually AppEngine may just kill the instance
        //       marking it unreachable.
        await Future.delayed(Duration(seconds: 5 + frontendStarted));
        await startFrontendIsolate();
      }

      errorSubscription = errorReceivePort.listen((e) async {
        stderr.writeln('ERROR from frontend isolate #$frontendIndex: $e');
        logger.severe('ERROR from frontend isolate #$frontendIndex', e);

        final now = clock.now();
        // If the last isolate was started more than an hour ago, we can reset
        // the protection.
        if (now.isAfter(lastStarted.add(const Duration(hours: 1)))) {
          restartProtectionOffset = Duration.zero;
        }

        // If we have recently restarted an isolate, let's keep it running.
        if (now.isBefore(lastStarted.add(restartProtectionOffset))) {
          return;
        }

        // Extend restart protection for up to 20 minutes.
        if (restartProtectionOffset.inMinutes < 20) {
          restartProtectionOffset += const Duration(minutes: 4);
        }

        await restart();
      });

      exitSubscription = exitReceivePort.listen((e) async {
        stderr.writeln(
            'Frontend isolate #$frontendIndex exited with message: $e');
        logger.warning('Frontend isolate #$frontendIndex exited.', e);
        await restart();
      });
    }

    /// Spawns a worker isolate, forwards its stats to the registered
    /// consumers and restarts it after an error or exit.
    Future<void> startWorkerIsolate() async {
      workerStarted++;
      final workerIndex = workerStarted;
      logger.info('About to start worker isolate #$workerIndex...');
      final errorReceivePort = ReceivePort();
      final protocolReceivePort = ReceivePort();
      final statsReceivePort = ReceivePort();
      final aliveReceivePort = ReceivePort();
      final isolate = await Isolate.spawn(
        _wrapper,
        [
          workerEntryPoint,
          WorkerEntryMessage(
            workerIndex: workerIndex,
            protocolSendPort: protocolReceivePort.sendPort,
            statsSendPort: statsReceivePort.sendPort,
            aliveSendPort: aliveReceivePort.sendPort,
          ),
        ],
        // Errors and exits are delivered to the same port: both trigger the
        // same restart handling below.
        onError: errorReceivePort.sendPort,
        onExit: errorReceivePort.sendPort,
        errorsAreFatal: true,
      );
      // read WorkerProtocolMessage
      await protocolReceivePort.first;
      final statsSubscription =
          statsReceivePort.cast<Map>().listen((Map stats) {
        updateLatestStats(stats);
        for (final consumerPort in statConsumerPorts) {
          consumerPort.send(stats);
        }
      });
      logger.info('Worker isolate #$workerIndex started.');

      Timer? autoKillTimer;

      /// (Re)starts the timer that kills the worker when no alive message
      /// arrives in time. Does nothing when [deadWorkerTimeout] is not set.
      void resetAutoKillTimer() {
        final timeout = deadWorkerTimeout;
        if (timeout == null) return;
        autoKillTimer?.cancel();

        // Randomize TTL so that isolate restarts do not happen at the same time.
        //
        // `Random.nextInt` throws a RangeError for a non-positive max, which
        // would happen for timeouts shorter than a second; skip the jitter
        // in that case.
        final maxJitterSeconds = timeout.inSeconds;
        final jitter = maxJitterSeconds > 0
            ? Duration(seconds: _random.nextInt(maxJitterSeconds))
            : Duration.zero;
        autoKillTimer = Timer(timeout + jitter, () {
          logger.info('Killing worker isolate #$workerIndex...');
          isolate.kill();
        });
      }

      // We DO NOT initialize [autoKillTimer] at this point, allowing the worker
      // to do arbitrary-length setup. Once the first message comes in, we can
      // start the auto-kill timer.
      final aliveSubscription = aliveReceivePort.listen((_) {
        resetAutoKillTimer();
      });

      StreamSubscription? errorSubscription;

      /// Stops the auto-kill timer and releases the subscriptions and ports
      /// that belong to this isolate.
      Future<void> close() async {
        await aliveSubscription.cancel();
        autoKillTimer?.cancel();
        await statsSubscription.cancel();
        await errorSubscription?.cancel();
        errorReceivePort.close();
        protocolReceivePort.close();
        statsReceivePort.close();
        aliveReceivePort.close();
      }

      errorSubscription = errorReceivePort.listen((e) async {
        stderr.writeln('ERROR from worker isolate #$workerIndex: $e');
        logger.severe('ERROR from worker isolate #$workerIndex', e);
        await close();
        // restart isolate after a brief pause
        await Future.delayed(const Duration(minutes: 1));
        await startWorkerIsolate();
      });
    }

    try {
      await withServices(() async {
        if (frontendEntryPoint != null) {
          for (var i = 0; i < envConfig.frontendCount; i++) {
            await startFrontendIsolate();
          }
        }
        if (workerEntryPoint != null) {
          for (var i = 0; i < envConfig.workerCount; i++) {
            await startWorkerIsolate();
          }
        }
      });
    } catch (e, st) {
      logger.shout('Failed to start server.', e, st);
      rethrow;
    }
  });
}