private static SslContext getSslContext()

in statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/nettyclient/NettyClient.java [231:295]


  private static SslContext getSslContext(NettyRequestReplySpec spec) {
    final Optional<String> maybeTrustCaCerts = spec.getTrustedCaCerts();
    final Optional<String> maybeClientCerts = spec.getClientCerts();
    final Optional<String> maybeClientKey = spec.getClientKey();
    final Optional<String> maybeKeyPassword = spec.getClientKeyPassword();

    boolean onlyOneOfEitherCertOrKeyPresent =
        maybeClientCerts.isPresent() ^ maybeClientKey.isPresent();
    if (onlyOneOfEitherCertOrKeyPresent) {
      throw new IllegalStateException(
          "You need to provide both the cert and they key if you want to use mutual TLS.");
    }

    final Optional<InputStream> maybeTrustCaCertsInputStream =
        maybeTrustCaCerts.map(
            trustedCaCertsLocation ->
                openStreamIfExistsOrThrow(
                    ResourceLocator.findNamedResource(trustedCaCertsLocation)));

    final Optional<InputStream> maybeCertInputStream =
        maybeClientCerts.map(
            clientCertLocation ->
                openStreamIfExistsOrThrow(ResourceLocator.findNamedResource(clientCertLocation)));

    final Optional<InputStream> maybeKeyInputStream =
        maybeClientKey.map(
            clientKeyLocation ->
                openStreamIfExistsOrThrow(ResourceLocator.findNamedResource(clientKeyLocation)));

    final SslContextBuilder sslContextBuilder = SslContextBuilder.forClient();
    maybeTrustCaCertsInputStream.ifPresent(sslContextBuilder::trustManager);
    maybeCertInputStream.ifPresent(
        certInputStream -> {
          final InputStream keyInputStream =
              maybeKeyInputStream.orElseThrow(
                  () -> new IllegalStateException("The key is required"));
          if (maybeKeyPassword.isPresent()) {
            try {
              final String keyPasswordString =
                  IOUtils.toString(
                      ResourceLocator.findNamedResource(maybeKeyPassword.get()),
                      StandardCharsets.UTF_8);
              sslContextBuilder.keyManager(certInputStream, keyInputStream, keyPasswordString);
            } catch (IOException e) {
              throw new IllegalStateException(
                  String.format(
                      "Could not read the key password from the file %s. Examples of the correct usage: 'classpath:file.txt' or '/tmp/pass', etc.",
                      maybeKeyPassword.get()),
                  e);
            }
          } else {
            sslContextBuilder.keyManager(certInputStream, keyInputStream);
          }
        });

    try {
      return sslContextBuilder.build();
    } catch (IOException e) {
      throw new IllegalStateException("Could not build the ssl context.", e);
    } finally {
      maybeTrustCaCertsInputStream.ifPresent(NettyClient::closeWithBestEffort);
      maybeCertInputStream.ifPresent(NettyClient::closeWithBestEffort);
      maybeKeyInputStream.ifPresent(NettyClient::closeWithBestEffort);
    }
  }