Lettuce简介 - Java Redis客户端


1.概述

本文介绍了Redius Java客户端Lettuce。

Redis是一个内存中的键值存储,可用作数据库,缓存或消息代理。使用对Redis内存数据结构中的键进行操作的命令添加,查询,修改和删除数据。

Lettuce支持完整Redis API的同步和异步通信使用,包括其数据结构,发布/订阅消息传递和高可用性服务器连接。

2.Why Lettuce?

我们在以前的一篇文章中介绍了Jedis 。是什么让生菜与众不同?

最重要的区别是它通过Java 8的CompletionStage接口提供异步支持,并支持Reactive Streams。正如我们将在下面看到的,Lettuce提供了一个自然界面,用于从Redis数据库服务器发出异步请求以及创建流。

它还使用Netty与服务器通信。这使得“更重”的API,但也使它更适合与多个线程共享连接。

3.设置

3.1。依赖 让我们首先声明pom.xml中我们需要的唯一依赖项:

<dependency>
    <groupId>io.lettuce</groupId>
    <artifactId>lettuce-core</artifactId>
    <version>5.0.1.RELEASE</version>
</dependency>

3.2。Redis安装

我们需要安装并运行至少一个Redis实例,如果我们希望测试群集或前哨模式,则需要两个(尽管Sentinel模式需要三台服务器才能正常运行。)对于本文,我们使用的是4.0.x - 最新的稳定版本。

有关Redis入门的更多信息,请参见此处,包括Linux和MacOS的下载。

Redis的不正式支持Windows,但有服务器的端口这里。我们还可以在Docker中运行Redis,这是Windows 10的更好替代方案,也是快速启动和运行的方法。

4.连接

4.1。连接到服务器

连接到Redis包括四个步骤:

  1. 创建Redis URI
  2. 使用URI连接到RedisClient
  3. 打开Redis连接
  4. 生成一组RedisCommands

让我们看看实现:

edisClient redisClient = RedisClient
  .create("redis://password@localhost:6379/");
StatefulRedisConnection<Stri

一个StatefulRedisConnection是这听起来像; 与Redis服务器的线程安全连接,它将维护与服务器的连接并在需要时重新连接。一旦我们建立了连接,我们就可以使用它来同步或异步执行Redis命令。

RedisClient使用大量系统资源,因为它拥有用于与Redis服务器通信的Netty资源。需要多个连接的应用程序应使用单个RedisClient。

4.2。Redis URI

我们通过将URI传递给静态工厂方法来创建RedisClient。

Lettuce利用Redis URI的自定义语法。这是架构:

redis :// [password@] host [: port] [/ database]
  [? [timeout=timeout[d|h|m|s|ms|us|ns]]
  [&_database=database_]]

有四种URI方案:

  • redis - 一个独立的Redis服务器
  • rediss - 通过SSL连接的独立Redis服务器
  • redis-socket - 通过Unix域套接字的独立Redis服务器
  • redis-sentinel - Redis Sentinel服务器

可以将Redis数据库实例指定为URL路径的一部分或作为附加参数。如果两者都提供,则参数具有更高的优先级。

在上面的示例中,我们使用String表示。Lettuce还有一个用于建立连接的RedisURI类。它提供了Builder模式:

RedisURI.Builder
  .redis("localhost", 6379).auth("password")
  .database(1).build();

还有一个构造函数:

new RedisURI("localhost", 6379, 60, TimeUnit.SECONDS);

4.3。同步命令

与Jedis类似,Lettuce以方法的形式提供完整的Redis命令集。

但是,Lettuce实现了同步和异步版本。我们将简要介绍同步版本,然后在本教程的其余部分使用异步实现。

创建连接后,我们使用它来创建一个命令集:

RedisCommands<String, String> syncCommands = connection.sync();

现在我们有了一个与Redis通信的直观界面。 我们可以设置并获取String值:

syncCommands.set("key", "Hello, Redis!");

String value = syncommands.get(key);

我们可以使用hashes:

syncCommands.hset("recordName", "FirstName", "John");
syncCommands.hset("recordName", "LastName", "Smith");
Map<String, String> record = syncCommands.hgetall("recordName");

我们将在本文后面介绍更多Redis。

Lettuce synchronous API使用异步API。在命令级别为我们完成阻止。这意味着多个客户端可以共享同步连接。

4.4。异步命令

我们来看看异步命令:

RedisAsyncCommands<String, String> asyncCommands = connection.async();

我们从连接中检索一组RedisAsyncCommands,类似于我们检索同步集的方式。这些命令返回RedisFuture(内部是CompletableFuture):

RedisFuture<String> result = asyncCommands.get("key");

可以在此处找到使用CompletableFuture的指南。

4.5。反应性API

最后,让我们看看如何使用非阻塞的反应API:

RedisStringReactiveCommands<String, String> reactiveCommands = connection.reactive();

这些命令从Project Reactor返回包含在Mono或Flux中的结果。

可以在此处找到使用Project Reactor的指南。

5. Redis数据结构

我们简要地查看了上面的字符串和哈希值,让我们看看Lettuce如何实现Redis的其余数据结构。正如我们所期望的那样,每个Redis命令都有一个类似命名的方法。

5.1。清单

列表是保留插入顺序的字符串列表。从任一端插入或检索值:

asyncCommands.lpush("tasks", "firstTask");
asyncCommands.lpush("tasks", "secondTask");
RedisFuture<String> redisFuture = asyncCommands.rpop("tasks");

String nextTask = redisFuture.get();

在此示例中,nextTask等于“ firstTask ”。Lpush将值推送到列表的头部,然后rpop从列表的末尾弹出值。

我们也可以从另一端弹出元素:

asyncCommands.del("tasks");
asyncCommands.lpush("tasks", "firstTask");
asyncCommands.lpush("tasks", "secondTask");
redisFuture = asyncCommands.lpop("tasks");

String nextTask = redisFuture.get();

我们通过删除带del的列表来启动第二个示例。然后我们再次插入相同的值,但是我们使用lpop来弹出列表头部的值,因此nextTask包含“ secondTask ”文本。

5.2。集

Redis集是与Java 集类似的无序字符串集合 ; 没有重复的元素:

asyncCommands.sadd("pets", "dog");
asyncCommands.sadd("pets", "cat");
asyncCommands.sadd("pets", "cat");

RedisFuture<Set<String>> pets = asyncCommands.smembers("nicknames");
RedisFuture<Boolean> exists = asyncCommands.sismember("pets", "dog");

当我们将Redis设置为Set时,大小为2,因为忽略了重复的“cat”。当我们用sismember查询Redis是否存在“dog”时,响应是真的。

5.3。Hashes

我们先简要介绍了一个哈希的例子。他们值得快速解释。

Redis Hashes是具有String字段和值的记录。每条记录在主索引中也有一个键:

asyncCommands.hset("recordName", "FirstName", "John");
asyncCommands.hset("recordName", "LastName", "Smith");

RedisFuture<String> lastName
  = syncCommands.hget("recordName", "LastName");
RedisFuture<Map<String, String>> record

我们使用hset向哈希添加字段,传入哈希名称,字段名称和值。

然后,我们使用hget,记录名称和字段检索单个值。最后,我们使用hgetall将整个记录作为哈希获取。

5.4。排序集

排序集包含值和排序,按顺序排序。等级是64位浮点值。

项目添加了排名,并在一个范围内检索:

asyncCommands.zadd("sortedset", 1, "one");
asyncCommands.zadd("sortedset", 4, "zero");
asyncCommands.zadd("sortedset", 2, "two");

RedisFuture<List<String>> valuesForward = asyncCommands.zrange(key, 0, 3);
RedisFuture<List<String>> valuesReverse = asyncCommands.zrevrange(key, 0, 3);

zadd的第二个参数是排名。我们按排名检索范围,zrange为升序,zrevrange为降序。

我们添加了等级为4的“ 零 ”,因此它将出现在valuesForward的末尾和valuesReverse的开头。

6.Transactions

事务允许在单个原子步骤中执行一组命令。保证这些命令按顺序执行。在事务完成之前,不会执行来自其他用户的命令。

要么执行所有命令,要么都不执行。如果其中一个失败,Redis将不执行回滚。一旦执行exec()被调用时,所有命令都以指定的顺序执行。

我们来看一个例子:

asyncCommands.multi();

RedisFuture<String> result1 = asyncCommands.set("key1", "value1");
RedisFuture<String> result2 = asyncCommands.set("key2", "value2");
RedisFuture<String> result3 = asyncCommands.set("key3", "value3");

RedisFuture<TransactionResult> execResult = asyncCommands.exec();

TransactionResult transactionResult = execResult.get();

String firstResult = transactionResult.get(0);
String secondResult = transactionResult.get(0);
String thirdResult = transactionResult.get(0);

对multi的调用启动了事务。启动事务时,在调用exec()之前不会执行后续命令。

在同步模式下,命令返回null。在异步模式下,命令返回RedisFuture。Exec返回一个TransactionResult,其中包含一个响应列表。

由于RedisFutures也会收到结果,因此异步API客户端会在两个地方收到事务结果。

7.Batching

在正常情况下,Lettuce会在API客户端调用命令后立即执行命令。

这是大多数普通应用程序所需要的,特别是如果它们依赖于串行接收命令结果。

但是,如果应用程序不立即需要结果或批量上载大量数据,则此行为效率不高。

异步应用程序可以覆盖此行为:

commands.setAutoFlushCommands(false);

List<RedisFuture<?>> futures = new ArrayList<>();
for (int i = 0; i < iterations; i++) {
    futures.add(commands.set("key-" + i, "value-" + i);
}
commands.flushCommands();

boolean result = LettuceFutures.awaitAll(5, TimeUnit.SECONDS,
  futures.toArray(new RedisFuture[0]));

将setAutoFlushCommands设置为false时,应用程序必须手动调用flushCommands。在此示例中,我们将多个set命令排队,然后刷新通道。AwaitAll等待所有RedisFutures完成。

此状态基于每个连接设置,并影响使用该连接的所有线程。此功能不适用于同步命令。

8.发布/订阅

Redis提供简单的发布/订阅消息传递系统。订阅者使用subscribe命令使用来自通道的消息。邮件不会保留; 它们仅在订阅频道时发送给用户。

Redis使用pub / sub系统获取有关Redis数据集的通知,使客户端能够接收有关设置,删除,过期等密钥的事件。

有关详细信息,请参阅此处的文档。

8.1。Subscriber

甲RedisPubSubListener接收发布/订阅消息。这个接口定义了几种方法,但我们只是在这里展示接收消息的方法:

public class Listener implements RedisPubSubListener<String, String> {

    @Override
    public void message(String channel, String message) {
        log.debug("Got {} on channel {}",  message, channel);
        message = new String(s2);
    }
}

我们使用RedisClient连接pub / sub通道并安装监听器:

StatefulRedisPubSubConnection<String, String> connection
 = client.connectPubSub();
connection.addListener(new Listener())

RedisPubSubAsyncCommands<String, String> async
 = connection.async();
async.subscribe("channel");

安装了侦听器后,我们检索一组RedisPubSubAsyncCommands并订阅一个频道。

8.2。出版者

发布只需连接Pub / Sub通道并检索命令:

发布需要频道和消息。

8.3。无效订阅

Lettuce还提供用于订阅发布/订阅消息的响应接口:

StatefulRedisPubSubConnection<String, String> connection = client
  .connectPubSub();

RedisPubSubAsyncCommands<String, String> reactive = connection
  .reactive();

reactive.observeChannels().subscribe(message -> {
    log.debug("Got {} on channel {}",  message, channel);
    message = new String(s2);
});
reactive.subscribe("channel").subscribe();

的助焊剂通过返回observeChannels接收消息的所有信道,但由于这是一个流,过滤是很容易做到。

9.高可用性

Redis提供了多种高可用性和可扩展性选项。完整的理解需要了解Redis服务器配置,但我们将简要概述一下Lettuce如何支持它们。

9.1。主从

Redis服务器在主/从配置中复制自身。主服务器向从属设备发送一个命令流,用于将主高速缓存复制到从属设备。Redis不支持双向复制,因此从站是只读的。

生菜可以连接到主/从系统,查询它们的拓扑,然后选择从属设备进行读取操作,这可以提高吞吐量:

RedisClient redisClient = RedisClient.create();

StatefulRedisMasterSlaveConnection<String, String> connection
 = MasterSlave.connect(redisClient,
   new Utf8StringCodec(), RedisURI.create("redis://localhost"));

connection.setReadFrom(ReadFrom.SLAVE);

9.2。Sentinel

Redis Sentinel监视主实例和从属实例,并在主故障转移时协调故障转移到从属服务器。

生菜可以连接到Sentinel,使用它来发现当前主服务器的地址,然后返回到它的连接。

要做到这一点,我们建立了一个不同的RedisURI并连接我们的RedisClient吧:

RedisURI redisUri = RedisURI.Builder
  .sentinel("sentinelhost1", "clustername")
  .withSentinel("sentinelhost2").build();
RedisClient client = new RedisClient(redisUri);

RedisConnection<String, String> connection = client.connect();

我们使用第一个Sentinel的主机名(或地址)和一个集群名称构建了URI,然后是第二个标记地址。当我们连接到Sentinel时,Lettuce会查询有关拓扑的信息,并为我们返回与当前主服务器的连接。

9.3。集群

Redis Cluster使用分布式配置来提供高可用性和高吞吐量。

在多达1000个节点上群集分片键,因此群集中的事务不可用:

RedisURI redisUri = RedisURI.Builder.redis("localhost")
  .withPassword("authentication").build();
RedisClusterClient clusterClient = RedisClusterClient
  .create(rediUri);
StatefulRedisClusterConnection<String, String> connection
 = clusterClient.connect();
RedisAdvancedClusterCommands<String, String> syncCommands = connection
  .sync();

RedisAdvancedClusterCommands保存集群支持的一组Redis命令,并将它们路由到保存密钥的实例。