如何在 Mutiny 中顺序执行多个 Uni 操作并容错续行

本文介绍如何使用 mutiny 的 `multi` 将列表元素逐个、顺序地传递给返回 `uni` 的异步方法,并在某个操作失败时自动降级并继续后续处理,而非中断整个流

程。

在响应式编程中,当需要严格按序执行多个异步操作(即前一个完成后再触发下一个),且要求单个失败不阻断整体流程时,直接使用 Uni.combine().all() 或并行 Multi.concatMap() 都无法满足需求。Mutiny 提供了优雅的解决方案:利用 Multi.createFrom().iterable() 构建数据流,再结合 transformToUniAndConcatenate 实现串行化 + 容错。

transformToUniAndConcatenate 是关键——它确保每个 Uni 按输入项顺序依次订阅(concatenation 语义),且上游 Multi 会等待当前 Uni 终止(无论成功或失败)后才发射下一项,天然支持“一个接一个”的执行模型。

以下是完整实现示例:

import io.smallrye.mutiny.Multi;
import io.smallrye.mutiny.Uni;
import java.util.List;

public class SequentialUniProcessor {

    private Uni asyncMethod(Element element) {
        // 模拟异步调用,例如 HTTP 请求或 DB 查询
        return Uni.createFrom().item("result-for-" + element.getId())
                .onItem().delayIt().by(Duration.ofMillis(100));
    }

    public Multi processElementsSequentially(List elements) {
        return Multi.createFrom()
                .iterable(elements)
                .onItem()
                .transformToUniAndConcatenate(element -> 
                    asyncMethod(element)
                        .onFailure() // 捕获任意异常(包括 RuntimeException)
                        .recoverWithItem("fallback-for-" + element.getId()) // 降级值,类型需匹配 String
                );
    }
}

关键要点说明:

  • ✅ transformToUniAndConcatenate:强制串行执行,避免竞态与乱序;
  • ✅ .onFailure().recoverWithItem(...):推荐用于已知错误场景,返回默认值;若需更复杂逻辑(如日志+重试+兜底),可改用 .onFailure().recoverWithUni(failure -> {...});
  • ❌ 避免使用 transformToUniAndFlatMap —— 它会并发执行,失去顺序性;
  • ⚠️ 若 elements 为空,Multi 将直接完成(不发射任何项),符合预期;
  • ? 最终返回的是 Multi,可进一步 .collect().asList() 聚合结果,或 .subscribe().with(System.out::println) 流式消费。

总结:通过 Multi.iterable(...) → transformToUniAndConcatenate → onFailure.recover... 这一链式组合,你既能保证严格的执行顺序,又能实现弹性容错,是 Mutiny 中处理“串行异步任务队列”的标准范式。