详解Spring 5 Server-Sent Events(三) WebFlux
发表于|更新于
|字数总计:1.1k|阅读时长:4分钟|阅读量:
SSE,全称Server-Sent Events,作为一种半双工的前后端通信方式,由于实现方式简单、轻量,在后端向前端的主动推送场景中具备很好的应用效果,笔者最近在一个项目中多有使用,过程中也是查阅了不少资料和文档,也有所感悟,在此将整体做一次综述。这是一篇系列文章,共分三篇,这是第三篇。
引言
Spring WebFlux framework 5.2.0基于Reactive Streams api,使用事件循环计算模型来实现异步Java Web应用程序。这样的应用程序可以运行在非阻塞的web服务器上,如Netty 4.1和Undertow 1.4,以及Servlet 3.1+容器,如Tomcat8.5和Jetty 9.3。
概述
在WebFlux中实现发送事件,需要以下几步:
@RestController public class ExampleController
@GetMapping(path="/sse", produces=MediaType.TEXT_EVENT_STREAM_VALUE) public Flux<String> createConnectionAndSendEvents() { return Flux.just("Alpha", "Omega"); } }
|
如果只发送data
,直接返回Flux(Object)
就可以了,如果是发送带有id
,event
,retry
等字段,那么就要使用构造器来构建一个ServerSentEvent
对象,并放到Flux
里。比如:Flux<ServerSentEvent<T>>
一次性事件
短暂的去发送事件是比较简单的,只需要使用Flux.just()
将消息列表里的消息一条条发送出去即可。
@RestController @RequestMapping("/sse/flux") public class WordsController {
private static final String[] WORDS = "The quick brown fox jumps over the lazy dog.".split(" ");
@GetMapping(path = "/words", produces = MediaType.TEXT_EVENT_STREAM_VALUE) Flux<String> getWords() { return Flux .zip(Flux.just(WORDS), Flux.interval(Duration.ofSeconds(1))) .map(Tuple2::getT1); } }
|
周期性事件
长期的发送事件在发送本身上是没有区别,主要是需要一个周期性线程定期处理发送事务,这里直接使用Flux.inteval()
来轮询。
@RestController @RequestMapping("/sse/flux") public class PerformanceController {
private final PerformanceService performanceService;
PerformanceController(PerformanceService performanceService) { this.performanceService = performanceService; }
@GetMapping(path = "/performance", produces = MediaType.TEXT_EVENT_STREAM_VALUE) Flux<Performance> getPerformance() { return Flux .interval(Duration.ofSeconds(1)) .map(sequence -> performanceService.getPerformance()); } }
|
非周期性事件
非周期性事件可以通过Spring的事件监听接口来实现,关键点在于要把监听消息的处理器和Flux的构造结合起来。
这里面核心的方法是Flux.create()
。他接受两个参数,一个是FluxSink
对象,一个是Flux的溢出策略枚举值。下面是Flux.create()
的一个简单例子,便于理解。
Flux.create(sink -> { sink.next("data1"); sink.next("data2"); sink.complete(); }).subscribe(System.out::println);
|
Flux.create()
:
我们需要做的就是不断的将监听到的事件填充到sink.next()
中去。示例代码如下:
@RestController @RequestMapping("/sse/flux") public class FolderWatchController implements ApplicationListener<FolderChangeEvent> {
private final FolderWatchService folderWatchService;
FolderWatchController(FolderWatchService folderWatchService) { this.folderWatchService = folderWatchService; }
private final SubscribableChannel subscribableChannel = MessageChannels.publishSubscribe().get();
@PostConstruct void init() { folderWatchService.start(System.getProperty("user.home")); }
@GetMapping(path = "/folder-watch", produces = MediaType.TEXT_EVENT_STREAM_VALUE) Flux<FolderChangeEvent.Event> getFolderWatch() { return Flux.create(sink -> { MessageHandler handler = message -> sink.next(FolderChangeEvent.class.cast(message.getPayload()).getEvent()); sink.onCancel(() -> subscribableChannel.unsubscribe(handler)); subscribableChannel.subscribe(handler); }, FluxSink.OverflowStrategy.LATEST); }
@Override public void onApplicationEvent(FolderChangeEvent event) { subscribableChannel.send(new GenericMessage<>(event)); } }
|
上面的代码中使用了org.springframework.messagin
包,他可以通过spring-boot-starter-integration
引入。关于这个包的一些基础知识,可以参阅这里。
SSE的注意事项:
Internet Explorer/Edge和许多移动浏览器不支持SSE;尽管可以使用polyfills,但它们可能效率低下
- 在系统设计时,同一个页面最好只维持1个SSE连接,通过事件来区分。因为浏览器对同时并发的连接数有限制,一般最大是6个。