Redis进阶十二之Lettuce使用技巧详解

前言

Lettuce 是 Redis 的一款高级 Java 客户端,与 Jedis 并列成为最热门的客户端之一,目前已成为 SpringBoot 2.0 版本默认的 Redis 客户端。

相比老牌 Jedis,Lettuce 属于后起之秀,不仅功能丰富,而且提供了很多新的功能特性,比如异步操作、响应式编程等等,同时还解决了 Jedis 中线程不安全的问题。

Lettuce是一个可扩展的线程安全Redis客户端,用于同步、异步和反应式使用。如果多个线程避免阻塞和事务性操作(如BLPOP和MULTI/EXEC),则它们可以共享一个连接。支持高级Redis功能,如Sentinel、Cluster、Pipelining、Auto Reconnect和Redis数据模型。

引入依赖

要使用Lettuce ,需要引入Lettuce的maven 依赖

<dependency>
	<groupId>io.lettuce</groupId>
	<artifactId>lettuce-core</artifactId>
	<version>5.3.1.RELEASE</version>
</dependency>

入门案例

public class LettuceTest {

    public static void main(String[] args) {
        //创建RedisClient客户端
        RedisClient client = createRedisClient();
        //打开Redis独立连接。端点是从初始化的RedisClient使用的
        StatefulRedisConnection<String, String> connection = client.connect();
        //获取用于同步执行的命令API。Lettuce也支持异步和反应式执行模型。
        RedisCommands<String, String> commands = connection.sync();
        //发出GET命令以获取name。
        String value = commands.get("name");

        System.out.println(value);
    }

    /**
     * 创建RedisClient客户端
     */
    private static RedisClient createRedisClient() {
        //创建RedisClient实例并提供指向localhost端口6379(默认端口)的Redis URI。
        RedisURI redisURI = RedisURI
                .builder()
                .withHost("127.0.0.1")
                .withPort(6379)
                .build();
        return RedisClient.create(redisURI);
    }
}

RedisURI

RedisURI包含主机/端口,可以携带身份验证/数据库详细信息。成功连接后,您将获得身份验证,然后选择数据库。

Redis URI也可以通过URI字符串创建。支持的格式有:

redis://[password@]host[:port][/databaseNumber] Plaintext Redis connection

rediss://[password@]host[:port][/databaseNumber] SSL Redis connection

redis-sentinel://[password@]host[:port][,host2[:port2]][/databaseNumber]#sentinelMasterId for using Redis Sentinel

redis-socket:///path/to/socket Unix Domain Socket connection to Redis

Asynchronous API

除此之外,Lettuce 还支持异步操作,将上面的操作改成异步处理,结果如下!

import io.lettuce.core.LettuceFutures;
import io.lettuce.core.RedisClient;
import io.lettuce.core.RedisFuture;
import io.lettuce.core.RedisURI;
import io.lettuce.core.api.StatefulRedisConnection;
import io.lettuce.core.api.async.RedisAsyncCommands;

import java.time.Duration;
import java.time.temporal.ChronoUnit;

public class LettuceTest {

    public static void main(String[] args) throws Exception {
        //创建RedisClient客户端
        RedisClient client = createRedisClient();
        //打开Redis独立连接。端点是从初始化的RedisClient使用的
        StatefulRedisConnection<String, String> connection = client.connect();
        //获取用于异步执行的命令API。Lettuce也支持异步和反应式执行模型。
        RedisAsyncCommands<String, String> async = connection.async();
        RedisFuture<String> set = async.set("name", "yangyanping");
        RedisFuture<String> get = async.get("name");

        if (LettuceFutures.awaitAll(Duration.of(2, ChronoUnit.SECONDS), set, get)) {
            System.out.println(set.get());
            System.out.println(get.get());
        }
    }

    /**
     * 创建RedisClient客户端
     */
    private static RedisClient createRedisClient() {
        //创建RedisClient实例并提供指向localhost端口6379(默认端口)的Redis URI。
        RedisURI redisURI = RedisURI
                .builder()
                .withHost("127.0.0.1")
                .withPort(6379)
                .withTimeout(Duration.of(10, ChronoUnit.SECONDS))
                .build();
        return RedisClient.create(redisURI);
    }
}

响应式编程

Lettuce 除了支持异步编程以外,还支持响应式编程。

import io.lettuce.core.RedisClient;
import io.lettuce.core.RedisURI;
import io.lettuce.core.api.StatefulRedisConnection;
import io.lettuce.core.api.reactive.RedisReactiveCommands;
import reactor.core.publisher.Mono;

import java.time.Duration;
import java.time.temporal.ChronoUnit;

public class LettuceTest {

    public static void main(String[] args) throws Exception {
        //创建RedisClient客户端
        RedisClient client = createRedisClient();
        StatefulRedisConnection<String,String> connection = client.connect();
        RedisReactiveCommands<String,String> reactive =  connection.reactive();
        Mono<String> set = reactive.set("name","yangyanping");
        Mono<String> get = reactive.get("name");
        set.subscribe();
        System.out.println(get.block());
    }

    /**
     * 创建RedisClient客户端
     */
    private static RedisClient createRedisClient() {
        //创建RedisClient实例并提供指向localhost端口6379(默认端口)的Redis URI。
        RedisURI redisURI = RedisURI
                .builder()
                .withHost("127.0.0.1")
                .withPort(6379)
                .withTimeout(Duration.of(10, ChronoUnit.SECONDS))
                .build();
        return RedisClient.create(redisURI);
    }
}

发布和订阅

Redis 发布订阅(Pub/Sub)是一种消息通信模式:发送者通过 PUBLISH发布消息,订阅者通过 SUBSCRIBE 订阅接收消息或通过UNSUBSCRIBE 取消订阅。

主要包含三个部分组成:「发布者」、「订阅者」、「Channel」。

发布者和订阅者属于客户端,Channel 是 Redis 服务端,发布者将消息发布到频道,订阅这个频道的订阅者则收到消息。

如下图所示,三个「订阅者」订阅「Channel」频道:

在Redis中发布消息

127.0.0.1:6379> PUBLISH channel name
(integer) 1
import io.lettuce.core.RedisClient;
import io.lettuce.core.RedisURI;
import io.lettuce.core.pubsub.RedisPubSubListener;
import io.lettuce.core.pubsub.api.sync.RedisPubSubCommands;

import java.time.Duration;
import java.time.temporal.ChronoUnit;

public class LettuceTest {

    public static void main(String[] args) throws Exception {
        //创建RedisClient客户端
        RedisClient client = createRedisClient();
        RedisPubSubCommands<String,String> commands = client.connectPubSub().sync();
        commands.getStatefulConnection().addListener(new RedisPubSubListener<String, String>() {
            @Override
            public void message(String channel, String message) {
                System.out.println("[message]" + channel + " -> " + message);
            }

            @Override
            public void message(String pattern, String channel, String message) {
                System.out.println("[message]" + channel + " -> " + message);
            }

            @Override
            public void subscribed(String s, long l) {

            }

            @Override
            public void psubscribed(String s, long l) {

            }

            @Override
            public void unsubscribed(String s, long l) {

            }

            @Override
            public void punsubscribed(String s, long l) {

            }
        });

        commands.subscribe("channel");

        System.in.read();
    }

    /**
     * 创建RedisClient客户端
     */
    private static RedisClient createRedisClient() {
        //创建RedisClient实例并提供指向localhost端口6379(默认端口)的Redis URI。
        RedisURI redisURI = RedisURI
                .builder()
                .withHost("127.0.0.1")
                .withPort(6379)
                .withTimeout(Duration.of(10, ChronoUnit.SECONDS))
                .build();
        return RedisClient.create(redisURI);
    }
}

输出结果

[message]channel -> name

线程池配置

Lettuce 连接设计的时候,就是线程安全的,所以一个连接可以被多个线程共享,同时 lettuce 连接默认是自动重连的,使用单连接基本可以满足业务需求,大多数情况下不需要配置连接池,多连接并不会给操作带来性能上的提升。

但在某些特殊场景下,比如事务操作,使用连接池会是一个比较好的方案,那么如何配置线程池呢?

首先引入依赖

<dependency>
	<groupId>org.apache.commons</groupId>
	<artifactId>commons-pool2</artifactId>
	<version>2.11.1</version>
</dependency>
import io.lettuce.core.RedisClient;
import io.lettuce.core.RedisURI;
import io.lettuce.core.api.StatefulRedisConnection;
import io.lettuce.core.api.sync.RedisCommands;
import io.lettuce.core.support.ConnectionPoolSupport;
import org.apache.commons.pool2.impl.GenericObjectPool;
import org.apache.commons.pool2.impl.GenericObjectPoolConfig;

import java.time.Duration;
import java.time.temporal.ChronoUnit;

public class LettuceTest {

    public static void main(String[] args) throws Exception {
       RedisClient client = createRedisClient();
        GenericObjectPoolConfig poolConfig = new GenericObjectPoolConfig();
        poolConfig.setMinIdle(50);

        GenericObjectPool<StatefulRedisConnection<String, String>> pool = ConnectionPoolSupport.createGenericObjectPool(client::connect, poolConfig);
        StatefulRedisConnection<String, String> connection = pool.borrowObject();
        RedisCommands<String, String> commands = connection.sync();
        commands.set("name", "yyp");
        System.out.println(commands.get("name"));
    }

    /**
     * 创建RedisClient客户端
     */
    private static RedisClient createRedisClient() {
        //创建RedisClient实例并提供指向localhost端口6379(默认端口)的Redis URI。
        RedisURI redisURI = RedisURI
                .builder()
                .withHost("127.0.0.1")
                .withPort(6379)
                .withTimeout(Duration.of(10, ChronoUnit.SECONDS))
                .build();
        return RedisClient.create(redisURI);
    }
}

客户端资源与参数配置

Lettuce 客户端的通信框架集成了 Netty 的非阻塞 IO 操作,客户端资源的设置与 Lettuce 的性能、并发和事件处理紧密相关,如果不是特别熟悉客户端参数配置,不建议在没有经验的前提下凭直觉修改默认值,保持默认配置就行。

非集群环境下,具体的配置案例如下:

public static void main(String[] args) throws Exception {
        RedisURI redisURI = RedisURI
                .builder()
                .withHost("127.0.0.1")
                .withPort(6379)
                .withTimeout(Duration.of(10, ChronoUnit.SECONDS))
                .build();
        ClientResources resources = DefaultClientResources.builder()
                .ioThreadPoolSize(4) //I/O线程数
                .computationThreadPoolSize(4) //任务线程数
                .build();

        ClusterClientOptions options = ClusterClientOptions.builder()
                .autoReconnect(true)//是否自动重连
                .pingBeforeActivateConnection(true)//连接激活之前是否执行PING命令
                .validateClusterNodeMembership(true)//是否校验集群节点的成员关系
                .build();
        RedisClusterClient client = RedisClusterClient.create(resources, redisURI);
        client.setOptions(options);
        StatefulRedisClusterConnection<String, String> connection = client.connect();
        RedisAdvancedClusterCommands<String, String> commands = connection.sync();
        commands.set("name", "yyp");
        System.out.println(commands.get("name"));
    }

Cluster 集群模式配置

Cluster 集群模式,是之后推出的一种高可用的架构模型,主要是采用分片方式来存储数据,具体配置如下:

import io.lettuce.core.RedisURI;
import io.lettuce.core.cluster.RedisClusterClient;
import io.lettuce.core.cluster.api.StatefulRedisClusterConnection;
import io.lettuce.core.cluster.api.sync.RedisAdvancedClusterCommands;

import java.time.Duration;
import java.time.temporal.ChronoUnit;
import java.util.Arrays;

public class LettuceTest {

    public static void main(String[] args) throws Exception {
        RedisURI redisURI1 = RedisURI
                .builder()
                .withHost("127.0.0.1")
                .withPort(6379)
                .withTimeout(Duration.of(10, ChronoUnit.SECONDS))
                .build();

        RedisURI redisURI2 = RedisURI
                .builder()
                .withHost("127.0.0.1")
                .withPort(6389)
                .withTimeout(Duration.of(10, ChronoUnit.SECONDS))
                .build();

        RedisURI redisURI3 = RedisURI
                .builder()
                .withHost("127.0.0.1")
                .withPort(6399)
                .withTimeout(Duration.of(10, ChronoUnit.SECONDS))
                .build();


        RedisClusterClient client = RedisClusterClient.create(Arrays.asList(redisURI1, redisURI2, redisURI3));

        StatefulRedisClusterConnection<String, String> connection = client.connect();
        RedisAdvancedClusterCommands<String, String> commands = connection.sync();
        commands.set("name", "yyp");
        System.out.println(commands.get("name"));
    }
}
原文链接:,转发请注明来源!