如何在 Quarkus 或 Mutiny 中实现异步操作的顺序执行与容错处理

本文介绍如何使用 mutiny 的 `multi` 和 `uni` 实现对列表元素的逐个异步调用,确保严格串行执行、失败不中断,并优雅降级。

在响应式编程中,「顺序执行多个异步任务」是一个常见但易被误解的需求。不同于并行(transformToUniAndMe

rge)或无序(transformToUniAndConcatenate 的非严格串行变体),真正的逐项等待前一项完成后再发起下一项,需依赖 Mutiny 的 transformToUniAndConcatenate —— 它会将每个 Uni 串联(concatenate)执行,即前一个 Uni 终止(无论成功或失败)后,才订阅下一个。

以下为完整实现方案:

import io.smallrye.mutiny.Multi;
import io.smallrye.mutiny.Uni;

// 假设已有:
// List elements;
// private Uni asyncMethod(Element element) { ... }

Multi sequentialResults = Multi.createFrom()
    .iterable(elements)
    .onItem()
    .transformToUniAndConcatenate(element -> 
        asyncMethod(element)
            .onFailure() // 捕获任意异常(包括 checked exception 包装后的 RuntimeException)
            .recoverWithItem("fallback_for_" + element.getId()) // 自定义降级值(类型须匹配 String)
            // 或使用 continueWith(() -> Uni.createFrom().item(...)) 实现更复杂逻辑
    );

关键点说明:

  • transformToUniAndConcatenate 是唯一保证严格串行的算子:它不会并发触发下游 Uni,也不会跳过失败项;
  • onFailure().recoverWithItem(...) 确保单个元素失败时返回默认值,流程继续(而非终止 Multi 流);
  • 若需记录错误日志或执行副作用(如告警),可用 onFailure().invoke(throwable -> log.warn(...)) 配合 recoverWithItem;
  • 最终得到的是 Multi,可进一步 .collect().asList() 聚合成 Uni>,或 .subscribe().with(System.out::println) 消费结果。

⚠️ 注意事项:

  • 切勿误用 transformToUniAndMerge(并行)或 transformToUniAndConcatenate 的误配 onItem().transformToUni(...)(无串联语义);
  • 若 asyncMethod 可能返回 null,需在 recoverWithItem 前加 .onItem().ifNull().continueWith(...) 防空;
  • 严格串行意味着总耗时 ≈ 各次调用耗时之和,若性能敏感且业务允许部分并行,应重新评估需求。

通过上述方式,你既能保持响应式链的声明式表达力,又精准满足「顺序、容错、持续」三大约束。