Spring Boot 中使用并行 REST 调用优化多子请求性能

本文介绍如何在 spring 应用中安全、高效地并行执行多个 rest 子请求,解决因串行调用导致的接口响应延迟问题,并提供线程安全的响应聚合方案。

在构建基于 Spring 的微服务接口时,常遇到“父请求携带多个子请求”的典型场景:例如一个订单查询接口需同时调用库存、物流、用户画像等下游服务。若对 10–50 个子请求逐个 RestTemplate 或 WebClient 同步调用,整体耗时呈线性叠加,极易突破 SLA。此时,并行化是关键优化手段——但直接使用 parallelStream().forEach() 会因 Java 闭包限制(要求局部变量 final 或 effectively final)而无法安全收集结果。

✅ 正确做法:选用线程安全的集合 + 显式标识映射

核心原则是:每个子请求必须具备唯一标识(如 id、sequence 或业务 key),以便后续按序组装或关联响应。推荐两种方案:

方案一:按序号索引 → 使用 synchronizedList(适用于 ID 为连续整数)

// 初始化线程安全的动态数组(预设容量提升性能)
final List childResponses = 
    Collections.synchronizedList(new ArrayList<>(childRequests.size()));
// 预填充 null 占位,避免并发 add 时索引越界
childRequests.forEach(req -> childResponses.add(null));

childRequests.parallelStream()
    .forEach(request -> {
        try {
            ChildResponse response = restTemplate.postForObject(
                "https://api.example.com/child", request, ChildResponse.class);
            // 安全写入指定索引(request.getId() 应为 0-based int)
            childResponses.set(request.getId(), response);
        } catch (Exception e) {
            log.error("Failed to call child API for id: {}", request.getId(), e);
            childResponses.set(request.getId(), 
                Ch

ildResponse.error(request.getId(), e.getMessage())); } });

方案二:按业务键映射 → 使用 ConcurrentHashMap(推荐,更通用灵活)

final Map responseMap = new ConcurrentHashMap<>();
childRequests.parallelStream()
    .forEach(request -> {
        try {
            ChildResponse response = webClient.post()
                .uri("https://api.example.com/child")
                .bodyValue(request)
                .retrieve()
                .bodyToMono(ChildResponse.class)
                .block(); // 注意:此处 block 仅用于示例;生产建议用 Mono.zip + Flux.concatMap
            responseMap.put(request.getBusinessKey(), response);
        } catch (Exception e) {
            responseMap.put(request.getBusinessKey(), 
                ChildResponse.error(request.getBusinessKey(), e.getMessage()));
        }
    });

// 后续按原始 childRequests 顺序组装最终响应(保障语义一致性)
List orderedResponses = childRequests.stream()
    .map(req -> responseMap.getOrDefault(req.getBusinessKey(), 
        ChildResponse.empty(req.getBusinessKey())))
    .collect(Collectors.toList());

⚠️ 关键注意事项

  • 勿用 ArrayList / HashMap 直接共享:非线程安全集合在并行流中会导致 ConcurrentModificationException 或数据丢失;
  • 避免 forEach 返回值收集:parallelStream().forEach() 无返回值,不可用于构建新集合;应改用 collect()(但需自定义 Collector,复杂度高),故推荐上述显式集合方案;
  • 超时与熔断必配:并行调用放大下游故障影响,务必为每个子请求设置独立 connectTimeout / readTimeout,并集成 Resilience4j 或 Sentinel;
  • 优先考虑 WebClient + 响应式编程:parallelStream 是阻塞式并行,受限于线程池;更优雅的方式是使用 Flux.merge(childRequests, concurrency) 配合 WebClient 实现非阻塞并发(资源利用率更高);
  • 监控与追踪:为每个子请求添加 MDC 日志上下文及 OpenTelemetry Span,便于问题定位。

通过合理选择线程安全容器、明确请求标识、并辅以超时和错误处理,即可在保障数据一致性的前提下,将数十个子请求的总耗时从秒级降至单次最长依赖耗时,显著提升父接口吞吐量与用户体验。