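This guide walks you through building a functional reactive application that uses Spring Data to interact with Redis through the non-blocking Lettuce driver.
You will build a Spring application that uses Spring Data Redis and Project Reactor to interact with Redis, storing and retrieving Coffee objects without blocking. The application uses Reactor's Publisher implementations, which are based on the Reactive Streams specification: Mono (a Publisher that returns 0 or 1 value) and Flux (a Publisher that returns 0 to n values).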
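To make the Publisher contract more concrete, here is a minimal sketch that uses only the JDK's java.util.concurrent.Flow interfaces (Java 9 or later), which mirror the Reactive Streams interfaces. It is not Reactor code and is not part of the guide's project; the class names CollectingSubscriber and ReactiveStreamsSketch are hypothetical. A Flux-style stream corresponds to zero or more onNext signals followed by onComplete, while a Mono-style stream would deliver at most one onNext.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;
import java.util.concurrent.TimeUnit;

// Hypothetical subscriber that records every item it receives.
class CollectingSubscriber<T> implements Flow.Subscriber<T> {
    final List<T> received = Collections.synchronizedList(new ArrayList<>());
    final CountDownLatch done = new CountDownLatch(1);
    private Flow.Subscription subscription;

    @Override
    public void onSubscribe(Flow.Subscription subscription) {
        this.subscription = subscription;
        subscription.request(1); // backpressure: ask for one item at a time
    }

    @Override
    public void onNext(T item) {
        received.add(item);
        subscription.request(1); // ready for the next item
    }

    @Override
    public void onError(Throwable throwable) {
        done.countDown();
    }

    @Override
    public void onComplete() {
        done.countDown(); // the publisher has no more items
    }
}

// Hypothetical helper: publishes 0..n items and returns what the subscriber saw.
class ReactiveStreamsSketch {
    static List<String> publishAndCollect(String... items) {
        CollectingSubscriber<String> subscriber = new CollectingSubscriber<>();
        // close() at the end of the try block signals onComplete to the subscriber.
        try (SubmissionPublisher<String> publisher = new SubmissionPublisher<>()) {
            publisher.subscribe(subscriber);
            for (String item : items) {
                publisher.submit(item);
            }
        }
        try {
            if (!subscriber.done.await(5, TimeUnit.SECONDS)) {
                throw new IllegalStateException("Timed out waiting for completion");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return new ArrayList<>(subscriber.received);
    }
}
```

Calling ReactiveStreamsSketch.publishAndCollect("Jet Black Redis", "Darth Redis") delivers the items asynchronously and returns them in submission order; calling it with no arguments completes without any onNext signal, which is the "0 values" case.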
Directory structure
└── src
    └── main
        └── java
            └── hello
pom.xml
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <groupId>org.springframework</groupId>
    <artifactId>reactive-redis</artifactId>
    <version>0.1.0</version>
    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.1.6.RELEASE</version>
    </parent>
    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-data-redis-reactive</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-webflux</artifactId>
        </dependency>
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <optional>true</optional>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>io.projectreactor</groupId>
            <artifactId>reactor-test</artifactId>
            <scope>test</scope>
        </dependency>
    </dependencies>
    <properties>
        <java.version>1.8</java.version>
    </properties>
    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
            </plugin>
        </plugins>
    </build>
</project>
Create a domain class
Create a Coffee class that represents a coffee.
src/main/java/hello/Coffee.java
package hello;

import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;

@Data
@NoArgsConstructor
@AllArgsConstructor
public class Coffee {
    private String id;
    private String name;
}
This example uses Lombok to eliminate the boilerplate code for constructors and the so-called "data class" methods (accessors/mutators, equals(), toString(), and hashCode()).
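For readers unfamiliar with Lombok, the following is a rough hand-written approximation of what the three annotations on Coffee provide. The class name PlainCoffee is hypothetical and is not part of the guide's project. Lombok's actual generated code differs in detail (for example, it uses its own hashCode algorithm and adds a canEqual method), so treat this as a sketch of the behavior rather than the exact output.

```java
import java.util.Objects;

// Hypothetical plain-Java counterpart of the Lombok-annotated Coffee class.
class PlainCoffee {
    private String id;
    private String name;

    // Roughly what @NoArgsConstructor provides.
    PlainCoffee() {
    }

    // Roughly what @AllArgsConstructor provides.
    PlainCoffee(String id, String name) {
        this.id = id;
        this.name = name;
    }

    // Accessors and mutators, as @Data would generate them.
    public String getId() {
        return id;
    }

    public void setId(String id) {
        this.id = id;
    }

    public String getName() {
        return name;
    }

    public void setName(String name) {
        this.name = name;
    }

    // Field-by-field equality, similar in effect to @Data's equals().
    @Override
    public boolean equals(Object other) {
        if (this == other) {
            return true;
        }
        if (!(other instanceof PlainCoffee)) {
            return false;
        }
        PlainCoffee that = (PlainCoffee) other;
        return Objects.equals(id, that.id) && Objects.equals(name, that.name);
    }

    // Consistent with equals(); Lombok computes the value differently.
    @Override
    public int hashCode() {
        return Objects.hash(id, name);
    }

    // Mimics Lombok's ClassName(field=value, ...) format.
    @Override
    public String toString() {
        return "PlainCoffee(id=" + id + ", name=" + name + ")";
    }
}
```

Two PlainCoffee instances with the same id and name compare as equal and share a hash code, which is the behavior the loader and controller rely on when Coffee objects are printed or compared.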
Create a configuration class with Spring beans that support reactive Redis operations
src/main/java/hello/CoffeeConfiguration.java
package hello;

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.redis.connection.ReactiveRedisConnectionFactory;
import org.springframework.data.redis.core.ReactiveRedisOperations;
import org.springframework.data.redis.core.ReactiveRedisTemplate;
import org.springframework.data.redis.serializer.Jackson2JsonRedisSerializer;
import org.springframework.data.redis.serializer.RedisSerializationContext;
import org.springframework.data.redis.serializer.StringRedisSerializer;

@Configuration
public class CoffeeConfiguration {
    @Bean
    ReactiveRedisOperations<String, Coffee> redisOperations(ReactiveRedisConnectionFactory factory) {
        Jackson2JsonRedisSerializer<Coffee> serializer = new Jackson2JsonRedisSerializer<>(Coffee.class);
        RedisSerializationContext.RedisSerializationContextBuilder<String, Coffee> builder =
                RedisSerializationContext.newSerializationContext(new StringRedisSerializer());
        RedisSerializationContext<String, Coffee> context = builder.value(serializer).build();
        return new ReactiveRedisTemplate<>(factory, context);
    }
}
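Create a Spring bean that loads some sample data when the application starts.
Because the application may be restarted many times, we should first remove any data left over from previous runs. We do this with the flushAll() (Redis) server command. Once any existing data has been flushed, we create a small Flux, map each coffee name to a Coffee object, and save it to the reactive Redis repository. We then query the repository for all values and print them.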
src/main/java/hello/CoffeeLoader.java
package hello;

import org.springframework.data.redis.connection.ReactiveRedisConnectionFactory;
import org.springframework.data.redis.core.ReactiveRedisOperations;
import org.springframework.stereotype.Component;
import reactor.core.publisher.Flux;

import javax.annotation.PostConstruct;
import java.util.UUID;

@Component
public class CoffeeLoader {
    private final ReactiveRedisConnectionFactory factory;
    private final ReactiveRedisOperations<String, Coffee> coffeeOps;

    public CoffeeLoader(ReactiveRedisConnectionFactory factory, ReactiveRedisOperations<String, Coffee> coffeeOps) {
        this.factory = factory;
        this.coffeeOps = coffeeOps;
    }

    @PostConstruct
    public void loadData() {
        factory.getReactiveConnection().serverCommands().flushAll().thenMany(
                Flux.just("Jet Black Redis", "Darth Redis", "Black Alert Redis")
                        .map(name -> new Coffee(UUID.randomUUID().toString(), name))
                        .flatMap(coffee -> coffeeOps.opsForValue().set(coffee.getId(), coffee)))
                .thenMany(coffeeOps.keys("*")
                        .flatMap(coffeeOps.opsForValue()::get))
                .subscribe(System.out::println);
    }
}
Create a RestController to provide an external interface to our application
src/main/java/hello/CoffeeController.java
package hello;

import org.springframework.data.redis.core.ReactiveRedisOperations;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;
import reactor.core.publisher.Flux;

@RestController
public class CoffeeController {
    private final ReactiveRedisOperations<String, Coffee> coffeeOps;

    CoffeeController(ReactiveRedisOperations<String, Coffee> coffeeOps) {
        this.coffeeOps = coffeeOps;
    }

    @GetMapping("/coffees")
    public Flux<Coffee> all() {
        return coffeeOps.keys("*")
                .flatMap(coffeeOps.opsForValue()::get);
    }
}
Create the Application class
src/main/java/hello/Application.java
package hello;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

@SpringBootApplication
public class Application {
    public static void main(String[] args) {
        SpringApplication.run(Application.class, args);
    }
}
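Run and test the application
With a Redis server running (Spring Boot connects to localhost:6379 by default), run Application, then visit http://localhost:8080/coffees in a browser.
Congratulations! You have just developed a Spring application that uses Spring Data and Redis for fully reactive, non-blocking database access!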