SSE,全称Server-Sent Events,作为一种半双工的前后端通信方式,由于实现方式简单、轻量,在后端向前端的主动推送场景中具备很好的应用效果,笔者最近在一个项目中多有使用,过程中也是查阅了不少资料和文档,也有所感悟,在此将整体做一次综述。这是一篇系列文章,共分三篇,这是第三篇。

引言

Spring WebFlux framework 5.2.0基于Reactive Streams api,使用事件循环计算模型来实现异步Java Web应用程序。这样的应用程序可以运行在非阻塞的web服务器上,如Netty 4.1Undertow 1.4,以及Servlet 3.1+容器,如Tomcat8.5Jetty 9.3

概述

WebFlux中实现发送事件,需要以下几步:

  • 创建一个controller类并用@RestController注释标记它

  • 创建一个接受Http GET请求的方法,该方法返回一个Flux对象,并配置produces=text/event-stream

@RestController
public class ExampleController

@GetMapping(path="/sse", produces=MediaType.TEXT_EVENT_STREAM_VALUE)
public Flux<String> createConnectionAndSendEvents() {
return Flux.just("Alpha", "Omega");
}
}

如果只发送data,直接返回Flux(Object)就可以了,如果是发送带有id,event,retry等字段,那么就要使用构造器来构建一个ServerSentEvent对象,并放到Flux里。比如:Flux<ServerSentEvent<T>>

一次性事件

短暂的去发送事件是比较简单的,只需要使用Flux.just()将消息列表里的消息一条条发送出去即可。

@RestController
@RequestMapping("/sse/flux")
public class WordsController {

private static final String[] WORDS = "The quick brown fox jumps over the lazy dog.".split(" ");

@GetMapping(path = "/words", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
Flux<String> getWords() {
return Flux
.zip(Flux.just(WORDS), Flux.interval(Duration.ofSeconds(1)))
.map(Tuple2::getT1);
}
}

周期性事件

长期的发送事件在发送本身上是没有区别,主要是需要一个周期性线程定期处理发送事务,这里直接使用Flux.inteval()来轮询。

@RestController
@RequestMapping("/sse/flux")
public class PerformanceController {

private final PerformanceService performanceService;

PerformanceController(PerformanceService performanceService) {
this.performanceService = performanceService;
}

@GetMapping(path = "/performance", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
Flux<Performance> getPerformance() {
return Flux
.interval(Duration.ofSeconds(1))
.map(sequence -> performanceService.getPerformance());
}
}

非周期性事件

非周期性事件可以通过Spring的事件监听接口来实现,关键点在于要把监听消息的处理器和Flux的构造结合起来。

这里面核心的方法是Flux.create()。他接受两个参数,一个是FluxSink对象,一个是Flux的溢出策略枚举值。下面是Flux.create()的一个简单例子,便于理解。

Flux.create(sink -> {
//向下游发布元素
sink.next("data1");
sink.next("data2");
//结束发布元素
sink.complete();
}).subscribe(System.out::println);//subscribe发布消息,System.out.println为消费者,消费消息;

Flux.create():

我们需要做的就是不断的将监听到的事件填充到sink.next()中去。示例代码如下:

@RestController
@RequestMapping("/sse/flux")
public class FolderWatchController implements ApplicationListener<FolderChangeEvent> {

private final FolderWatchService folderWatchService;

FolderWatchController(FolderWatchService folderWatchService) {
this.folderWatchService = folderWatchService;
}

private final SubscribableChannel subscribableChannel = MessageChannels.publishSubscribe().get(); //初始化一个频道

@PostConstruct
void init() {
folderWatchService.start(System.getProperty("user.home"));
}

@GetMapping(path = "/folder-watch", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
Flux<FolderChangeEvent.Event> getFolderWatch() {
return Flux.create(sink -> {
MessageHandler handler = message -> sink.next(FolderChangeEvent.class.cast(message.getPayload()).getEvent());
sink.onCancel(() -> subscribableChannel.unsubscribe(handler));
subscribableChannel.subscribe(handler);//订阅频道
}, FluxSink.OverflowStrategy.LATEST);
}

@Override
public void onApplicationEvent(FolderChangeEvent event) {
subscribableChannel.send(new GenericMessage<>(event));//向频道内发送事件
}
}

上面的代码中使用了org.springframework.messagin包,他可以通过spring-boot-starter-integration引入。关于这个包的一些基础知识,可以参阅这里

SSE的注意事项:

  • 只适合发送文本消息;尽管可以使用Base64编码和gzip压缩来发送二进制消息,但效率可能很低。

  • 早期的一些浏览器,如Internet Explorer不支持。

Internet Explorer/Edge和许多移动浏览器不支持SSE;尽管可以使用polyfills,但它们可能效率低下

  • 在系统设计时,同一个页面最好只维持1个SSE连接,通过事件来区分。因为浏览器对同时并发的连接数有限制,一般最大是6个。