Reactive Stream 中如何正确合并多个 Flux 数据流

在 spring webflux 或 project reactor 中,使用 `mergewith` 时需注意其不可变性——它不会原地修改流,而是返回新流;错误地忽略返回值会导致数据丢失,正确做法是用 `flatmap` 或链式 `fold` 累积合并。

在响应式编程中,常见的误区之一是将命令式思维(如 for 循环 + 累加变量)直接套用于 Reactor 的 Flux 操作。你提供的代码:

val ids = repository.findIds().map { it.ekycId }
val allEventFlux = Flux.empty()
for (id in ids) {
    val events: Flux = eventStore.readEvents(id)
    allEventFlux.mergeWith(events) // ❌ 错误:返回新 Flux,但未赋值!
}

问题核心在于:mergeWith 是纯函数式操作,返回一个全新的 Flux,而非修改原流。因此 allEventFlux.mergeWith(events) 执行后,结果被丢弃,allEventFlux 始终保持为初始的空流 Flux.empty()。

✅ 正确方案一:推荐使用 flatMap(语义清晰、性能友好)

val allEvents: Flux = Flux.fromIterable(repository.findId

s()) .map { it.ekycId } .flatMap { id -> eventStore.readEvents(id) }
  • flatMap 将每个 ID 映射为一个 Flux,并并发(默认 prefetch=32)扁平化合并所有事件流;
  • 自动处理背压,适合 I/O 密集型场景(如多次数据库/事件存储查询);
  • 代码简洁、可读性强,是 Reactor 中“一对多异步流聚合”的标准范式。

✅ 正确方案二:若需严格顺序合并(如 mergeWith 语义),用 fold

val ids = repository.findIds().map { it.ekycId }
val allEvents: Flux = ids.fold(Flux.empty()) { acc, id ->
    acc.mergeWith(eventStore.readEvents(id))
}
  • fold 从空流开始,逐个累积调用 mergeWith,生成最终合并流;
  • 注意:mergeWith 本身是惰性组合,不触发执行,仅构建流拓扑;
  • 该方式按 ids 顺序依次合并,但不保证各 readEvents(id) 内部事件的全局顺序(因 mergeWith 是并发合并);如需完全保序(即先 ID1 全部事件,再 ID2 全部事件),应改用 concatWith:
val allEventsInOrder: Flux = ids.fold(Flux.empty()) { acc, id ->
    acc.concatWith(eventStore.readEvents(id)) // ✅ 严格串行:ID1 → ID2 → ...
}

⚠️ 注意事项

  • 避免在循环中重复声明/忽略返回值:Reactor 的所有操作符(map, filter, mergeWith, concatWith 等)均返回新实例,无副作用;
  • Flux.empty() 是冷流:它不触发任何计算,仅作为初始占位符;
  • 背压与资源管理:flatMap 默认并发数为 256(Reactor 3.5+),可通过 .flatMap(..., concurrency) 调整;concatWith 则天然满足背压传递,但吞吐量较低;
  • 调试技巧:可在关键节点添加 .doOnNext { println("Event: $it") } 或 .log() 辅助验证流是否被正确构建与订阅。

总结

场景 推荐操作符 特点
高吞吐、事件无需严格 ID 顺序 flatMap 并发执行,自动背压,最常用
各 ID 事件需严格串行输出 concatWith(配合 fold) 顺序执行,延迟高,适合强序要求
多流静态合并(已知固定数量) Flux.merge(flux1, flux2, flux3) 更直观,适用于编译期确定流数

始终牢记:Reactor 是声明式、不可变的响应式流模型——每一次操作都在定义“未来如何处理数据”,而非立即执行。 正确理解这一范式,是写出健壮响应式代码的前提。