前言
Lettuce 是 Redis 的一款高级 Java 客户端,与 Jedis 并列成为最热门的客户端之一,目前已成为 SpringBoot 2.0 版本默认的 Redis 客户端。
相比老牌 Jedis,Lettuce 属于后起之秀,不仅功能丰富,而且提供了很多新的功能特性,比如异步操作、响应式编程等等,同时还解决了 Jedis 中线程不安全的问题。
Lettuce是一个可扩展的线程安全Redis客户端,用于同步、异步和反应式使用。如果多个线程避免阻塞和事务性操作(如BLPOP和MULTI/EXEC),则它们可以共享一个连接。支持高级Redis功能,如Sentinel、Cluster、Pipelining、Auto Reconnect和Redis数据模型。
引入依赖
要使用Lettuce ,需要引入Lettuce的maven 依赖
<dependency>
<groupId>io.lettuce</groupId>
<artifactId>lettuce-core</artifactId>
<version>5.3.1.RELEASE</version>
</dependency>
入门案例
public class LettuceTest {
public static void main(String[] args) {
//创建RedisClient客户端
RedisClient client = createRedisClient();
//打开Redis独立连接。端点是从初始化的RedisClient使用的
StatefulRedisConnection<String, String> connection = client.connect();
//获取用于同步执行的命令API。Lettuce也支持异步和反应式执行模型。
RedisCommands<String, String> commands = connection.sync();
//发出GET命令以获取name。
String value = commands.get("name");
System.out.println(value);
}
/**
* 创建RedisClient客户端
*/
private static RedisClient createRedisClient() {
//创建RedisClient实例并提供指向localhost端口6379(默认端口)的Redis URI。
RedisURI redisURI = RedisURI
.builder()
.withHost("127.0.0.1")
.withPort(6379)
.build();
return RedisClient.create(redisURI);
}
}
RedisURI
RedisURI包含主机/端口,可以携带身份验证/数据库详细信息。成功连接后,您将获得身份验证,然后选择数据库。
Redis URI也可以通过URI字符串创建。支持的格式有:
redis://[password@]host[:port][/databaseNumber] Plaintext Redis connection
rediss://[password@]host[:port][/databaseNumber] SSL Redis connection
redis-sentinel://[password@]host[:port][,host2[:port2]][/databaseNumber]#sentinelMasterId for using Redis Sentinel
redis-socket:///path/to/socket Unix Domain Socket connection to Redis
Asynchronous API
除此之外,Lettuce 还支持异步操作,将上面的操作改成异步处理,结果如下!
import io.lettuce.core.LettuceFutures;
import io.lettuce.core.RedisClient;
import io.lettuce.core.RedisFuture;
import io.lettuce.core.RedisURI;
import io.lettuce.core.api.StatefulRedisConnection;
import io.lettuce.core.api.async.RedisAsyncCommands;
import java.time.Duration;
import java.time.temporal.ChronoUnit;
public class LettuceTest {
public static void main(String[] args) throws Exception {
//创建RedisClient客户端
RedisClient client = createRedisClient();
//打开Redis独立连接。端点是从初始化的RedisClient使用的
StatefulRedisConnection<String, String> connection = client.connect();
//获取用于异步执行的命令API。Lettuce也支持异步和反应式执行模型。
RedisAsyncCommands<String, String> async = connection.async();
RedisFuture<String> set = async.set("name", "yangyanping");
RedisFuture<String> get = async.get("name");
if (LettuceFutures.awaitAll(Duration.of(2, ChronoUnit.SECONDS), set, get)) {
System.out.println(set.get());
System.out.println(get.get());
}
}
/**
* 创建RedisClient客户端
*/
private static RedisClient createRedisClient() {
//创建RedisClient实例并提供指向localhost端口6379(默认端口)的Redis URI。
RedisURI redisURI = RedisURI
.builder()
.withHost("127.0.0.1")
.withPort(6379)
.withTimeout(Duration.of(10, ChronoUnit.SECONDS))
.build();
return RedisClient.create(redisURI);
}
}
响应式编程
Lettuce 除了支持异步编程以外,还支持响应式编程。
import io.lettuce.core.RedisClient;
import io.lettuce.core.RedisURI;
import io.lettuce.core.api.StatefulRedisConnection;
import io.lettuce.core.api.reactive.RedisReactiveCommands;
import reactor.core.publisher.Mono;
import java.time.Duration;
import java.time.temporal.ChronoUnit;
public class LettuceTest {
public static void main(String[] args) throws Exception {
//创建RedisClient客户端
RedisClient client = createRedisClient();
StatefulRedisConnection<String,String> connection = client.connect();
RedisReactiveCommands<String,String> reactive = connection.reactive();
Mono<String> set = reactive.set("name","yangyanping");
Mono<String> get = reactive.get("name");
set.subscribe();
System.out.println(get.block());
}
/**
* 创建RedisClient客户端
*/
private static RedisClient createRedisClient() {
//创建RedisClient实例并提供指向localhost端口6379(默认端口)的Redis URI。
RedisURI redisURI = RedisURI
.builder()
.withHost("127.0.0.1")
.withPort(6379)
.withTimeout(Duration.of(10, ChronoUnit.SECONDS))
.build();
return RedisClient.create(redisURI);
}
}
发布和订阅
Redis 发布订阅(Pub/Sub)是一种消息通信模式:发送者通过 PUBLISH发布消息,订阅者通过 SUBSCRIBE 订阅接收消息或通过UNSUBSCRIBE 取消订阅。
主要包含三个部分组成:「发布者」、「订阅者」、「Channel」。
发布者和订阅者属于客户端,Channel 是 Redis 服务端,发布者将消息发布到频道,订阅这个频道的订阅者则收到消息。
如下图所示,三个「订阅者」订阅「Channel」频道:
在Redis中发布消息
127.0.0.1:6379> PUBLISH channel name
(integer) 1
import io.lettuce.core.RedisClient;
import io.lettuce.core.RedisURI;
import io.lettuce.core.pubsub.RedisPubSubListener;
import io.lettuce.core.pubsub.api.sync.RedisPubSubCommands;
import java.time.Duration;
import java.time.temporal.ChronoUnit;
public class LettuceTest {
public static void main(String[] args) throws Exception {
//创建RedisClient客户端
RedisClient client = createRedisClient();
RedisPubSubCommands<String,String> commands = client.connectPubSub().sync();
commands.getStatefulConnection().addListener(new RedisPubSubListener<String, String>() {
@Override
public void message(String channel, String message) {
System.out.println("[message]" + channel + " -> " + message);
}
@Override
public void message(String pattern, String channel, String message) {
System.out.println("[message]" + channel + " -> " + message);
}
@Override
public void subscribed(String s, long l) {
}
@Override
public void psubscribed(String s, long l) {
}
@Override
public void unsubscribed(String s, long l) {
}
@Override
public void punsubscribed(String s, long l) {
}
});
commands.subscribe("channel");
System.in.read();
}
/**
* 创建RedisClient客户端
*/
private static RedisClient createRedisClient() {
//创建RedisClient实例并提供指向localhost端口6379(默认端口)的Redis URI。
RedisURI redisURI = RedisURI
.builder()
.withHost("127.0.0.1")
.withPort(6379)
.withTimeout(Duration.of(10, ChronoUnit.SECONDS))
.build();
return RedisClient.create(redisURI);
}
}
输出结果
[message]channel -> name
线程池配置
Lettuce 连接设计的时候,就是线程安全的,所以一个连接可以被多个线程共享,同时 lettuce 连接默认是自动重连的,使用单连接基本可以满足业务需求,大多数情况下不需要配置连接池,多连接并不会给操作带来性能上的提升。
但在某些特殊场景下,比如事务操作,使用连接池会是一个比较好的方案,那么如何配置线程池呢?
首先引入依赖
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-pool2</artifactId>
<version>2.11.1</version>
</dependency>
import io.lettuce.core.RedisClient;
import io.lettuce.core.RedisURI;
import io.lettuce.core.api.StatefulRedisConnection;
import io.lettuce.core.api.sync.RedisCommands;
import io.lettuce.core.support.ConnectionPoolSupport;
import org.apache.commons.pool2.impl.GenericObjectPool;
import org.apache.commons.pool2.impl.GenericObjectPoolConfig;
import java.time.Duration;
import java.time.temporal.ChronoUnit;
public class LettuceTest {
public static void main(String[] args) throws Exception {
RedisClient client = createRedisClient();
GenericObjectPoolConfig poolConfig = new GenericObjectPoolConfig();
poolConfig.setMinIdle(50);
GenericObjectPool<StatefulRedisConnection<String, String>> pool = ConnectionPoolSupport.createGenericObjectPool(client::connect, poolConfig);
StatefulRedisConnection<String, String> connection = pool.borrowObject();
RedisCommands<String, String> commands = connection.sync();
commands.set("name", "yyp");
System.out.println(commands.get("name"));
}
/**
* 创建RedisClient客户端
*/
private static RedisClient createRedisClient() {
//创建RedisClient实例并提供指向localhost端口6379(默认端口)的Redis URI。
RedisURI redisURI = RedisURI
.builder()
.withHost("127.0.0.1")
.withPort(6379)
.withTimeout(Duration.of(10, ChronoUnit.SECONDS))
.build();
return RedisClient.create(redisURI);
}
}
客户端资源与参数配置
Lettuce 客户端的通信框架集成了 Netty 的非阻塞 IO 操作,客户端资源的设置与 Lettuce 的性能、并发和事件处理紧密相关,如果不是特别熟悉客户端参数配置,不建议在没有经验的前提下凭直觉修改默认值,保持默认配置就行。
非集群环境下,具体的配置案例如下:
public static void main(String[] args) throws Exception {
RedisURI redisURI = RedisURI
.builder()
.withHost("127.0.0.1")
.withPort(6379)
.withTimeout(Duration.of(10, ChronoUnit.SECONDS))
.build();
ClientResources resources = DefaultClientResources.builder()
.ioThreadPoolSize(4) //I/O线程数
.computationThreadPoolSize(4) //任务线程数
.build();
ClusterClientOptions options = ClusterClientOptions.builder()
.autoReconnect(true)//是否自动重连
.pingBeforeActivateConnection(true)//连接激活之前是否执行PING命令
.validateClusterNodeMembership(true)//是否校验集群节点的成员关系
.build();
RedisClusterClient client = RedisClusterClient.create(resources, redisURI);
client.setOptions(options);
StatefulRedisClusterConnection<String, String> connection = client.connect();
RedisAdvancedClusterCommands<String, String> commands = connection.sync();
commands.set("name", "yyp");
System.out.println(commands.get("name"));
}
Cluster 集群模式配置
Cluster 集群模式,是之后推出的一种高可用的架构模型,主要是采用分片方式来存储数据,具体配置如下:
import io.lettuce.core.RedisURI;
import io.lettuce.core.cluster.RedisClusterClient;
import io.lettuce.core.cluster.api.StatefulRedisClusterConnection;
import io.lettuce.core.cluster.api.sync.RedisAdvancedClusterCommands;
import java.time.Duration;
import java.time.temporal.ChronoUnit;
import java.util.Arrays;
public class LettuceTest {
public static void main(String[] args) throws Exception {
RedisURI redisURI1 = RedisURI
.builder()
.withHost("127.0.0.1")
.withPort(6379)
.withTimeout(Duration.of(10, ChronoUnit.SECONDS))
.build();
RedisURI redisURI2 = RedisURI
.builder()
.withHost("127.0.0.1")
.withPort(6389)
.withTimeout(Duration.of(10, ChronoUnit.SECONDS))
.build();
RedisURI redisURI3 = RedisURI
.builder()
.withHost("127.0.0.1")
.withPort(6399)
.withTimeout(Duration.of(10, ChronoUnit.SECONDS))
.build();
RedisClusterClient client = RedisClusterClient.create(Arrays.asList(redisURI1, redisURI2, redisURI3));
StatefulRedisClusterConnection<String, String> connection = client.connect();
RedisAdvancedClusterCommands<String, String> commands = connection.sync();
commands.set("name", "yyp");
System.out.println(commands.get("name"));
}
}