in statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/nettyclient/NettyClient.java [231:295]
private static SslContext getSslContext(NettyRequestReplySpec spec) {
final Optional<String> maybeTrustCaCerts = spec.getTrustedCaCerts();
final Optional<String> maybeClientCerts = spec.getClientCerts();
final Optional<String> maybeClientKey = spec.getClientKey();
final Optional<String> maybeKeyPassword = spec.getClientKeyPassword();
boolean onlyOneOfEitherCertOrKeyPresent =
maybeClientCerts.isPresent() ^ maybeClientKey.isPresent();
if (onlyOneOfEitherCertOrKeyPresent) {
throw new IllegalStateException(
"You need to provide both the cert and they key if you want to use mutual TLS.");
}
final Optional<InputStream> maybeTrustCaCertsInputStream =
maybeTrustCaCerts.map(
trustedCaCertsLocation ->
openStreamIfExistsOrThrow(
ResourceLocator.findNamedResource(trustedCaCertsLocation)));
final Optional<InputStream> maybeCertInputStream =
maybeClientCerts.map(
clientCertLocation ->
openStreamIfExistsOrThrow(ResourceLocator.findNamedResource(clientCertLocation)));
final Optional<InputStream> maybeKeyInputStream =
maybeClientKey.map(
clientKeyLocation ->
openStreamIfExistsOrThrow(ResourceLocator.findNamedResource(clientKeyLocation)));
final SslContextBuilder sslContextBuilder = SslContextBuilder.forClient();
maybeTrustCaCertsInputStream.ifPresent(sslContextBuilder::trustManager);
maybeCertInputStream.ifPresent(
certInputStream -> {
final InputStream keyInputStream =
maybeKeyInputStream.orElseThrow(
() -> new IllegalStateException("The key is required"));
if (maybeKeyPassword.isPresent()) {
try {
final String keyPasswordString =
IOUtils.toString(
ResourceLocator.findNamedResource(maybeKeyPassword.get()),
StandardCharsets.UTF_8);
sslContextBuilder.keyManager(certInputStream, keyInputStream, keyPasswordString);
} catch (IOException e) {
throw new IllegalStateException(
String.format(
"Could not read the key password from the file %s. Examples of the correct usage: 'classpath:file.txt' or '/tmp/pass', etc.",
maybeKeyPassword.get()),
e);
}
} else {
sslContextBuilder.keyManager(certInputStream, keyInputStream);
}
});
try {
return sslContextBuilder.build();
} catch (IOException e) {
throw new IllegalStateException("Could not build the ssl context.", e);
} finally {
maybeTrustCaCertsInputStream.ifPresent(NettyClient::closeWithBestEffort);
maybeCertInputStream.ifPresent(NettyClient::closeWithBestEffort);
maybeKeyInputStream.ifPresent(NettyClient::closeWithBestEffort);
}
}