在响应式编程中,我们经常会遇到需要处理包含多个元素的异步操作。例如,有一个uni>,我们希望对列表中的每个字符串都执行一个耗时的异步任务,并最终收集或处理所有任务的结果。
一个常见的尝试是使用map将List
问题的核心在于,Uni>本身代表的是一个单值流,其值是一个完整的列表。如果想对列表中的每个元素进行异步操作,并将其视为独立的响应式事件,就需要将这个列表“展开”成一个可以逐个处理的流。Mutiny提供了Multi类型来处理零到N个元素的流,这正是解决此类问题的关键。
Mutiny是Quarkus等框架中广泛使用的响应式编程库,它提供了两种核心类型:
要实现对Uni>中每个元素的异步并发处理,我们需要将Uni
>首先转换为一个Multi
在单元测试或需要非阻塞等待所有异步操作完成的场景中,我们可以利用Multi的特性和onTermination().invoke()回调来确保所有任务执行完毕。以下示例结合了Vert.x Unit,它提供了一个Async机制来管理异步测试的生命周期。
import io.smallrye.mutiny.Multi;
import io.smallrye.mutiny.Uni;
import io.vertx.core.Vertx;
import io.vertx.junit5.VertxExtension;
import io.vertx.junit5.VertxTestContext;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import java.time.Duration;
import java.util.List;
import java.util.Random;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
@ExtendWith(VertxExtension.class)
public class AsyncListProcessingTest {
// 模拟一个异步操作,返回
一个Uni
private Uni processItemAsync(String item, Random random) {
final int duration = (random.nextInt(5) + 1) * 1000; // 随机延迟1-5秒
System.out.println("Starting process for: " + item + ", duration: " + duration + "ms");
return Uni.createFrom().item(item)
.onItem().delayIt().by(Duration.ofMillis(duration))
.invoke(() -> System.out.println("Finished process for: " + item));
}
@Test
public void testAsyncProcessingWithVertxUnit(VertxTestContext context) {
Random random = new Random();
// Vert.x Unit的Async对象,用于通知测试框架异步操作何时完成
context.verify(() -> { // 确保在VertxTestContext的上下文中执行
Uni.createFrom()
.item(List.of("a", "b", "c")) // 初始的Uni>
// 1. 将Uni>转换为Multi
.onItem().transformToMulti(Multi.createFrom()::iterable)
// 2. 对Multi中的每个元素应用异步转换,并将结果合并回Multi
.onItem().transformToUniAndMerge(s -> processItemAsync(s, random))
// 3. 订阅Multi流,处理每个完成的元素
.subscribe()
.with(
s -> System.out.println("Printing result: " + s), // 成功处理每个元素
context::failNow, // 任何错误导致流失败
context::completeNow // 流完成,通知VertxTestContext测试结束
);
});
}
}
代码解释:
在某些场景下,例如在命令行工具或需要等待所有异步操作完成后才能继续主程序执行时,我们可以选择阻塞当前线程直到所有结果都被收集。
import io.smallrye.mutiny.Multi;
import io.smallrye.mutiny.Uni;
import java.time.Duration;
import java.util.List;
import java.util.Random;
public class BlockingAsyncListProcessing {
private static Uni processItemAsync(String item, Random random) {
final int duration = (random.nextInt(5) + 1) * 1000; // 随机延迟1-5秒
return Uni.createFrom().item(item)
.onItem().delayIt().by(Duration.ofMillis(duration))
.invoke(() -> System.out.println("Letter: " + item + ", duration in ms: " + duration));
}
public static void main(String[] args) {
Random random = new Random();
System.out.println("Starting blocking asynchronous processing...");
List results = Uni.createFrom()
.item(List.of("a", "b", "c")) // 初始的Uni>
// 1. 将Uni>转换为Multi
.onItem().transformToMulti(Multi.createFrom()::iterable)
// 2. 对Multi中的每个元素应用异步转换,并将结果合并回Multi
.onItem().transformToUniAndMerge(s -> processItemAsync(s, random))
// 3. 可选:处理每个完成的元素
.onItem().invoke(s -> System.out.println("Printing collected item: " + s))
// 4. 将Multi中的所有元素收集到一个列表中
.collect().asList()
// 5. 阻塞当前线程,直到Uni>完成并返回结果
.await().indefinitely();
System.out.println("All items processed. Collected results: " + results);
}
}
代码解释:
通过Mutiny的Multi类型和onItem().transformToUniAndMerge()操作符,我们可以有效地将Uni>中的每个元素转换为独立的异步任务并进行并发处理。根据应用场景的不同,我们可以选择非阻塞的订阅模式(适用于响应式系统和测试)或阻塞式的await()模式(适用于需要同步等待结果的特定场景)。理解并正确运用这些Mutiny操作符是构建高效、健壮的响应式应用程序的关键。