SSE,全称Server-Sent Events,作为一种半双工的前后端通信方式,由于实现方式简单、轻量,在后端向前端的主动推送场景中具备很好的应用效果,笔者最近在一个项目中多有使用,过程中也是查阅了不少资料和文档,也有所感悟,在此将整体做一次综述。这是一篇系列文章,共分三篇,这是第二篇。

引言

Spring WebMVC Framework 5.2.0基于Servlet 3.1,需要中间件能支持Servlet 3.1 API,比如**Tomcat 8.5 **和 Jetty 9.3

概述

WebMVC中实现发送事件,需要以下几步:

  • 创建一个controller类并用@RestController注释标记它

  • 创建一个接受Http GET请求的方法,该方法返回一个SseEmitter对象

  • 在另一个线程中,获取这个SseEmitter实例,并调用SseEmitter.send()发送事件。

  • 调用SseEmitter.complete()正常关闭一个SSE连接,调用SseEmitter.completeWithError()关闭连接的同时向前台抛出一个错误。

@RestController
public class SseWebMvcController

private SseEmitter emitter;

@GetMapping(path="/sse", produces=MediaType.TEXT_EVENT_STREAM_VALUE)
SseEmitter createConnection() {
emitter = new SseEmitter();
return emitter;
}

// in another thread
void sendEvents() {
try {
emitter.send("Alpha");
emitter.send("Omega");

emitter.complete();
} catch(Exception e) {
emitter.completeWithError(e);
}
}
}

如果只发送data,调用SseEmitter.send()即可,如果是向发送带有id,event,retry等字段,那么就要使用构造器来构建一个事件对象: SseEmitter.send(SseEmitter.SseEventBuilder builder)

如果要实现广播,可以将emitter对象保存到一个线程安全的列表中,然后遍历发送。

class SseEmitters {

private final List<SseEmitter> emitters = new CopyOnWriteArrayList<>();

SseEmitter add(SseEmitter emitter) {
this.emitters.add(emitter);

emitter.onCompletion(() -> {
this.emitters.remove(emitter);
});
emitter.onTimeout(() -> {
emitter.complete();
this.emitters.remove(emitter);
});

return emitter;
}

void send(Object obj) {
List<SseEmitter> failedEmitters = new ArrayList<>();

this.emitters.forEach(emitter -> {
try {
emitter.send(obj);
} catch (Exception e) {
emitter.completeWithError(e);
failedEmitters.add(emitter);
}
});

this.emitters.removeAll(failedEmitters);
}
}

一次性事件

短暂的去发送事件是比较简单的,只需要去启动一个线程执行发送即可,发完关闭连接。

@Controller
@RequestMapping("/sse/mvc")
public class WordsController {

private static final String[] WORDS = "The quick brown fox jumps over the lazy dog.".split(" ");

private final ExecutorService cachedThreadPool = Executors.newCachedThreadPool();

@GetMapping(path = "/words", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
SseEmitter getWords() {
SseEmitter emitter = new SseEmitter();

cachedThreadPool.execute(() -> {
try {
for (int i = 0; i < WORDS.length; i++) {
emitter.send(WORDS[i]);
TimeUnit.SECONDS.sleep(1);
}

emitter.complete();
} catch (Exception e) {
emitter.completeWithError(e);
}
});

return emitter;
}
}

周期性事件

长期的发送事件在发送本身上是没有区别,主要是需要一个周期性线程定期处理发送事务。

@RestController
@RequestMapping("/sse/mvc")
public class PerformanceController {

private final PerformanceService performanceService;

PerformanceController(PerformanceService performanceService) {
this.performanceService = performanceService;
}

private final AtomicInteger id = new AtomicInteger();

private final ScheduledExecutorService scheduledThreadPool = Executors.newScheduledThreadPool(1);

private final SseEmitters emitters = new SseEmitters();

@PostConstruct
void init() {
scheduledThreadPool.scheduleAtFixedRate(() -> {
emitters.send(performanceService.getPerformance());
}, 0, 1, TimeUnit.SECONDS);
}

@GetMapping(path = "/performance", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
SseEmitter getPerformance() {
return emitters.add();
}
}

非周期性事件

非周期性事件可以通过Spring的事件监听接口来实现,即在事件监听器里调用**send()**方法。下面是简化写法,事件监听器可以参阅这里

@RestController
@RequestMapping("/sse/mvc")
public class FolderWatchController implements ApplicationListener<FolderChangeEvent> {

private final FolderWatchService folderWatchService;

FolderWatchController(FolderWatchService folderWatchService) {
this.folderWatchService = folderWatchService;
}

private final SseEmitters emitters = new SseEmitters();

@PostConstruct
void init() {
folderWatchService.start(System.getProperty("user.home"));
}

@GetMapping(path = "/folder-watch", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
SseEmitter getFolderWatch() {
return emitters.add(new SseEmitter());
}

@Override
public void onApplicationEvent(FolderChangeEvent event) {
emitters.send(event.getEvent());
}
}