详解Spring 5 Server-Sent Events(二) WebMVC
发表于|更新于
|字数总计:898|阅读时长:4分钟|阅读量:
SSE,全称Server-Sent Events,作为一种半双工的前后端通信方式,由于实现方式简单、轻量,在后端向前端的主动推送场景中具备很好的应用效果,笔者最近在一个项目中多有使用,过程中也是查阅了不少资料和文档,也有所感悟,在此将整体做一次综述。这是一篇系列文章,共分三篇,这是第二篇。
引言
Spring WebMVC Framework 5.2.0基于Servlet 3.1,需要中间件能支持Servlet 3.1 API,比如**Tomcat 8.5 **和 Jetty 9.3。
概述
在WebMVC中实现发送事件,需要以下几步:
创建一个controller
类并用@RestController
注释标记它
创建一个接受Http GET
请求的方法,该方法返回一个SseEmitter对象
在另一个线程中,获取这个SseEmitter实例,并调用SseEmitter.send()
发送事件。
调用SseEmitter.complete()
正常关闭一个SSE连接,调用SseEmitter.completeWithError()
关闭连接的同时向前台抛出一个错误。
@RestController public class SseWebMvcController
private SseEmitter emitter;
@GetMapping(path="/sse", produces=MediaType.TEXT_EVENT_STREAM_VALUE) SseEmitter createConnection() { emitter = new SseEmitter(); return emitter; }
void sendEvents() { try { emitter.send("Alpha"); emitter.send("Omega");
emitter.complete(); } catch(Exception e) { emitter.completeWithError(e); } } }
|
如果只发送data
,调用SseEmitter.send()
即可,如果是向发送带有id
,event
,retry
等字段,那么就要使用构造器来构建一个事件对象: SseEmitter.send(SseEmitter.SseEventBuilder builder)
如果要实现广播,可以将emitter
对象保存到一个线程安全的列表中,然后遍历发送。
class SseEmitters {
private final List<SseEmitter> emitters = new CopyOnWriteArrayList<>();
SseEmitter add(SseEmitter emitter) { this.emitters.add(emitter);
emitter.onCompletion(() -> { this.emitters.remove(emitter); }); emitter.onTimeout(() -> { emitter.complete(); this.emitters.remove(emitter); });
return emitter; }
void send(Object obj) { List<SseEmitter> failedEmitters = new ArrayList<>();
this.emitters.forEach(emitter -> { try { emitter.send(obj); } catch (Exception e) { emitter.completeWithError(e); failedEmitters.add(emitter); } });
this.emitters.removeAll(failedEmitters); } }
|
一次性事件
短暂的去发送事件是比较简单的,只需要去启动一个线程执行发送即可,发完关闭连接。
@Controller @RequestMapping("/sse/mvc") public class WordsController {
private static final String[] WORDS = "The quick brown fox jumps over the lazy dog.".split(" ");
private final ExecutorService cachedThreadPool = Executors.newCachedThreadPool();
@GetMapping(path = "/words", produces = MediaType.TEXT_EVENT_STREAM_VALUE) SseEmitter getWords() { SseEmitter emitter = new SseEmitter();
cachedThreadPool.execute(() -> { try { for (int i = 0; i < WORDS.length; i++) { emitter.send(WORDS[i]); TimeUnit.SECONDS.sleep(1); }
emitter.complete(); } catch (Exception e) { emitter.completeWithError(e); } });
return emitter; } }
|
周期性事件
长期的发送事件在发送本身上是没有区别,主要是需要一个周期性线程定期处理发送事务。
@RestController @RequestMapping("/sse/mvc") public class PerformanceController {
private final PerformanceService performanceService;
PerformanceController(PerformanceService performanceService) { this.performanceService = performanceService; }
private final AtomicInteger id = new AtomicInteger();
private final ScheduledExecutorService scheduledThreadPool = Executors.newScheduledThreadPool(1);
private final SseEmitters emitters = new SseEmitters();
@PostConstruct void init() { scheduledThreadPool.scheduleAtFixedRate(() -> { emitters.send(performanceService.getPerformance()); }, 0, 1, TimeUnit.SECONDS); }
@GetMapping(path = "/performance", produces = MediaType.TEXT_EVENT_STREAM_VALUE) SseEmitter getPerformance() { return emitters.add(); } }
|
非周期性事件
非周期性事件可以通过Spring的事件监听接口来实现,即在事件监听器里调用**send()**方法。下面是简化写法,事件监听器可以参阅这里。
@RestController @RequestMapping("/sse/mvc") public class FolderWatchController implements ApplicationListener<FolderChangeEvent> {
private final FolderWatchService folderWatchService;
FolderWatchController(FolderWatchService folderWatchService) { this.folderWatchService = folderWatchService; }
private final SseEmitters emitters = new SseEmitters();
@PostConstruct void init() { folderWatchService.start(System.getProperty("user.home")); }
@GetMapping(path = "/folder-watch", produces = MediaType.TEXT_EVENT_STREAM_VALUE) SseEmitter getFolderWatch() { return emitters.add(new SseEmitter()); }
@Override public void onApplicationEvent(FolderChangeEvent event) { emitters.send(event.getEvent()); } }
|