1、SpringWebflux 介绍

在这里插入图片描述

(1)是 Spring5 添加新的模块,用于 web 开发的,功能和 SpringMVC 类似的,Webflux 使用
当前一种比较流行响应式编程出现的框架。
在这里插入图片描述
(2)使用传统 web 框架,比如 SpringMVC,这些基于 Servlet 容器,Webflux 是一种异步非阻
塞的框架,异步非阻塞的框架在 Servlet3.1 以后才支持,核心是基于 Reactor 的相关 API 实现
的。
(3)解释什么是异步非阻塞

  • 异步和同步
  • 非阻塞和阻塞
    ** 上面都是针对对象不一样
    ** 异步和同步针对调用者,调用者发送请求,如果等着对方回应之后才去做其他事情就是同
    步,如果发送请求之后不等着对方回应就去做其他事情就是异步
    ** 阻塞和非阻塞针对被调用者,被调用者收到请求之后,做完请求任务之后才给出反馈就是阻
    塞,受到请求之后马上给出反馈然后再去做事情就是非阻塞

(4)Webflux 特点:
第一 非阻塞式:在有限资源下,提高系统吞吐量和伸缩性,以 Reactor 为基础实现响应式编程
第二 函数式编程:Spring5 框架基于 java8,Webflux 使用 Java8 函数式编程方式实现路由请求
(5)比较 SpringMVC
在这里插入图片描述
第一 两个框架都可以使用注解方式,都运行在 Tomcat 等容器中
第二 SpringMVC 采用命令式编程,Webflux 采用异步响应式编程

2、响应式编程(Java 实现)

(1)什么是响应式编程

简称RP(Reactive Programming)
响应式编程是一种面向数据流和变化传播的编程范式。这意味着可以在编程语言中很方便地表达静态或动态的数据流,而相关的计算模型会自动将变化的值通过数据流进行传播。

电子表格程序就是响应式编程的一个例子。单元格可以包含字面值或类似"=B1+C1"的公式,而包含公式的单元格的值会依据其他单元格的值的变化而变化。

(2)Java8 及其之前版本

  • 提供的观察者模式两个类 Observer 和 Observable
public class ObserverDemo extends Observable {
    public static void main(String[] args) {
        ObserverDemo observer = new ObserverDemo();
        //添加观察者
        observer.addObserver((o, arg) -> {
            System.out.println("发生变化");
        });
        observer.addObserver((o, arg) -> {
            System.out.println("手动被观察者通知,准备改变");
        });
        observer.setChanged(); //数据变化
        observer.notifyObservers(); //通知
    }
}

java9

Flow

    public static interface Publisher<T> {
        /**
         * Adds the given Subscriber if possible.  If already
         * subscribed, or the attempt to subscribe fails due to policy
         * violations or errors, the Subscriber's {@code onError}
         * method is invoked with an {@link IllegalStateException}.
         * Otherwise, the Subscriber's {@code onSubscribe} method is
         * invoked with a new {@link Subscription}.  Subscribers may
         * enable receiving items by invoking the {@code request}
         * method of this Subscription, and may unsubscribe by
         * invoking its {@code cancel} method.
         *
         * @param subscriber the subscriber
         * @throws NullPointerException if subscriber is null
         */
        public void subscribe(Subscriber<? super T> subscriber);
    }
    public static interface Subscriber<T> {
        /**
         * Method invoked prior to invoking any other Subscriber
         * methods for the given Subscription. If this method throws
         * an exception, resulting behavior is not guaranteed, but may
         * cause the Subscription not to be established or to be cancelled.
         *
         * <p>Typically, implementations of this method invoke {@code
         * subscription.request} to enable receiving items.
         *
         * @param subscription a new subscription
         */
        public void onSubscribe(Subscription subscription);

        /**
         * Method invoked with a Subscription's next item.  If this
         * method throws an exception, resulting behavior is not
         * guaranteed, but may cause the Subscription to be cancelled.
         *
         * @param item the item
         */
        public void onNext(T item);

        /**
         * Method invoked upon an unrecoverable error encountered by a
         * Publisher or Subscription, after which no other Subscriber
         * methods are invoked by the Subscription.  If this method
         * itself throws an exception, resulting behavior is
         * undefined.
         *
         * @param throwable the exception
         */
        public void onError(Throwable throwable);

        /**
         * Method invoked when it is known that no additional
         * Subscriber method invocations will occur for a Subscription
         * that is not already terminated by error, after which no
         * other Subscriber methods are invoked by the Subscription.
         * If this method throws an exception, resulting behavior is
         * undefined.
         */
        public void onComplete();
    }

3、响应式编程(Reactor 实现)

(1)响应式编程操作中,Reactor 是满足 Reactive 规范的框架
(2)Reactor 有两个核心类,Mono 和 Flux,这两个类实现接口 Publisher,提供丰富操作
符。Flux代表具有0个、1个或多个(可能是无限个)的数据项管道;Mono 是一种特殊的反应式类型,针对数据项不超过一个场景(0 或者 1 个元素),它进行了优化
(3)Flux 和 Mono 都是数据流的发布者,使用 Flux 和 Mono 都可以发出三种数据信号:
元素值,错误信号,完成信号,错误信号和完成信号都代表终止信号,终止信号用于告诉
订阅者数据流结束了,错误信号终止数据流同时把错误信息传递给订阅者
在这里插入图片描述

3.1 代码:
引入依赖

<dependency>
 <groupId>io.projectreactor</groupId>
 <artifactId>reactor-core</artifactId>
 <version>3.1.5.RELEASE</version>
</dependency>

<dependency>
    <groupId>io.projectreactor</groupId>
    <artifactId>reactor-test</artifactId>
    <scope>test</scope>
</dependency>
public static void main(String[] args) {
 //just 方法直接声明:一个或多个对象
 Flux.just(1,2,3,4);
 Mono.just(1);
 //其他的方法
 Integer[] array = {1,2,3,4};
 Flux.fromArray(array);
 
 List<Integer> list = Arrays.asList(array);
 Flux.fromIterable(list);
 Stream<Integer> stream = list.stream();
 Flux.fromStream(stream);
}

@Test
    public void createAFlux_just(){
        Flux<String> fruitFlux = Flux.just("apple", "orange");
        //fruitFlux.subscribe(f->System.out.println(f));

        StepVerifier.create(fruitFlux) //订阅了fruitFlux
                .expectNext("apple")
                .expectNext("orange")
                .verifyComplete();//验证orange之后,整个fruitFlux正常完成

    }

(5)三种信号特点

  • 错误信号和完成信号都是终止信号,不能共存的
  • 如果没有发送任何元素值,而是直接发送错误或者完成信号,表示是空数据流
  • 如果没有错误信号,没有完成信号,表示是无限数据流

(6)调用 just 或者其他方法只是声明数据流,数据流并没有发出,只有进行订阅之后才会触
发数据流,不订阅什么都不会发生的
在这里插入图片描述(7)操作符

  • 对数据流进行一道道操作,成为操作符,比如工厂流水线
    第一 map 元素映射为新元素
    在这里插入图片描述
    第二 flatMap 元素映射为流
    把每个元素转换流,把转换之后多个流合并大的流
    在这里插入图片描述

Flux

Flux概述

Flux会尽可能快地发布数据

Flux常用方法

1.创建操作
//根据一个或多个对象创建Flux或Mono
public static <T> Flux<T> just(T data)
public static <T> Flux<T> just(T... data)

//根据数组创建Flux或Mono
public static <T> Flux<T> fromArray(T[] array)

//根据List、Set或其它任意的Iterable的实现
public static <T> Flux<T> fromIterable(Iterable<? extends T> it)

//根据Java Stream
public static <T> Flux<T> fromStream(Stream<? extends T> s)

//创建一个计数器Flux
//range(n, m): n -> n+m-1
public static Flux<Integer> range(int start, int count)

//创建一个以一定时间间隔发布递增值的Flux:从0L开始
//可以使用take方法限制发布的个数
public static Flux<Long> interval(Duration period)
public static Flux<Long> interval(Duration delay, Duration period)

//添加订阅者
//参数可以用Lambda表达式
public final Disposable subscribe(Consumer<? super T> consumer)

2.组合操作
//合并反应式类型
public final Flux<T> mergeWith(Publisher<? extends T> other) 
//它们的消息将会交错合并成一个新的Flux,但不能保证这个顺序


public final Flux<T> delayElements(Duration delay) 
//每delay发布一个条目

public final Flux<T> delaySubscription(Duration delay) 
//延迟订阅此Flux源,直到给定的时间结束。

public static <T1, T2> Flux<Tuple2<T1, T2>> zip(Publisher<? extends T1> source1, 
			Publisher<? extends T2> source2) //每一个元组中都包含 了来自每个源Flux的数据项
			
public static <T1, T2, O> Flux<O> zip(Publisher<? extends T1> source1,
			Publisher<? extends T2> source2,
			final BiFunction<? super T1, ? super T2, ? extends O> combinator)
			
public static <I> Flux<I> first(Publisher<? extends I>... sources) 
//选择第一个发布消息的Flux并只发布该Flux的值

3.转换和过滤反应式流
//过滤
public final Flux<T> skip(long skipped)
//从该Flux的开始跳过指定数量的元素,然后发布剩余的元素

public final Flux<T> skip(Duration timespan)

public final Flux<T> take(long n) 
//与skip相反,只发布前面指定数量的数据项
public final Flux<T> take(Duration timespan)

public final Flux<T> filter(Predicate<? super T> p)
Predicate是一个函数式接口 有一个方法:boolean test(T t); 

public final Flux<T> distinct()
//过滤掉重复的消息

//转换 映射
public final <V> Flux<V> map(Function<? super T, ? extends V> mapper)
//将传入的消息转换为新消息,map操作是同步执行的

public final <R> Flux<R> flatMap(Function<? super T, 
		? extends Publisher<? extends R>> mapper)
//

public final Flux<T> subscribeOn(Scheduler scheduler)
//在指定的调度器的Scheduler. worker上运行subscribe、onSubscribe和request,
//声明每个订阅都应该在并行线程中进行,可以异步并行执行

//缓存数据、收集数据
public final Flux<List<T>> buffer(int maxSize) 
public final Flux<List<T>> buffer() //收集所有的数据项 大小为Integer.MAX_VALUE 
//产生一个新包含List的Flux

public final Mono<List<T>> collectList()
//收集所有的数据项, 但产生一个发布List的Mono

public final <K> Mono<Map<K, T>> collectMap(Function<? super T, ? extends K> keyExtractor)
//收集所有的数据项,产生一个发布Map的Mono,key是从传入消息的某些我特征衍生而来
//key相同,后面的消息值会覆盖前面的

public final Mono<T> next() 
//获取第一个数据项到一个新的Mono

4、逻辑操作:测试是否满足某些条件
public final Mono<Boolean> all(Predicate<? super T> predicate)
//测试所有消息都满足某些条件
public final Mono<Boolean> any(Predicate<? super T> predicate)
//确保至少有一个消息满足某些条件


5、其它
public final Flux<T> log()
public final Flux<T> log(String category)
//应用于每个子Flux,记录了所有的反应式事件

Schedulers

Schedulers常用方法

//指定并发模型
public static Scheduler parallel()  
//使用来自固定线程池(大小与cpu核心数量相同)的工作线程

public static Scheduler immediate()
//在当前的线程中执行订阅

public static Scheduler single()
//在一个单一的、可我重用的线程中执行订阅

public static Scheduler newSingle(String name)
public static Scheduler newSingle(String name, boolean daemon)
//针对每个调用,使用专用的线程执行订阅

public static Scheduler elastic()
//创建的线程池的最大数目是不受限制的。
//未使用线程池的默认生存时间是60秒,
//根据需要创建新的工作线程,并销毁空闲的工作线程

4、SpringWebflux 执行流程和核心 API

SpringWebflux 基于 Reactor,默认使用容器是 Netty,Netty 是高性能的 NIO 框架,异步非阻
塞的框架
(1)Netty

  • BIO(Blocking IO)
  • NIO(Non-blocking IO)
    在这里插入图片描述

(2)SpringWebflux 执行过程和 SpringMVC 相似的

  • SpringWebflux 核心控制器 DispatcherHandler,实现接口 WebHandler
  • 接口 WebHandler 有一个方法
    在这里插入图片描述
    在这里插入图片描述

(3)SpringWebflux 里面 DispatcherHandler,负责请求的处理

  • HandlerMapping:请求查询到处理的方法
  • HandlerAdapter:真正负责请求处理
  • HandlerResultHandler:响应结果处理

(4)SpringWebflux 实现函数式编程,两个接口:RouterFunction(路由处理)
和 HandlerFunction(处理函数)

ServerHttpRequest 接口

public interface ServerHttpRequest extends HttpRequest, ReactiveHttpInputMessage
表示响应式服务器端HTTP请求

ServerHttpRequest 常用方法

//返回带有已解析和解码的查询参数值的只读映射
1MultiValueMap<String, String> getQueryParams()

2HttpHeaders getHeaders(); //见HttpMessage

//返回客户端发送的cookie的只读映射
3MultiValueMap<String, HttpCookie> getCookies()

//返回一个构建器来改变该请求的属性
4default ServerHttpRequest.Builder mutate()

MultiValueMap

org.springframework.util
扩展了Map接口,以存储多个值

ServerHttpResponse 接口

表示响应式服务器端HTTP响应

ServerHttpResponse 常用方法

//设置响应的HTTP状态码
1boolean setStatusCode(@Nullable HttpStatus status)
如: response.setStatusCode(HttpStatus.UNAUTHORIZED)

//指示消息处理已完成,允许执行任何清理或处理结束任务,例如将通过
//getHeaders()所做的报头更改应用到底层HTTP消息(如果还没有应用的话)。
//该方法应该在消息处理结束时自动调用,因此应用程序通常不必调用它
2Mono<Void> setComplete()

5、SpringWebflux(基于注解编程模型)

SpringWebflux 实现方式有两种:注解编程模型和函数式编程模型
使用注解编程模型方式,和之前 SpringMVC 使用相似的,只需要把相关依赖配置到项目中,
SpringBoot 自动配置相关运行容器,默认情况下使用 Netty 服务器
第一步 创建 SpringBoot 工程,引入 Webflux 依赖

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-webflux</artifactId>
            <version>2.1.4.RELEASE</version>
        </dependency>

第二步 配置启动端口号
在这里插入图片描述
代码:

public class User {
    private String name;
    private String gender;
    private Integer age;

    public User(String name, String gender, Integer age) {
        this.name = name;
        this.gender = gender;
        this.age = age;
    }
    //get\set...
 }
@Repository
public class UserServiceImpl implements UserService {
    private final Map<Integer, User> users = new HashMap<>();

    public UserServiceImpl(){
        this.users.put(1, new User("lucy","man",20));
        this.users.put(2, new User("Mary","nv",30));
        this.users.put(3, new User("jack","nv",50));
    }
    @Override
    public Mono<User> getUserById(Integer id) {
        return Mono.just(users.get(id));
    }

    @Override
    public Flux<User> getAllUser() {
        return Flux.fromIterable(users.values());
    }

    @Override
    public Mono<Void> saveUser(Mono<User> userMono) {

        return userMono.doOnNext(person->{
            int id = users.size() + 1;
            users.put(id, person);

        }).thenEmpty(Mono.empty());
    }
}
@RestController
public class UserController {
    @Autowired
    private UserService userService;

    @GetMapping("/user/{id}")
    public Mono<User> getUserById(@PathVariable Integer id) {
        return userService.getUserById(id);
    }

    @GetMapping("/user")
    public Flux<User> getAllUser() {
        return userService.getAllUser();
    }

    @PostMapping("/saveuser")
    public Mono<Void> saveUser(@RequestBody User user){
        Mono<User> userMono = Mono.just(user);
        return userService.saveUser(userMono);
    }
}

说明:
SpringMVC 方式实现,同步阻塞的方式,基于 SpringMVC+Servlet+Tomcat
SpringWebflux 方式实现,异步非阻塞 方式,基于 SpringWebflux+Reactor+Netty

6、SpringWebflux(基于函数式编程模型)

(1)在使用函数式编程模型操作时候,需要自己初始化服务器
(2)基于函数式编程模型时候,有两个核心接口:RouterFunction(实现路由功能,请求转发
给对应的 handler)和 HandlerFunction(处理请求生成响应的函数)。核心任务定义两个函数
式接口的实现并且启动需要的服务器。
( 3 ) SpringWebflux 请 求 和 响 应 不 再 是 ServletRequest 和 ServletResponse ,而是
ServerRequest 和 ServerResponse
第一步 把注解编程模型工程复制一份 ,保留 entity 和 service 内容
第二步 创建 Handler(具体实现方法)

UserHandler :

public class UserHandler {

    private final UserService userService;
    public UserHandler(UserService userService) {
        this.userService = userService;
    }

    //根据id查询
    public Mono<ServerResponse> getUserById(ServerRequest request) {
        //获取id值
       int userId = Integer.valueOf(request.pathVariable("id"));
       //空值处理
        Mono<ServerResponse> notFound = ServerResponse.notFound().build();

       //调用service方法得到数据
        Mono<User> userMono = this.userService.getUserById(userId);
        //把userMono进行转换返回
        //使用Reactor操作符flatMap
        return
                userMono
                        .flatMap(person -> ServerResponse.ok().contentType(MediaType.APPLICATION_JSON)
                                .body(fromObject(person)))
                                .switchIfEmpty(notFound);
    }

    //查询所有
    public Mono<ServerResponse> getAllUsers(ServerRequest request) {
        //调用service得到结果
        Flux<User> users = this.userService.getAllUser();
        return ServerResponse.ok().contentType(MediaType.APPLICATION_JSON).body(users,User.class);
    }

    //添加
    public Mono<ServerResponse> saveUser(ServerRequest request) {
        //得到user对象
        Mono<User> userMono = request.bodyToMono(User.class);
        return ServerResponse.ok().build(this.userService.saveUserInfo(userMono));
    }

}

Server:

public class Server {

    public static void main(String[] args) throws Exception{
        Server server = new Server();
        server.createReactorServer();
        System.out.println("enter to exit");
        System.in.read();
    }

    //1 创建Router路由
    public RouterFunction<ServerResponse> routingFunction() {
        //创建hanler对象
        UserService userService = new UserServiceImpl();
        UserHandler handler = new UserHandler(userService);
        //设置路由
        return RouterFunctions.route(
                GET("/users/{id}").and(accept(APPLICATION_JSON)),handler::getUserById)
                .andRoute(GET("/users").and(accept(APPLICATION_JSON)),handler::getAllUsers);
    }

    //2 创建服务器完成适配
    public void createReactorServer() {
        //路由和handler适配
        RouterFunction<ServerResponse> route = routingFunction();
        HttpHandler httpHandler = toHttpHandler(route);
        ReactorHttpHandlerAdapter adapter = new ReactorHttpHandlerAdapter(httpHandler);
        //创建服务器
        HttpServer httpServer = HttpServer.create();
        httpServer.handle(adapter).bindNow();
    }
}

Client:

public class Client {

    public static void main(String[] args) {
        //调用服务器地址
        WebClient webClient = WebClient.create("http://127.0.0.1:5794");

        //根据id查询
        String id = "1";
        User userresult = webClient.get().uri("/users/{id}", id)
                .accept(MediaType.APPLICATION_JSON).retrieve().bodyToMono(User.class)
                .block();
        System.out.println(userresult.getName());

        //查询所有
        Flux<User> results = webClient.get().uri("/users")
                .accept(MediaType.APPLICATION_JSON).retrieve().bodyToFlux(User.class);

        results.map(stu -> stu.getName())
                    .buffer().doOnNext(System.out::println).blockFirst();
    }
}

RequestPredicate

声明要处理的请求类型
表示对给定的ServerRequest求值的函数。在requestpredicate中可以找到该函数在常见请求属性上进行评估的实例。

RouterFunction

声明如何将请求路由到处理器代码中
public interface RouterFunction<T extends ServerResponse>

ServerRequest 接口

代表一个HTTP请求,包括对请求头和请求体的访问

ServerResponse 接口

代表一个HTTP响应,包括响应头和响应休的信息

ServerResponse 常用方法

HttpStatus 枚举

org.springframework.http

UNAUTHORIZED(401, "Unauthorized")

ServerWebExchange

HTTP请求-响应交互的契约。提供对HTTP请求和响应的访问,并公开其他与服务器端处理相关的属性和特性,如请求属性。

ServerWebExchange 常用方法

//返回当前HTTP请求
ServerHttpRequest getRequest();

//返回当前HTTP响应
ServerHttpResponse getResponse();

//返回当前交互的请求属性的可变映射
Map<String, Object> getAttributes()

7、定义反应式流

反应式流规范可以总结为4个接口:
Publisher、Subscriber、Subscription、Processor

8、使用RxJava类型

Observable

Single

Completable

等价于Mono<Void>

另外,WebFlux还可以返回Flowable,替换Observable和Flux

Mono<T> is a generic type - in your specific situation it represents Void type as Mono<Void>

Mono.empty() - return a Mono that completes without emitting any item.

Logo

ModelScope旨在打造下一代开源的模型即服务共享平台,为泛AI开发者提供灵活、易用、低成本的一站式模型服务产品,让模型应用更简单!

更多推荐