闲碎记事本 闲碎记事本
首页
  • JAVA
  • Cloudflare
  • 学完再改一遍UI
友链
  • 分类
  • 标签
  • 归档
GitHub (opens new window)

YAN

我要偷偷记录...
首页
  • JAVA
  • Cloudflare
  • 学完再改一遍UI
友链
  • 分类
  • 标签
  • 归档
GitHub (opens new window)
  • java

    • SpringBoot

    • SpringSecurity

    • MybatisPlus

    • Netty

    • sip

    • 其他

      • MDC 使用
      • 位运算
      • RedisMQ实现
      • 自定义枚举序列化
      • Mybatis使用自定义枚举
      • Jackson反序列化泛型注意点
      • 敏感词过滤算法
      • 线程
      • 并发学习
      • jni使用
      • 关于注释
      • 为什么一个Byte用两个16进制表示
      • JAVA获取系统信息
      • 对extends和super的理解
      • JAVA系统API
      • java探针初探
      • JAVA获取USB信息
      • HashMap初探
      • JAVA远程调试
      • 初探webflux
      • SSE示例
      • Reactor (Mono & Flux) 速查表
    • linux

    • docker

    • redis

    • nginx

    • mysql

    • 其他

    • 环境搭建

    • 知识库
    • java
    • 其他
    YAN
    2025-08-20
    目录

    Reactor (Mono & Flux) 速查表

    # 创建(Creation)

    方法 说明 示例
    Mono.just(T) 立即创建一个值(声明时执行) Mono.just("Hi")
    Mono.empty() 空(0 个元素,直接 complete) Mono.empty()
    Mono.error(Throwable) 直接错误信号 Mono.error(new RuntimeException())
    Mono.defer(Supplier<Mono<T>>) 订阅时才生成 Mono Mono.defer(() -> Mono.just(UUID.randomUUID()))
    Mono.fromCallable(Callable<T>) 包装可能抛异常的阻塞逻辑 Mono.fromCallable(() -> Files.readString(path))
    Flux.just(T...) 多个已知值 Flux.just(1,2,3)
    Flux.range(start, count) 整数序列 Flux.range(1, 5)
    Flux.interval(Duration) 定时发射递增 long Flux.interval(Duration.ofSeconds(1))
    Flux.generate(...) 同步推送,一个个生成 Flux.generate(sink -> sink.next("hi"))
    Flux.create(...) 异步/回调式推送 Flux.create(sink -> {sink.next("a");sink.complete();})

    # 转换

    方法 说明 示例
    map(Function) 一对一映射 flux.map(x -> x*2)
    flatMap(Function) 一对多映射并展开(并行,可能乱序) flux.flatMap(x -> Flux.range(1, x))
    concatMap(Function) 一对多映射并展开(顺序保证) flux.concatMap(x -> Flux.range(1, x))
    switchMap(Function) 只保留最新的映射流 flux.switchMap(x -> Flux.interval(Duration.ofMillis(100)))
    buffer(n) 收集成批量 flux.buffer(3)
    window(n) 切分成 Flux<Flux<T>> flux.window(3)

    # 过滤/限流

    方法 说明 示例
    filter(Predicate) 过滤不符合条件的 flux.filter(x -> x % 2 == 0)
    take(n) 只取前 n 个 flux.take(3)
    take(Duration) 取一段时间 flux.take(Duration.ofSeconds(5))
    skip(n) 跳过前 n 个 flux.skip(2)
    distinct() 去重 flux.distinct()

    # 错误处理

    方法 说明 示例
    onErrorReturn(value) 出错时返回一个默认值 flux.onErrorReturn(-1)
    onErrorResume(err -> Publisher) 出错时切换到新流 flux.onErrorResume(e -> Flux.just(100,200))
    onErrorContinue(...) 出错时跳过该元素继续 flux.onErrorContinue((err, obj) -> {})
    retry(n) 出错重试 n 次 flux.retry(3)

    # 回调

    方法 说明 示例
    doOnNext(Consumer) 每个元素时回调 flux.doOnNext(x -> log.info("Got {}", x))
    doOnError(Consumer) 出错时回调 flux.doOnError(e -> log.error("Error", e))
    doOnComplete(Runnable) 完成时回调 flux.doOnComplete(() -> log.info("Done"))
    doFinally(SignalType -> ) 流终止时(完成/错误/取消)统一回调 flux.doFinally(sig -> log.info("Terminated: {}", sig))
    doFirst(Runnable) 订阅前执行 flux.doFirst(() -> log.info("Before subscribe"))
    log() 打印完整生命周期日志 flux.log()

    # 组合

    方法 说明 示例
    merge(flux1, flux2) 合并(并行,顺序不保证) Flux.merge(f1, f2)
    concat(flux1, flux2) 串行拼接(顺序保证) Flux.concat(f1, f2)
    zip(flux1, flux2, combiner) 按索引合并 Flux.zip(f1, f2, (a,b) -> a+b)
    combineLatest(...) 任一流有新值就合并最新值 Flux.combineLatest(f1, f2, (a,b) -> a+b)

    # 订阅

    方法 说明 示例
    subscribe(Consumer) 消费元素 flux.subscribe(System.out::println)
    subscribe(onNext, onError, onComplete) 消费元素 + 错误 + 完成 flux.subscribe(v -> {}, e -> {}, () -> {})
    block() 阻塞等待(Mono 专用,调试用) Mono.just(1).block()
    blockFirst() / blockLast() 阻塞取 Flux 的第一个/最后一个元素 flux.blockFirst()
    上次更新: 2025/08/20, 08:44:27
    SSE示例
    Shell基础

    ← SSE示例 Shell基础→

    最近更新
    01
    Caddy操作指南
    04-25
    02
    虚拟机磁盘扩展
    04-22
    03
    Swap空间
    04-22
    更多文章>
    Theme by Vdoing | Copyright © 2022-2025 YAN | MIT License
    • 跟随系统
    • 浅色模式
    • 深色模式
    • 阅读模式