在Reactor中,Flux和Mono是响应式流的核心构建块,它们代表了0到N个(Flux)或0到1个(Mono)元素的异步序列。它们本质上是发布者(Publisher),负责发出事件,而不是提供一个直接的“注入”或“发送”方法供外部调用。这意味着,你不能像操作一个队列那样,直接向一个已经存在的Flux实例调用一个类似emit(object)的方法来添加元素。
当你从一个外部库获得一个Flux
FluxaFluxMap = Library.createMappingToMappedType();
这个aFluxMap已经是一个完整的发布者,它有自己的数据源和处理逻辑。你通常可以订阅它来消费其产生的MappedType事件,例如通过aFluxMap.doOnNext(converted -> doJob(converted))。然而,直接向它“发送”你的自定义对象以期望它进行转换并发出,是不符合其设计模式的。
为了能够动态地发出自定义事件,Reactor提供了FluxProcessor和FluxSink。FluxProcessor是一个特殊的类型,它既是Subscriber又是Publisher,允许你向其发送事件(作为Subscriber),并从它接收事件(作为Publisher)。FluxSink则是FluxProcessor的一个接口,提供了next()、error()和complete()等方法,用于精确控制事件的发射。
以下是如何创建一个可控的Flux并向其发射事件的基本示例:
import reactor.core.publisher.Flux;
import reactor.core.publisher.FluxSink;
import reactor.core.publisher.UnicastProcessor;
// 假设我们有某种RawType和MappedType
class RawType { String data; public RawType(String data) { this.data = data; } @Override public String toString() { return "RawType(" + data + ")"; } }
class MappedType { String mappedData; public MappedType(String mappedData) { this.mappedData = mappedData; } @Override public String toString() { return "MappedType(" + mappedData + ")"; } }
public class CustomFluxEmitter {
public static void main(String[] args) {
// 1. 创建一个UnicastProcessor作为我们自定义事件的源
UnicastProcessor customRawProcessor = UnicastProcessor.create();
// 2. 获取FluxSink,用于向customRawProcessor发射事件
FluxSink rawSink = customRawProcessor.sink();
// 3. 将自定义的RawType流转换为MappedType流
// 这里假设我们有一个转换函数,或者MappedType可以直接从RawType构建
Flux yourCustomMappedFlux = customRawProcessor
.map(raw -> new MappedType("Mapped(" + raw.data + ")"));
// 此时 yourCustomMappedFlux 是一个可以由 rawSink 控制的 MappedType 流
yourCustomMappedFlux.subscribe(
mapped -> System.out.println("Custom Mapped Type: " + mapped),
error -> System.err.println("Error in custom flux: " + error),
() -> System.out.println("Custom flux completed")
);
// 4. 模拟发射自定义事件
rawSink.next(new RawType("Input A"));
rawSink.next(new RawType("Input B"));
// rawSink.complete(); // 可以在适当时候完成流
}
} 这段代码展示了如何创建一个由你控制的Flux (yourCustomMappedFlux),并通过rawSink向其发射RawType事件,这些事件随后被转换为MappedType。
既然不能直接向外部库的Flux注入事件,那么最常见的解决方案是创建一个你自己的可控Flux,然后使用Reactor的组合操作符(如merge、concat、zip)将其与外部库的Flux合并。这样,你就拥有了一个包含两部分事件的统一流:一部分来自外部库,另一部分来自你的自定义发射器。
考虑到你的目标是“发射一些对象到aFluxMap以获取MappedType”,并且aFluxMap本身已经是Flux
以下是使用Flux.merge操作符的示例:
import reactor.core.publisher.Flux; import reactor.core.publisher.FluxSink; import reactor.core.publisher.UnicastProcessor; import java.time.Duration; // 假设 MappedType 已经定义,并且 Library 提供了 createMappingToMappedType 方法 // 模拟外部库的Flux class Library { public static Flux
createMappingToMappedType() { // 模拟一个每秒发出一个 MappedType 的外部 Flux return Flux.interval(Duration.ofSeconds(1)) .take(3) // 只发出3个元素 .map(i -> new MappedType("Library Mapped " + i)); } } public class MergeFluxExample { public static void main(String[] args) throws InterruptedException { // 1. 获取外部库的 Flux Flux aFluxMap = Library.createMappingToMappedType(); // 2. 创建一个可控的 Flux,用于发射你的自定义 MappedType 事件 UnicastProcessor customProcessor = UnicastProcessor.create(); FluxSink customSink = customProcessor.sink(); Flux yourCustomFlux = customProcessor; // 3. 使用 Flux.merge 合并两个 Flux // merge操作符会将两个或更多Publisher的元素交错合并到一个新的Flux中 Flux combinedFlux = Flux.merge(aFluxMap, yourCustomFlux); // 4. 订阅合并后的 Flux 并处理事件 combinedFlux.doOnNext(mapped -> System.out.println("Received: " + mapped)) .doOnComplete(() -> System.out.println("Combined Flux Completed")) .subscribe(); // 5. 模拟在运行时发射自定义 MappedType 事件 System.out.println("Emitting custom events..."); Thread.sleep(500); // 等待一下,让library的flux先开始 customSink.next(new MappedType("Custom A")); Thread.sleep(1200); customSink.next(new MappedType("Custom B")); Thread.sleep(1200); customSink.next(new MappedType("Custom C")); customSink.complete(); // 完成自定义流 // 等待一段时间观察输出 Thread.sleep(5000); } }
在这个例子中,Flux.merge(aFluxMap, yourCustomFlux)创建了一个新的Flux,它会同时监听aFluxMap和yourCustomFlux,并将它们发出的MappedType事件交错地