使用ConcurrentLinkedQueue或BlockingQueue实现线程安全事件队列,高并发场景可选Disruptor;需确保异常隔离与资源清理,防止数据竞争与事件丢失。
在Java中实现线程安全的事件队列,核心在于确保多个线程能够安全地向队列中添加事件,同时另一个或多个线吸收事件并处理,而不会出现数据竞争、丢失或状态不一致的问题。以下是一些实用且经过验证的方法。
ConcurrentLinkedQueue是Java并发包提供的无锁线程安全队列,适合高并发场景下的生产者-消费者模式。
它基于链表结构,支持高效的入队和出队操作,所有操作都是线程安全的,无需额外同步。
示例代码:
private final Queue
生产者调用events.add(event),消费者调用events.poll()获取任务。
如果希望消费者线程在没有事件时自动等待,可以使用BlockingQueue的实现类如LinkedBlockingQueue。
它内部已做好锁控制,put和take操作会自动阻塞,简化了线程协调逻辑。
示例:
private final BlockingQueue
生产者调用queue.offer(event),消费者调用queue.take()。
对于超高吞吐量场景(如金融交易、日志系统),传统队列可能成为瓶颈。Disruptor是一个高性能无锁框架,基于环形缓冲区(Ring Buffer)设计。
它通过预分配内存、消除伪共享、事件发布机制等手段,极大减少GC和锁竞争。
典型用法:定义Event类,创建EventHandler处理逻辑,通过RingBuffer发布事件。
无论使用哪种队列,都要确保单个事件处理失败不会影响整个消费线程。
建议在消费循环中对每个事件的执行进行try-catch包裹,记录错误日志并继续处理后续事件。
示例结构:
while (running) {
try {
Runnable event = queue.take();
if (event != null) {
try { event.run(); }
catch (Exception e) { log.error("处理事件失败", e); }
}
} catch (InterruptedException e) { break; }
}
基本上就这些。选择哪种方式取决于你的性能需求和系统复杂度。普通业务场景用BlockingQueue足够;追求极致性能再考虑Disruptor。关键是要保证事件不丢失、处理可恢复、线程安全由底层保障。不复杂但容易忽略的是异常处理和资源清理。