Project Reactor 订阅机制、错误处理与背压最佳实践

subscribe()是Flux/Mono生命周期起点,触发onSubscribe/onNext/onComplete或onError;无订阅则无执行,操作符仅组装链路;冷序列用cache()/share()复用,热序列需显式管理生命周期。

订阅机制:理解 Flux/Mono 的生命周期起点

Reacto r 中的 subscribe() 不是简单触发数据流,而是启动整个响应式链的生命周期。调用它时,会依次触发 onSubscribe()(传递 Subscription)、onNext()onComplete()onError()。关键点在于:没有 subscribe(),就没有实际执行 —— 所有操作符(如 mapfilter)只是组装链路,不会触发任何计算或 I/O。

常见误区是多次调用 subscribe() 导致重复执行(比如 HTTP 请求被发多次)。解决方式包括:

  • 对冷序列(Cold Publisher,如 Mono.fromCallable()Flux.range())使用 cache()share() 实现多订阅复用
  • 对热序列(Hot Publisher,如 Flux.create() 配合 publish().refCount())显式管理生命周期
  • 避免在业务逻辑中隐式重复订阅 —— 推荐将数据流封装为方法返回值,由最终调用方统一订阅

错误处理:分层拦截与语义化恢复

Reacto r 提倡“错误即数据”,不鼓励 try-catch 包裹流。正确做法是在链路上用声明式操作符提前捕获并转化异常:

  • onErrorResume():替换异常为替代数据(如 fallback 值),适合非关键路径降级
  • onErrorReturn():直接返回固定值,适用于兜底场景(如查不到用户时返回默认头像)
  • onErrorMap():将原始异常映射为更明确的业务异常(如把 HttpClientException 转成 UserNotFoundException
  • retry()retryWhen():控制重试逻辑;注意避免无条件重试导致雪崩,建议配合指数退避(Retry.backoff(3, Duration.ofSeconds(1))

全局错误兜底可用 doOnError() 记录日志,但不要在这里做恢复 —— 恢复逻辑应放在更上游、语义更清晰的位置。

背压:面向下游消费能力的设计约束

背压不是 Reactor 特有机制,而是 Reactive Streams 规范的核心 —— 它要求下游能主动告知上游“我还能处理多少”。Flux 默认支持背压(Mono 忽略,因其最多只发 1 个元素),但必须正确使用才能生效:

  • 避免在链中混入不支持背压的操作:如 block()toStream()、或未指定策略的 publishOn()(它默认使用 Queues.SMALL_BUFFER_SIZE,可能丢弃请求)
  • 对高吞吐场景,显式配置缓冲区大小:publishOn(scheduler, bufferSize) 或使用 limitRate(n) 控制每批请求数
  • 数据库/HTTP 客户端需选用支持背压的驱动(如 R2DBC、WebClient),否则上游控速无效
  • 自定义 Flux.create() 时,务必调用 subscription.request(n) 响应下游需求,不可自行发数据

组合实践:一个典型 Web API 场景

以 Spring WebFlux 中查询用户为例:

  • WebClient.get().retrieve().bodyToMono(User.class) 发起请求(天然支持背压)
  • timeout(Duration.ofSeconds(3)) 防止无限等待
  • 败时用 onErrorMap(WebClientResponseException::getStatus, this::mapToBusinessException) 统一异常语义
  • 降级走缓存:onErrorResume(e -> cacheService.findUser(id).onErrorComplete())
  • 最终订阅由框架完成(Controller 返回 Mono),不手动调用 subscribe()

这种写法让错误可预测、资源可控、扩展清晰,也便于后续接入熔断(如 Resilience4j)或指标观测。