在响应式编程中,我们经常会遇到这样的场景:一个异步操作返回一个mono
例如,我们有一个获取订单的Mono
public class Order {
private UUID id;
private String name;
private UUID truckId; // 我们需要提取的字段
// 构造函数、Getter/Setter略
public UUID getTruckId() {
return truckId;
}
}
public class Truck {
private UUID id;
private String model;
// 构造函数、Getter/Setter略
}
// 假设的服务接口
interface OrderService {
Mono getById(UUID id);
}
interface VehicleService {
Mono getByTruckId(UUID truckId);
} 我们的目标是:
flatMap是Reactor中一个非常重要的操作符,它允许我们将一个发出T的Mono(即Mono
如果我们的最终目标仅仅是获取Mono
import reactor.core.publisher.Mono;
import java.util.UUID;
public class ReactiveDataExtraction {
private OrderService orderService; // 假设已注入
private VehicleService vehicleService; // 假设已注入
// 模拟服务方法
private Mono getById(UUID id) {
// 实际应用中会调用orderService.getById(id)
return Mono.just(new Order(id, "Test Order", UUID.randomUUID()));
}
private Mono getByTruckId(UUID truckId) {
// 实际应用中会调用vehicleService.getByTruckId(truckId)
return Mono.just(new Truck(truckId, "Volvo FH"));
}
public Mono getTruckFromOrder(UUID orderId) {
Mono orderMono = getById(orderId);
// 使用flatMap从Mono中提取truckId并调用getByTruckId
Mono truckMono = orderMono.flatMap(order -> getByTruckId(order.getTruckId()));
return truckMono;
}
public static void main(String[] args) {
ReactiveDataExtraction example = new ReactiveDataExtraction();
UUID testOrderId = UUID.randomUUID();
example.getTruckFromOrder(testOrderId)
.subscribe(
truck -> System.out.println("成功获取到卡车信息: " + truck.getModel()),
error -> System.err.println("获取卡车信息失败: " + error.getMessage())
);
// 为了演示非阻塞,通常需要等待异步操作完成
try {
Thread.sleep(1000); // 实际应用中不会这样阻塞
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
} 解释:orderMono.flatMap(order -> getByTruckId(order.getTruckId()))这行代码的含义是:
有时,我们不仅需要链式调用得到的结果(如Truck),还需要原始的数据(如Order)来构建一个更复杂的聚合对象。在这种情况下,我们可以结合使用flatMap和Mono.zip。
假设我们希望将Order和它对应的Truck组合成一个新的Result对象。
首先,定义一个聚合结果类:
public class Result {
private Order order;
private Truck truck;
public Result(Order order, Truck truck) {
this.order = order;
this.truck = truck;
}
// Getter/Setter略
@Override
public String toString() {
return "Result{" +
"orderId=" + (order != null ? order.getId() : "null") +
", orderName='" + (order != null ? order.getName() : "null") + '\'' +
", truckId=" + (truck != null ? truck.getId() : "null") +
", truckModel='" + (truck != null ? truck.getModel() : "null") + '\'' +
'}';
}
}然后,实现聚合逻辑:
import reactor.core.publisher.Mono;
import reactor.util.function.Tuple2; // 用于Mono.zip的默认输出
import java.util.UUID;
public class ReactiveDataAggregation {
private OrderService orderService; // 假设已注入
private VehicleService vehicleService; // 假设已注入
// 模拟服务方法
private Mono getById(UUID id) {
return Mono.just(new Order(id, "Test Order " + id.toString().substring(0,4), UUID.randomUUID()));
}
private Mono getByTruckId(UUID truckId) {
return Mono.just(new Truck(truckId, "Model-" + truckId.toString().substring(0,4)));
}
public Mono getOrderAndTruck(UUID orderId) {
Mono orderMono = getById(orderId);
// 1. 从orderMono派生出truckMono
// 注意:这里truckMono的生成依赖于orderMono,它们是串行的
Mono truckMono = orderMono.flatMap(order -> getByTruckId(order.getTruckId()));
// 2. 使用Mono.zip组合原始的orderMono和派生出的truckMono
// Mono.zip会等待两个Mono都发出值后,将它们组合成一个Tuple2
// 这里需要注意,如果orderMono和truckMono是独立的,zip会并行处理。
// 但由于truckMono的创建依赖于orderMono,这里的zip实际上会在orderMono发出值后,
// 再等待truckMono发出值。
Mono> zippedMono = Mono.zip(orderMono, truckMono);
// 3. 使用flatMap将Tuple2映射为自定义的Result对象
Mono resultMono = zippedMono.flat
Map(tuple ->
Mono.just(new Result(tuple.getT1(), tuple.getT2()))
);
return resultMono;
}
public static void main(String[] args) {
ReactiveDataAggregation example = new ReactiveDataAggregation();
UUID testOrderId = UUID.randomUUID();
example.getOrderAndTruck(testOrderId)
.subscribe(
result -> System.out.println("成功聚合订单和卡车信息: " + result),
error -> System.err.println("聚合信息失败: " + error.getMessage())
);
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
} 解释:
通过flatMap和Mono.zip等核心操作符,Reactor提供了一种强大而优雅的方式来处理异步数据流中的依赖关系和数据聚合。从Mono中非阻塞地提取内部字段并进行链式服务调用,是构建高性能、高响应性应用程序的关键技术。掌握这些模式,将使你能够更好地利用响应式编程的优势,构建出健壮且可扩展的系统。