public static RedisSession create()

in pulsar-io/redis/src/main/java/org/apache/pulsar/io/redis/RedisSession.java [77:124]


    public static RedisSession create(RedisSinkConfig config) {
        RedisSession redisSession;
        final RedisCodec<byte[], byte[]> codec = new ByteArrayCodec();

        final SocketOptions socketOptions = SocketOptions.builder()
            .tcpNoDelay(config.isTcpNoDelay())
            .connectTimeout(Duration.ofMillis(config.getConnectTimeout()))
            .keepAlive(config.isKeepAlive())
            .build();

        final ClientMode clientMode;
        try {
            clientMode = ClientMode.valueOf(config.getClientMode().toUpperCase());
        } catch (IllegalArgumentException e) {
            throw new IllegalArgumentException("Illegal Redis client mode, valid values are: "
                + Arrays.asList(ClientMode.values()));
        }

        List<RedisURI> redisURIs = redisURIs(config.getHostAndPorts(), config);

        if (clientMode == ClientMode.STANDALONE) {
            ClientOptions.Builder clientOptions = ClientOptions.builder()
                .socketOptions(socketOptions)
                .requestQueueSize(config.getRequestQueue())
                .autoReconnect(config.isAutoReconnect());

            final RedisClient client = RedisClient.create(redisURIs.get(0));
            client.setOptions(clientOptions.build());
            final StatefulRedisConnection<byte[], byte[]> connection = client.connect(codec);
            redisSession = new RedisSession(client, connection, connection.async());
        } else if (clientMode == ClientMode.CLUSTER) {
            ClusterClientOptions.Builder clientOptions = ClusterClientOptions.builder()
                .requestQueueSize(config.getRequestQueue())
                .autoReconnect(config.isAutoReconnect());

            final RedisClusterClient client = RedisClusterClient.create(redisURIs);
            client.setOptions(clientOptions.build());

            final StatefulRedisClusterConnection<byte[], byte[]> connection = client.connect(codec);
            redisSession = new RedisSession(client, connection, connection.async());
        } else {
            throw new UnsupportedOperationException(
                String.format("%s is not supported", config.getClientMode())
            );
        }

        return redisSession;
    }