如何在 Java Project Reactor 中高效记录响应式数据流日志?

本文详解 project reactor 中 `log

()`、`doonerror`、`doonfinally` 等日志操作符的适用场景与最佳实践,帮助你在 spring webflux 项目中实现可调试、低侵入、生产友好的响应式链路追踪。

在基于 Project Reactor 的响应式应用(如 Spring WebFlux)中,缺乏清晰的日志输出常导致生产环境问题排查困难——因为默认情况下,Flux/Mono 不会自动打印中间状态或错误上下文。幸运的是,Reactor 提供了多套语义明确、副作用可控的日志机制,合理选用可显著提升可观测性。

✅ 推荐首选:.log() —— 零配置、全生命周期洞察

log() 是最轻量且信息最丰富的日志工具。它无需手动编写日志语句,即可自动记录订阅、请求、onNext/onError/onComplete 事件、取消信号及上下游上下文(包括线程名、操作符栈),非常适合开发与测试阶段快速定位行为异常:

return repositoryName.findById(event.eventId())
    .filter(event -> event.completedDate() == null)
    .filterWhen(event -> externalService.getEventSummary(event.getUser().userId()))
    .doOnNext(e -> log.info("Event found: {}", e.id())) // 业务语义日志
    .log("EventProcessingPipeline") // 自动记录完整生命周期(含线程、事件类型、耗时)
    .onErrorResume(e -> {
        log.error("Failed to process event {}", event.eventId(), e);
        return Mono.empty();
    });
⚠️ 注意:log() 默认使用 INFO 级别,生产环境慎用全局 log()(尤其高并发场景),建议通过 log(String category, LogLevel level) 控制粒度,或仅对关键链路启用。

✅ 精准控制:doOnNext / doOnError / doOnFinally —— 有状态、可定制

当需结合业务逻辑记录结构化日志(如埋点、指标更新、审计日志)时,推荐使用带副作用的“doOn”系列操作符。它们不改变数据流本身,仅在指定时机执行副作用代码,并严格保持原始元素传递:

  • doOnNext(T t):在每个成功元素到达时执行(对应 onNext)
  • doOnError(Throwable t):在流终止于错误时执行(注意:不拦截异常,仅监听)
  • doOnFinally(SignalType signal):无论成功/失败/取消,均执行(类似 finally,适合资源清理或终态统计)
return repositoryName.findById(event.eventId())
    .doOnNext(found -> log.debug("✅ Event {} loaded from DB", found.id()))
    .filter(event -> event.completedDate() == null)
    .doOnNext(incomplete -> log.trace("⏳ Event {} is incomplete, proceeding...", incomplete.id()))
    .filterWhen(event -> externalService.getEventSummary(event.getUser().userId()))
    .doOnNext(summary -> log.info("? Summary fetched for event {}: {}", event.eventId(), summary))
    .doOnError(e -> log.warn("⚠️  External service failed for event {}", event.eventId(), e))
    .doOnFinally(signal -> {
        if (signal == SignalType.ON_COMPLETE) {
            metrics.eventProcessedCounter().increment();
        } else if (signal == SignalType.ON_ERROR) {
            metrics.eventProcessingErrorCounter().increment();
        }
    });

✅ 关键原则:doOn* 系列绝不应抛出异常(否则将中断流),且避免阻塞 I/O(如同步数据库写入)。若需异步持久化日志,请使用 publishOn(Schedulers.boundedElastic()) 切换线程。

❌ 避免滥用:onErrorResume 不是日志工具

onErrorResume 的核心职责是错误恢复(返回替代 Mono/Flux),而非日志记录。将其用于日志会导致逻辑混淆,且掩盖真实错误路径:

// ❌ 反模式:用 onErrorResume 做日志(破坏错误传播语义)
.onErrorResume(e -> {
    log.error("Unexpected error", e); // 日志虽执行,但错误被“吞掉”
    return Mono.empty(); // 流静默终止,上游无法感知
});

// ✅ 正确做法:先 doOnError 记录,再 onErrorResume 恢复
.doOnError(e -> log.error("External call failed for event {}", event.eventId(), e))
.onErrorResume(e -> handleFallbackLogic());

? 总结:按场景选择日志策略

场景 推荐操作符 说明
快速诊断、开发调试 .log("category") 全事件、线程、耗时自动记录,开箱即用
业务关键节点标记 .doOnNext() / .doOnError() 结构化日志 + 指标更新,强语义、易维护
终态统计与清理 .doOnFinally() 覆盖 success/error/cancel 三种终态
错误恢复(非日志) .onErrorResume() 仅用于提供 fallback 数据,勿替代日志

最后提醒:所有日志操作符均应在必要处添加,避免在高频内循环或海量数据流中无差别打点;生产环境建议结合 MDC(如 Mono.subscriberContext() 注入 traceId)实现链路追踪,并通过日志级别(如 DEBUG/TRACE)动态开关详细日志。