如何在 Mono/Flux 中正确调用 WebClient 方法

在响应式编程中,不应在 `map()` 内部直接调用 `webclient` 并手动 `subscribe()`;而应使用 `flatmap()` 等异步转换操作符来组合 `mono`/`flux`,以保持响应式链的完整性与非阻塞性。

map() 是一个同步、纯函数式操作符,其设计初衷是将一个值(如 T)转换为另一个值(如 R),不引入任何异步副作用。一旦你在 map() 中执行 WebClient.post(...).subscribe(),就破坏了响应式流的核心契约:你不仅丢弃了返回的 Mono 或 Mono,还触发了“火并忘记”(fire-and-forget)式订阅——这会导致错误无法传播、重试逻辑失效、上下文(如 Context 或 TraceId)丢失,且彻底脱离了 Reactor 的背压与生命周期管理。

正确的做法是使用 flatMap():它接受一个 T → Publisher 的函数,天然适配异步 I/O 操作(如 HTTP 调用),并自动将内部 Publisher 扁平化接入主流。所有错误处理、重试、转换都可声明式地串联在链中,无需手动订阅。

以下是重构后的关键代码片段(已移除 subscribe(),改用 flat

Map,并优化错误处理与重试逻辑):

return mailTemplateMappingRepository
    .findById(request.getTemplateKey())
    .switchIfEmpty(Mono.error(new MailTemplateNotSupportedException(
        "The template with key " + request.getTemplateKey() + " is not supported!!!")))
    .flatMap(t -> {
        log.info("sendEmailWithRetry: request {}", request);
        log.info("sendEmailWithRetry: templateMappings {}", t);

        // 同步逻辑仍可保留在 flatMap 内(如 token 刷新判断)
        if (!businessUnitAuthTokens.containsKey(t.getExactTargetBusinessUnit())) {
            updateBusinessUnitToken(t);
        }

        String token = "Bearer " + businessUnitAuthTokens.get(t.getExactTargetBusinessUnit());
        String uri = exactTargetMessageDefinitionSendsUrl.replace("{key}", t.getExactTargetKey());
        Map mailTriggerPayload = generateMailTriggerPayload(request);

        // 定义重试策略(注意:retryWhen 作用于下游 Publisher)
        RetryBackoffSpec is401RetrySpec = Retry.backoff(1, Duration.ofSeconds(2))
            .filter(throwable -> throwable instanceof Unauthorized)
            .doBeforeRetry(retrySignal -> {
                log.warn("UNAUTHORIZED detected; refreshing token for BU: {}", t.getExactTargetBusinessUnit());
                updateBusinessUnitToken(t);
            })
            .onRetryExhaustedThrow((spec, signal) ->
                new ExactTargetException(
                    HttpStatus.UNAUTHORIZED.value(),
                    signal.failure().getMessage(),
                    "Authorization failed after retries for business unit: " + t.getExactTargetBusinessUnit()
                )
            );

        // ✅ 正确:返回 Mono(由 WebClient 返回),由 flatMap 自动订阅并融合
        return restClientService.post(uri, mailTriggerPayload, token, String.class)
            .onErrorResume(error -> {
                if (error instanceof ExactTargetException) {
                    return Mono.error(new ExactTargetException(
                        ((ExactTargetException) error).getStatus(),
                        ((ExactTargetException) error).getBody(),
                        "Exact Target error: status=" + ((ExactTargetException) error).getStatus()
                            + ", body=" + ((ExactTargetException) error).getBody()
                    ));
                }
                return Mono.error(error);
            })
            .retryWhen(is401RetrySpec);
    });

关键注意事项:

  • 永远不要在 map() / filter() / doOnNext() 等同步操作符中调用 subscribe() —— 这是响应式编程中最常见的反模式。
  • ✅ flatMap() 是处理“一个值 → 一个异步任务”的标准方式;若需并发多个请求,可考虑 flatMapSequential() 或 concatMap() 控制顺序。
  • ⚠️ updateBusinessUnitToken(t) 若本身是阻塞或非响应式操作,应封装为 Mono.fromCallable(...) 并用 flatMap 组合,避免污染响应式线程(如 block())。
  • ? 错误类型需精确匹配(如 Unauthorized 是否为自定义异常?确保 filter() 中能正确识别);建议统一使用 Spring 的 WebClientResponseException 子类便于标准化处理。
  • ? 日志中避免打印敏感信息(如完整 token、payload),生产环境应脱敏。

遵循此模式,你的链将真正具备响应式特性:可组合、可背压、可追踪、可监控,并与 Spring WebFlux 生态无缝集成。