Java里如何实现线程安全的事件发布与订阅_事件发布订阅线程安全操作说明

使用CopyOnWriteArrayList可实现线程安全的事件发布订阅,适合读多写少场景;通过读写锁+列表快照可提升高并发性能,结合线程池支持异步分发,确保事件处理不阻塞发布线程,根据场景选择方案。

在Java中实现线程安全的事件发布与订阅,关键在于确保多个线程同时注册、注销或触发事件时不会引发并发问题。常见的做法是使用线程安全的数据结构来管理监听器列表,并保证事件发布过程中的读写一致性。

使用CopyOnWriteArrayList管理监听器

最简单且高效的线程安全方式是使用CopyOnWriteArrayList存储订阅者(监听器)。该集合在修改时会复制底层数组,适合读多写少的场景,如事件广播。

示例代码:

import java.util.concurrent.CopyOnWriteArrayList;

public class EventBus {
    private final CopyOnWriteArrayList listeners = new CopyOnWriteArrayList<>();

    public void subscribe(EventListener listener) {
        listeners.add(listener);
    }

    public void unsubscribe(EventListener listener) {
        listeners.remove(listener);
    }

    public void publish(Event event) {
        for (EventListener listener : listeners) {
            listener.onEvent(event);
        }
    }
}

interface EventListener {
    void onEvent(Event event);
}

class Event {
    private final String data;

    public Event(String data) {
        this.data = data;
    }

    public String getData() {
        return data;
    }
}

说明:CopyOnWriteArrayList在遍历过程中允许添加/删除操作,不会抛出ConcurrentModificationException,非常适合事件通知这种频繁读取、较少变更的场景。

使用显式同步控制(适用于复杂逻辑)

如果需要更细粒度的控制,比如支持按类型订阅、异步分发等,可以使用synchronizedReentrantReadWriteLock保护监听器集合。

示例:使用读写锁提升性能

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;

public class ThreadSafeEventBus {
    private final List listeners = new ArrayList<>();
    private final ReadWriteLock lock = new ReentrantReadWriteLock();

    public void subscribe(EventListener listener) {
        lock.writeLock().lock();
        try {
            listeners.add(listener);
        } finally {
            lock.writeLock().unlock();
        }
    }

    public void unsubscribe(EventListener listener) {
        lock.writeLock().lock();
        try {
            listeners.remove(listener);
        } finally {
            lock.writeLock().unlock();
        }
    }

    public void publish(Event event) {
        lock.readLock().lock();
        try {
            // 创建快照避免持有锁期间调用回调
            List safeListeners = new ArrayList<>(listeners);
            for (EventListener listener : safeListeners) {
                listener.onEvent(event);
            }
        } finally {
            lock.readLock().unlock();
        }
    }
}

优势:读操作不阻塞,适合高频率事件发布;通过拷贝监听器列表,避免在锁内执行可能耗时的回调方法。

异步事件分发与线程池集成

若事件处理较耗时,建议将事件分发交给线程池,避免阻塞发布线程。

改进publish方法:

private final ExecutorService executor = Executors.newFixedThreadPool(4);

public void publish(Event event) {
    lock.readLock().lock();
    List safeListeners;
    try {
        safeListeners = new ArrayList<>(listeners);
    } finally {
        lock.readLock().unlock();
  

} for (EventListener listener : safeListeners) { executor.submit(() -> listener.onEvent(event)); } }

注意:异步模式下事件顺序不保证,需根据业务决定是否使用单线程线程池(如newSingleThreadExecutor)来保序。

基本上就这些。选择哪种方式取决于你的使用场景:轻量级用CopyOnWriteArrayList,功能复杂可配合读写锁和线程池,关键是保证监听器列表的线程安全和事件发布的稳定性。