in pulsar-io/redis/src/main/java/org/apache/pulsar/io/redis/RedisSession.java [77:124]
/**
 * Creates a {@link RedisSession} connected to Redis as described by the given configuration.
 *
 * <p>The configured client mode is upper-cased and resolved against {@link ClientMode}.
 * For {@code STANDALONE} a {@link RedisClient} is created for the first configured URI;
 * for {@code CLUSTER} a {@link RedisClusterClient} is created for all configured URIs.
 * In both modes the socket options (TCP no-delay, connect timeout, keep-alive), the request
 * queue size and the auto-reconnect flag are taken from {@code config}.
 *
 * <p>If connecting (or building the session) fails, the freshly created client is shut down
 * before the failure is propagated so that its resources are not leaked.
 *
 * @param config the sink configuration supplying connection and client settings
 * @return a session wrapping the client, its connection and the connection's async commands
 * @throws IllegalArgumentException if the configured client mode is not a {@link ClientMode} name
 * @throws UnsupportedOperationException if the client mode is neither STANDALONE nor CLUSTER
 */
public static RedisSession create(RedisSinkConfig config) {
    final RedisCodec<byte[], byte[]> codec = new ByteArrayCodec();
    final SocketOptions socketOptions = SocketOptions.builder()
            .tcpNoDelay(config.isTcpNoDelay())
            .connectTimeout(Duration.ofMillis(config.getConnectTimeout()))
            .keepAlive(config.isKeepAlive())
            .build();

    final ClientMode clientMode;
    try {
        clientMode = ClientMode.valueOf(config.getClientMode().toUpperCase());
    } catch (IllegalArgumentException e) {
        // Keep the original exception as the cause so the offending value is not lost.
        throw new IllegalArgumentException("Illegal Redis client mode, valid values are: "
                + Arrays.asList(ClientMode.values()), e);
    }

    final List<RedisURI> redisURIs = redisURIs(config.getHostAndPorts(), config);

    if (clientMode == ClientMode.STANDALONE) {
        final ClientOptions clientOptions = ClientOptions.builder()
                .socketOptions(socketOptions)
                .requestQueueSize(config.getRequestQueue())
                .autoReconnect(config.isAutoReconnect())
                .build();
        // Standalone mode talks to a single node: only the first URI is used.
        final RedisClient client = RedisClient.create(redisURIs.get(0));
        try {
            client.setOptions(clientOptions);
            final StatefulRedisConnection<byte[], byte[]> connection = client.connect(codec);
            return new RedisSession(client, connection, connection.async());
        } catch (RuntimeException e) {
            // The client was created but no session owns it yet: release it before rethrowing.
            try {
                client.shutdown();
            } catch (RuntimeException shutdownFailure) {
                e.addSuppressed(shutdownFailure);
            }
            throw e;
        }
    } else if (clientMode == ClientMode.CLUSTER) {
        final ClusterClientOptions clientOptions = ClusterClientOptions.builder()
                // Apply the same socket settings as standalone mode; previously they were
                // built but never handed to the cluster client.
                .socketOptions(socketOptions)
                .requestQueueSize(config.getRequestQueue())
                .autoReconnect(config.isAutoReconnect())
                .build();
        final RedisClusterClient client = RedisClusterClient.create(redisURIs);
        try {
            client.setOptions(clientOptions);
            final StatefulRedisClusterConnection<byte[], byte[]> connection = client.connect(codec);
            return new RedisSession(client, connection, connection.async());
        } catch (RuntimeException e) {
            // The client was created but no session owns it yet: release it before rethrowing.
            try {
                client.shutdown();
            } catch (RuntimeException shutdownFailure) {
                e.addSuppressed(shutdownFailure);
            }
            throw e;
        }
    } else {
        throw new UnsupportedOperationException(
                String.format("%s is not supported", config.getClientMode())
        );
    }
}