赵志浩
Published on 2025-08-18 / 27 Visits
3
0

SSE 原理和使用

可以先参考这个对 SSE 有基本概念:

  1. https://www.ruanyifeng.com/blog/2017/05/server-sent_events.html

  2. https://juejin.cn/post/7331726568363311115

实现一个 HTTP 工作线程同步阻塞方式的 SSE 很简单,代码直接这样写就行:

    @GetMapping("/test")
    public void test(HttpServletResponse response) {
        response.setContentType("text/event-stream");
        response.setCharacterEncoding("utf-8");

        try (final PrintWriter writer = response.getWriter()) {
            // 要推送的内容
            final String content = "测试一下 SSE 的推送效果,打字机方式输出文本结果。";
            int len = content.length();
            int endIndex = 0;
            // 每隔2个字符推送一次,模拟打字机效果
            while (endIndex < len) {
                endIndex = Math.min(endIndex + 2, len);
                final String subContent = content.substring(0, endIndex);
                // 将要推送的内容封装成JSON格式,模拟实际开发中的数据格式,非必须
                final JSONObject json = new JSONObject();
                json.put("data", subContent);
                json.put("code", HttpStatus.OK.value());
                // 最后一次推送时,type为finish,表示推送结束,其它情况为add
                final String type = endIndex == len
                        ? "finish"
                        : "add";
                json.put("type", type);
                // 组装成SSE格式的数据,发送给前端,这个格式(data: content\n\n)是固定的,content是自定义的推送内容
                writer.write("data: " + json.toJSONString() + "\n\n");
                writer.flush();
                // 稍微给点停顿,防止数据发送太快,浏览器接收不过来
                TimeUnit.MILLISECONDS.sleep(100);
            }
        } catch (Exception e) {
            Thread.currentThread().interrupt();
            log.error("流式推送数据异常", e);
        }
}

浏览器直接请求该 HTTP 地址,就能看到 SSE 流式输出的效果。

但是这种方式就会有一个问题,Tomcat 调度过来的 HTTP线程就会一直被同步执行,直到当前代码块完全执行完,该 HTTP 线程才会被回收到线程池中。

有没有一种方式实现 WebSocket 的效果呢,WebSocket 效果简单来说就是,连接建立以后该连接会持续被保持打开状态。当有新的消息发送上来后,检测到网络端口有新的消息后,此时 Tomcat 将该消息丢给对应的工作线程池,工作线程池中调度空闲线程,去执行该@Message 方法标记的方法。

简单来说就是 NIO,所有的网络通道保持都注册到对应的轮训器select 上,监听各个通道的变化,只要有消息产生,此时再去调度具体的工作线程去处理。此处 SSE 想要实现这个效果也很简单,只要使用Spring 提供的异步处理机制,它能正确处理异步响应并保持连接开放直到异步任务完成。

代码如下:

private static final Long AUTO_SSE_TIMEOUT = 60 * 60 * 1000L;
    private final ScheduledExecutorService executor = Executors.newScheduledThreadPool(5);
    private static final long HEARTBEAT_INTERVAL = 10_000L; // 10秒心跳间隔

    /**
     * 开启SSE心跳
     * @param emitter
     * @param requestId
     * @return
     */
    private ScheduledFuture<?> startHeartbeat(SseEmitter emitter, String requestId) {
        return executor.scheduleAtFixedRate(() -> {
            try {
                // 发送心跳消息
                log.info("{} send heartbeat", requestId);
                emitter.send("heartbeat");
            }
            catch (Exception e) {
                // 发送心跳失败,关闭连接
                log.error("{} heartbeat failed, closing connection", requestId, e);
                emitter.completeWithError(e);
            }
        }, HEARTBEAT_INTERVAL, HEARTBEAT_INTERVAL, TimeUnit.MILLISECONDS);
    }

    /**
     * 注册SSE事件
     * @param emitter
     * @param requestId
     * @param heartbeatFuture
     */
    private void registerSSEMonitor(SseEmitter emitter, String requestId, ScheduledFuture<?> heartbeatFuture) {
        // 监听SSE异常事件
        emitter.onCompletion(() -> {
            log.info("{} SSE connection completed normally", requestId);
            heartbeatFuture.cancel(true);
        });

        // 监听连接超时事件
        emitter.onTimeout(() -> {
            log.info("{} SSE connection timed out", requestId);
            heartbeatFuture.cancel(true);
            emitter.complete();
        });

        // 监听连接错误事件
        emitter.onError((ex) -> {
            log.info("{} SSE connection error: ", requestId, ex);
            heartbeatFuture.cancel(true);
            emitter.completeWithError(ex);
        });
    }

    @GetMapping("/asyncTest")
    public SseEmitter asyncTest() {
        log.info("asyncTest thread = >> " + Thread.currentThread());

        String requestId = UUID.randomUUID().toString();

        SseEmitter emitter = new SseEmitter(AUTO_SSE_TIMEOUT);
        // SSE心跳
        ScheduledFuture<?> heartbeatFuture = startHeartbeat(emitter, requestId);
        // 监听SSE事件
        registerSSEMonitor(emitter, requestId, heartbeatFuture);

        CompletableFuture.runAsync(() -> {
            try {
                // 要推送的内容: (SSE 效果推送)
                final String content = "Test the push effect of SSE and output text results in typewriter mode.";
                int len = content.length();
                int endIndex = 0;
                // 每隔2个字符推送一次,模拟打字机效果
                while (endIndex < len) {
                    endIndex = Math.min(endIndex + 2, len);
                    final String subContent = content.substring(0, endIndex);
                    // 将要推送的内容封装成JSON格式,模拟实际开发中的数据格式,非必须
                    final JSONObject json = new JSONObject();
                    json.put("data", subContent);
                    json.put("code", HttpStatus.OK.value());
                    // 最后一次推送时,type为finish,表示推送结束,其它情况为add
                    final String type = endIndex == len
                            ? "finish"
                            : "add";
                    json.put("type", type);
                    // 组装成SSE格式的数据,发送给前端,这个格式(data: content\n\n)是固定的,content是自定义的推送内容
                    emitter.send("data: " + json.toJSONString() + "\n\n");

                    // 稍微给点停顿,防止数据发送太快,浏览器接收不过来
                    TimeUnit.MILLISECONDS.sleep(100);
                }

                emitter.complete();
            }
            catch (Exception e) {
                emitter.completeWithError(e);
                Thread.currentThread().interrupt();
                log.error("流式推送数据异常", e);
            }
        });

        log.info("asyncTest end thread =  >> " + Thread.currentThread());

        //controller 方法立即执行返回,此时该HTTP 工作线程会直接被释放。
        return emitter;
    }
    
    

其实代码本身很简单,只要看asyncTest即可;主要是返回一个SseEmitter,然后使用CompletableFuture.runAsync(),交给另外的线程来执行emitter.send()来推送数据。至于上面的 startHeartbeat 和 registerSSEMonitor方法,则是为了保持 SSE 的完整。

startHeartbeat是将该 SSE 注册到一个定时线程池中,保持 10 秒和客户端有一次心跳交互。registerSSEMonitor方法则是监听Emitter 的各种状态(SSE 异常、超时、连接错误)等状态。

通过上面代码的方式就可以实现一个完整的基于 Spring mvc 的异步 SSE,将具体使用 SSE send 推送数据的线程自行代码中控制。而不阻塞 Tomcat 容器的 HTTP 工作线程,达到支持更多并发的效果。


Comment