悠悠楠杉
网站页面
在现代分布式系统中,经常需要异步查询外部系统的状态(如订单支付结果、第三方API处理进度)。传统的同步阻塞轮询会消耗大量线程资源,而基于Reactor Mono的响应式方案能显著提升吞吐量。本文将手把手实现一个高可用的异步轮询方案。
假设我们需要轮询一个物流系统的配送状态,该接口返回{"status": "processing"}或{"status": "delivered"}。轮询需满足:
- 每隔2秒请求一次,最多尝试5次
- 遇到网络错误自动重试
- 状态变为delivered时立即终止
使用Mono+Flux组合实现轮询逻辑:
java
public Mono pollDeliveryStatus(String trackingId) {
return Flux.interval(Duration.ofSeconds(2)) // 定时触发器
.take(5) // 最多5次尝试
.flatMap(attempt -> fetchExternalStatus(trackingId)) // 异步请求
.takeUntil(status -> status.equals("delivered")) // 满足条件终止
.last("processing"); // 默认返回最后一次结果
}
private Mono fetchExternalStatus(String trackingId) {
return WebClient.create("https://api.logistics.com")
.get()
.uri("/status/{id}", trackingId)
.retrieve()
.bodyToMono(JsonNode.class)
.map(json -> json.get("status").asText())
.onErrorResume(e -> Mono.empty()); // 错误时忽略本次轮询
} .retryWhen(Retry.backoff(3, Duration.ofSeconds(1)))
- **超时控制**:整体轮询不超过10秒java
.timeout(Duration.ofSeconds(10))
- 资源释放:通过doFinally通知监控系统
java
pollDeliveryStatus("12345")
.subscribe(
status -> log.info("最终状态: {}", status),
err -> log.error("轮询失败", err),
() -> log.info("轮询结束")
);
Schedulers.boundedElastic()隔离阻塞调用Metrics.recorder()监控轮询耗时通过这种模式,单个服务实例可轻松管理上千个并发轮询任务,相比传统方案资源消耗降低80%以上。