中的元素:从问题到解决方案 ">中的元素:从问题到解决方案 " />
本文深入探讨了如何在响应式编程框架中异步处理Uni在quarkus或类似的响应式编程环境中,uni>表示一个最终会发出一个list
初始尝试可能涉及将Uni>通过map操作转换为List
为了高效且可靠地处理Uni中的每个元素,并确保所有异步操作都能在主程序退出前完成,我们可以将Uni
转换为Multi,从而将列表中的每个元素作为独立的流事件来处理。结合Vert.x Unit等测试框架的异步上下文,可以优雅地管理异步流的生命周期。
x Unit)import io.smallrye.mutiny.Multi;
import io.smallrye.mutiny.Uni;
import io.vertx.ext.unit.Async;
import io.vertx.ext.unit.TestContext;
import io.vertx.ext.unit.junit.VertxUnitRunner;
import org.junit.Test;
import org.junit.runner.RunWith;
import java.time.Duration;
import java.util.List;
import java.util.Random;
@RunWith(VertxUnitRunner.class)
public class AsyncListProcessingExample {
@Test
public void testAsyncListProcessing(TestContext context) {
Random random = new Random();
// 用于在流终止时通知测试框架
Async async = context.async();
Uni.createFrom()
.item(List.of("a", "b", "c")) // 原始的Uni>
// 将 Uni> 转换为 Multi,每个字符串作为独立的流事件
.onItem().transformToMulti(Multi.createFrom()::iterable)
// 对 Multi 中的每个字符串元素进行异步处理
.onItem().transformToUniAndMerge(s ->
// 为每个元素创建一个 Uni,模拟异步操作(这里是随机延迟)
Uni.createFrom().item(s)
.onItem().delayIt().by(Duration.ofMillis((random.nextInt(5) + 1) * 1000))
)
// 在流终止时执行回调,无论成功或失败
.onTermination().invoke((throwable, aBoolean) -> {
if (throwable != null) {
context.fail(throwable); // 如果有异常,测试失败
} else {
async.complete(); // 所有异步操作完成,通知测试通过
}
})
// 订阅 Multi,处理每个完成的元素
.subscribe()
.with(s -> System.out.println("Printing: " + s));
}
}
在某些场景下,我们可能需要等待所有异步操作完成后,才继续执行后续的同步代码,或者需要将所有异步处理的结果收集到一个列表中。Mutiny提供了阻塞式等待和收集结果的机制。
import io.smallrye.mutiny.Multi;
import io.smallrye.mutiny.Uni;
import java.time.Duration;
import java.util.List;
import java.util.Random;
public class BlockingListProcessingExample {
public static void main(String[] args) {
Random random = new Random();
List results = Uni.createFrom()
.item(List.of("a", "b", "c")) // 原始的Uni>
// 将 Uni> 转换为 Multi
.onItem().transformToMulti(Multi.createFrom()::iterable)
// 对 Multi 中的每个字符串元素进行异步处理
.onItem().transformToUniAndMerge(s -> {
final int duration = (random.nextInt(5) + 1) * 1000;
// 为每个元素创建一个 Uni,模拟异步操作
return Uni.createFrom().item(s)
.onItem().delayIt().by(Duration.ofMillis(duration))
.invoke(() -> System.out.println("Letter: " + s + ", duration in ms: " + duration));
})
// 订阅 Multi,处理每个完成的元素
.onItem().invoke(s -> System.out.println("Printing: " + s))
// 收集所有结果到一个列表中
.collect().asList()
// 阻塞当前线程,直到所有结果都被收集完毕
.await().indefinitely();
System.out.println("All results collected: " + results);
}
}
在Mutiny中异步处理Uni中的元素,关键在于将Uni
有效地转换为Multi,以便对每个元素进行流式处理。
理解这两种模式及其适用场景,能够帮助开发者更灵活、高效地构建基于Mutiny的异步应用。