Reactor (Mono & Flux) 速查表
# 创建(Creation)
方法 | 说明 | 示例 |
---|---|---|
Mono.just(T) | 立即创建一个值(声明时执行) | Mono.just("Hi") |
Mono.empty() | 空(0 个元素,直接 complete) | Mono.empty() |
Mono.error(Throwable) | 直接错误信号 | Mono.error(new RuntimeException()) |
Mono.defer(Supplier<Mono<T>>) | 订阅时才生成 Mono | Mono.defer(() -> Mono.just(UUID.randomUUID())) |
Mono.fromCallable(Callable<T>) | 包装可能抛异常的阻塞逻辑 | Mono.fromCallable(() -> Files.readString(path)) |
Flux.just(T...) | 多个已知值 | Flux.just(1,2,3) |
Flux.range(start, count) | 整数序列 | Flux.range(1, 5) |
Flux.interval(Duration) | 定时发射递增 long | Flux.interval(Duration.ofSeconds(1)) |
Flux.generate(...) | 同步推送,一个个生成 | Flux.generate(sink -> sink.next("hi")) |
Flux.create(...) | 异步/回调式推送 | Flux.create(sink -> {sink.next("a");sink.complete();}) |
# 转换
方法 | 说明 | 示例 |
---|---|---|
map(Function) | 一对一映射 | flux.map(x -> x*2) |
flatMap(Function) | 一对多映射并展开(并行,可能乱序) | flux.flatMap(x -> Flux.range(1, x)) |
concatMap(Function) | 一对多映射并展开(顺序保证) | flux.concatMap(x -> Flux.range(1, x)) |
switchMap(Function) | 只保留最新的映射流 | flux.switchMap(x -> Flux.interval(Duration.ofMillis(100))) |
buffer(n) | 收集成批量 | flux.buffer(3) |
window(n) | 切分成 Flux<Flux<T>> | flux.window(3) |
# 过滤/限流
方法 | 说明 | 示例 |
---|---|---|
filter(Predicate) | 过滤不符合条件的 | flux.filter(x -> x % 2 == 0) |
take(n) | 只取前 n 个 | flux.take(3) |
take(Duration) | 取一段时间 | flux.take(Duration.ofSeconds(5)) |
skip(n) | 跳过前 n 个 | flux.skip(2) |
distinct() | 去重 | flux.distinct() |
# 错误处理
方法 | 说明 | 示例 |
---|---|---|
onErrorReturn(value) | 出错时返回一个默认值 | flux.onErrorReturn(-1) |
onErrorResume(err -> Publisher) | 出错时切换到新流 | flux.onErrorResume(e -> Flux.just(100,200)) |
onErrorContinue(...) | 出错时跳过该元素继续 | flux.onErrorContinue((err, obj) -> {}) |
retry(n) | 出错重试 n 次 | flux.retry(3) |
# 回调
方法 | 说明 | 示例 |
---|---|---|
doOnNext(Consumer) | 每个元素时回调 | flux.doOnNext(x -> log.info("Got {}", x)) |
doOnError(Consumer) | 出错时回调 | flux.doOnError(e -> log.error("Error", e)) |
doOnComplete(Runnable) | 完成时回调 | flux.doOnComplete(() -> log.info("Done")) |
doFinally(SignalType -> ) | 流终止时(完成/错误/取消)统一回调 | flux.doFinally(sig -> log.info("Terminated: {}", sig)) |
doFirst(Runnable) | 订阅前执行 | flux.doFirst(() -> log.info("Before subscribe")) |
log() | 打印完整生命周期日志 | flux.log() |
# 组合
方法 | 说明 | 示例 |
---|---|---|
merge(flux1, flux2) | 合并(并行,顺序不保证) | Flux.merge(f1, f2) |
concat(flux1, flux2) | 串行拼接(顺序保证) | Flux.concat(f1, f2) |
zip(flux1, flux2, combiner) | 按索引合并 | Flux.zip(f1, f2, (a,b) -> a+b) |
combineLatest(...) | 任一流有新值就合并最新值 | Flux.combineLatest(f1, f2, (a,b) -> a+b) |
# 订阅
方法 | 说明 | 示例 |
---|---|---|
subscribe(Consumer) | 消费元素 | flux.subscribe(System.out::println) |
subscribe(onNext, onError, onComplete) | 消费元素 + 错误 + 完成 | flux.subscribe(v -> {}, e -> {}, () -> {}) |
block() | 阻塞等待(Mono 专用,调试用) | Mono.just(1).block() |
blockFirst() / blockLast() | 阻塞取 Flux 的第一个/最后一个元素 | flux.blockFirst() |
上次更新: 2025/08/20, 08:44:27