原创

StringBoot-SSE和WebFlux方式消息实时推送-默认单向-可增加交互接口

1. 效果演示

file

2. 共同需要

WebMvcConfig.java

package cn.jiangjiesheng.config;

import cn.hutool.json.JSONUtil;
import com.envtoday.ecp.datav.common.ResponseBean;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.task.AsyncTaskExecutor;
import org.springframework.http.HttpHeaders;
import org.springframework.http.HttpStatus;
import org.springframework.http.MediaType;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import org.springframework.web.HttpMediaTypeNotAcceptableException;
import org.springframework.web.servlet.HandlerExceptionResolver;
import org.springframework.web.servlet.ModelAndView;
import org.springframework.web.servlet.config.annotation.AsyncSupportConfigurer;
import org.springframework.web.servlet.config.annotation.CorsRegistry;
import org.springframework.web.servlet.config.annotation.WebMvcConfigurer;

import javax.servlet.http.HttpServletResponse;
import java.util.List;
import java.util.concurrent.ThreadPoolExecutor;


@Configuration
public class WebMvcConfig implements WebMvcConfigurer {

    @Override
    public void addCorsMappings(CorsRegistry registry) {
        registry.addMapping("/**");
    }

    //SseWebFluxController方案必要
    @Override
    public void configureAsyncSupport(AsyncSupportConfigurer configurer) {
        configurer.setTaskExecutor(mvcTaskExecutor());
        configurer.setDefaultTimeout(30000); // 可选:设置默认超时(毫秒)
    }

    //SseWebFluxController方案必要
    @Bean
    public AsyncTaskExecutor mvcTaskExecutor() {
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
        executor.setThreadNamePrefix("sse-task-");
        executor.setCorePoolSize(10);
        executor.setMaxPoolSize(50);
        executor.setQueueCapacity(100);
        executor.setKeepAliveSeconds(60);
        executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
        executor.initialize();
        return executor;
    }

    //SseWebFluxController || SseEmitterController方案都必要
    @Override
    public void configureHandlerExceptionResolvers(List<HandlerExceptionResolver> resolvers) {
        resolvers.add((request, response, handler, ex) -> {
            //这是自定义的业务返回类
            ResponseBean<Object> result = new ResponseBean<>();

            // 处理event-stream接口断开时打印"java.io.IOException: 你的主机中的软件中止了一个已建立的连接。"(浏览器窗口关闭或刷新)
            boolean isSseRequest = MediaType.TEXT_EVENT_STREAM_VALUE.equals(request.getHeader("Accept"));
            if (isSseRequest) {
                String msg = ex.getMessage();
                if (msg != null) {
                    msg = msg.toLowerCase();
                    if (msg.contains("reset by peer") ||
                            msg.contains("aborted") ||
                            msg.contains("broken pipe") ||
                            msg.contains("软件中止") ||
                            //后面这两个都是HttpMediaTypeNotAcceptableException 判断,本质上是MediaType.TEXT_EVENT_STREAM_VALUE的头却返回了error页面
                            ex instanceof HttpMediaTypeNotAcceptableException || msg.contains("could not find acceptable representation")
                    ) {
                        return writeResponse(HttpStatus.OK.value(), "沉默处理这个错误", response, result);
                    }
                }
                // 其他 IOException 不处理,交给后续 resolver
                return null;
            } else {
                // 其他特殊处理
                // ex instanceof xxException
            }
            //如果不想有当前方法处理异常,那就返回null
            return writeResponse(HttpStatus.INTERNAL_SERVER_ERROR.value(), ex.getMessage(), response, result);
        });

    }

    private ModelAndView writeResponse(int code, String msg, HttpServletResponse response, ResponseBean<Object> result) {
        result.setCode(code);
        result.setMsg(msg);

        //开始写返回
        response.setCharacterEncoding("UTF-8");
        response.setHeader(HttpHeaders.CONTENT_TYPE, MediaType.APPLICATION_JSON.toString());
        //实际可以直接写200
        response.setStatus(code == HttpStatus.OK.value() ? HttpStatus.OK.value() : HttpStatus.INTERNAL_SERVER_ERROR.value());
        try {
            response.getWriter().write(JSONUtil.toJsonStr(result));
        } catch (IllegalStateException ignore) {
            // 客户端中断类的不要打印
        } catch (Exception e) {
            System.out.println("responseResult出错了:" + e.getMessage());
        }

        return new ModelAndView();
    }

}

test-sse.html

<!DOCTYPE html>
<html lang="en">
<head>
    <title>Robust SSE Client</title>
    <meta charset="UTF-8">
    <style>
        body {
            font-family: Arial, sans-serif;
            margin: 20px;
        }

        #status {
            padding: 10px;
            margin: 10px 0;
            border-radius: 4px;
        }

        .connected {
            background-color: #d4edda;
            color: #155724;
        }

        .disconnected {
            background-color: #f8d7da;
            color: #721c24;
        }

        .reconnecting {
            background-color: #fff3cd;
            color: #856404;
        }

        #messages {
            border: 1px solid #ccc;
            padding: 10px;
            height: 400px;
            overflow-y: auto;
        }

        .message {
            margin: 5px 0;
            padding: 5px;
            background: #f0f0f0;
            border-radius: 3px;
        }
    </style>
</head>
<body>
<h1>📡 Server-Sent Events (SSE) Client</h1>

<div id="status" class="disconnected">Status: Disconnected</div>
<div id="dataStatus" class="connected">dataStatus: 等待连接</div>
<div class="connected">说明: 列表最多加载50条,避免dom结构崩溃,支持滚动列表暂停(列表长度超过窗口),并在滚动到最后或者离开列表区域后加载新数据</div>
<div class="connected">\t 如果需要交互,需要新增一个接口,带上clientId和业务参数</div>
<div id="messages"></div>

<script>
    // 生成客户端唯一标识(可用 UUID,简单示例用时间戳+随机数)
    // 生成客户端唯一标识(可用 UUID,简单示例用时间戳+随机数)
    const clientId = 'client-' + Date.now() + '-' + Math.random().toString(36).slice(2, 11);
    console.log('Client ID:', clientId);
    //const url = urlConstant.url_SSE + `?clientId=${encodeURIComponent(clientId)}`;
    // 创建带 clientId 的连接(和服务端配合)

    // 两种后端方案:
    //const url = `http://192.168.4.143:8803/datav/sse-stream-emitter?clientId=${encodeURIComponent(clientId)}`;
    const url = `http://192.168.4.143:8803/datav/sse-stream-webFlux?clientId=${encodeURIComponent(clientId)}`;

    let eventSource = null;
    let reconnectTimer = null;
    let reconnectAttempts = 0;
    const MAX_RECONNECT_ATTEMPTS = 100; // 最大重试次数(可选)
    const BASE_RECONNECT_DELAY = 3000; // 初始重连延迟(3秒)
    const MAX_RECONNECT_DELAY = 30000; // 最大延迟(30秒)
    const messagesDiv = document.getElementById('messages');
    let isUserInView = false;        // 鼠标是否在 messagesDiv 内
    let isUserManuallyScrolledUp = false; // 用户是否手动向上滚动(未到底)
    // 暂存新消息
    let pendingMessages = [];

    function connect() {
        // 避免重复创建
        if (eventSource) {
            eventSource.close();
        }


        // 显示连接中状态
        updateStatus('Connecting...', 'reconnecting');
        updateDataStatus("等待数据...")

        eventSource = new EventSource(url);

        eventSource.onopen = function (event) {
            console.log('SSE connection opened');
            updateStatus(`Connected [ID: ${clientId}]`, 'connected');
            updateDataStatus("等待连接...")
            reconnectAttempts = 0; // 重置重试计数
        };

        //后端没有指定event,处理 普通数据流,比如日志、聊天消息、实时价格等【本实】
        eventSource.onmessage = function (event) {
            const data = event.data;
            const time = new Date().toLocaleTimeString();

            const message = document.createElement('div');
            message.className = 'message';
            message.textContent = `[${time}] ${data}`;
            addToMessageDiv(message)
        };

        // 后端指定event:处理 特定类型的通知,比如告警、系统事件、用户提醒等
        eventSource.addEventListener('custom-message', function (event) {
            const data = event.data;
            const time = new Date().toLocaleTimeString();

            const message = document.createElement('div');
            message.className = 'message';
            message.style.backgroundColor = '#cce5ff';
            message.textContent = `[${time}] 🔔 Custom: ${data}`;
            addToMessageDiv(message)
        });

        // 监听特定事件类型(如 heartbeat, custom-message)
        eventSource.addEventListener('heartbeat', function (event) {
            //发布时不要打印
            //console.log('SSE Heartbeat received:', event.data);
        });

        eventSource.onerror = function (event) {
            console.error('SSE Error:', event);

            // readyState: 0=CONNECTING, 1=OPEN, 2=CLOSED
            if (eventSource.readyState === EventSource.CLOSED) {
                console.log('Connection closed. Attempting to reconnect...');
                scheduleReconnect();
            } else if (eventSource.readyState === EventSource.CONNECTING) {
                updateStatus('Reconnecting...', 'reconnecting');
            }
        };
        //当鼠标在messagesDiv 时,就不要滚动
        initMouseEnterOrScroll();
    }

    function scheduleReconnect() {
        if (reconnectAttempts >= MAX_RECONNECT_ATTEMPTS) {
            updateStatus('Max retries reached. Stopped.', 'disconnected');
            return;
        }

        reconnectAttempts++;
        const delay = Math.min(BASE_RECONNECT_DELAY * Math.pow(1.5, reconnectAttempts), MAX_RECONNECT_DELAY);

        updateStatus(`Reconnecting in ${delay / 1000}s... (Attempt ${reconnectAttempts})`, 'reconnecting');

        clearTimeout(reconnectTimer);
        reconnectTimer = setTimeout(() => {
            connect();
        }, delay);
    }

    function updateStatus(text, className) {
        const statusDiv = document.getElementById('status');
        statusDiv.textContent = 'Status: ' + text;
        statusDiv.className = ''; // 清除旧类
        statusDiv.classList.add(className);
    }

    function updateDataStatus(text) {
        const dataStatusDiv = document.getElementById('dataStatus');
        dataStatusDiv.textContent = 'dataStatus: ' + text;
    }

    // 页面加载完成后连接
    // 通用跨浏览器事件绑定工具
    function on(element, event, handler) {
        if (element.addEventListener) {
            element.addEventListener(event, handler, false);
        } else if (element.attachEvent) {
            element.attachEvent('on' + event, handler);
        }
    }

    //当鼠标在messagesDiv 时,就不要滚动
    function initMouseEnterOrScroll() {
        if (!messagesDiv) {
            console.error('未找到 id 为 "messages" 的元素');
        }

        // 使用 on() 绑定事件,兼容老浏览器
        if (messagesDiv) {
            on(messagesDiv, 'mouseenter', function () {
                isUserInView = true;
                console.log("🖱️ 进入消息区域");
                updateDataStatus("鼠标进入消息区域,暂定滚动,缓存消息")
            });

            on(messagesDiv, 'mouseleave', function () {
                isUserInView = false;
                console.log("🖱️ 离开消息区域");
                updateDataStatus("鼠标离开消息区域,恢复滚动,释放消息")
                // 如果离开时有暂存消息,立即释放
                if (pendingMessages.length > 0) {
                    flushPendingMessages();
                }
            });

            // 可选:监听滚动,智能判断是否恢复自动滚动
            on(messagesDiv, 'scroll', function () {
                updateScrollState();

                // 如果回到底部,释放暂存消息
                if (!isUserManuallyScrolledUp && pendingMessages.length > 0) {
                    flushPendingMessages();
                }
            });

            // ========== 初始化 ==========
            updateScrollState();
        }

    }

    function addToMessageDiv(message) {

        // 🔴 关键判断:只要用户在区域内 且 不在底部,就暂存
        if (isUserInView && isUserManuallyScrolledUp) {
            console.log("📌 消息暂存:用户在查看历史");
            updateDataStatus("消息暂存:用户在查看历史")
            pendingMessages.push(message);
            //showNewMessageBadge(pendingMessages.length);
            return;
        }

        // 添加新消息
        messagesDiv.appendChild(message);
        console.log("✅ 直接添加消息");
        updateDataStatus("直接添加消息")
        keepMaxRowInMessageDiv();
        // 滚动到底部
        messagesDiv.scrollTop = messagesDiv.scrollHeight;
    }


    function keepMaxRowInMessageDiv() {
        // 限制最大消息数量为 50 避免页面无限堆积、影响性能
        const maxMessages = 50;
        const messageElements = messagesDiv.querySelectorAll('.message');
        if (messageElements.length > maxMessages) {
            // 移除最早的一条(即第一个 .message 元素)
            messagesDiv.removeChild(messageElements[0]);
        }
    }


    // ========== 滚动判断函数 ==========
    function updateScrollState() {
        const isAtBottom =
            messagesDiv.scrollHeight - messagesDiv.clientHeight <= messagesDiv.scrollTop + 10;
        isUserManuallyScrolledUp = !isAtBottom;
    }


    // ========== 释放所有暂存消息 ==========
    function flushPendingMessages() {
        if (pendingMessages.length === 0) return;

        console.log(`⬇️ 释放 ${pendingMessages.length} 条暂存消息`);
        updateDataStatus(`释放 ${pendingMessages.length} 条暂存消息`)
        pendingMessages.forEach(msg => {
            messagesDiv.appendChild(msg);
            keepMaxRowInMessageDiv();
        });

        // 清空暂存
        pendingMessages = [];
        //hideNewMessageBadge();

        // 强制滚动到底部
        messagesDiv.scrollTop = messagesDiv.scrollHeight;
    }


    // 页面加载完成后连接
    on(window, 'load', function () {
        connect();
    });

    // 页面卸载时关闭连接
    on(window, 'beforeunload', function () {
        if (eventSource) {
            eventSource.close();
        }
        if (reconnectTimer) {
            clearTimeout(reconnectTimer);
        }
    });
</script>
</body>
</html>

3. 两种方案

3.1 webflux+reactor

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-webflux</artifactId>
</dependency>
<dependency>
    <groupId>io.projectreactor</groupId>
    <artifactId>reactor-core</artifactId>
    <version>3.5.0</version>
</dependency>

SseWebFluxController.java

package cn.jiangjiesheng.controller;

import org.springframework.http.MediaType;
import org.springframework.http.codec.ServerSentEvent;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;
import reactor.core.publisher.Flux;
import reactor.core.publisher.SignalType;
import reactor.core.publisher.Sinks;

import javax.annotation.PreDestroy;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.TimeUnit;
import java.time.Duration;
import java.time.LocalDateTime;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;

@RestController
public class SseWebFluxController {

    // 使用 ConcurrentHashMap 存储每个客户端的标识和其 FluxSink(可选:用于反向推送)
    private final Map<String, Sinks.Many<ServerSentEvent<String>>> clientSinks = new ConcurrentHashMap<>();
    private final ScheduledExecutorService scheduler =
            Executors.newScheduledThreadPool(10); // 可根据负载调整线程数
    private static volatile boolean hasInvokeMock = false;

    // /datav/sse-stream

    @GetMapping(value = "/sse-stream-webFlux", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
    public Flux<ServerSentEvent<String>> streamEvents(@RequestParam String clientId) {
        // 最多缓存 100 条消息 防止浏览器挤压
        Sinks.Many<ServerSentEvent<String>> sink = Sinks.many().multicast().onBackpressureBuffer(100);
        // 注册 sink
        clientSinks.put(clientId, sink);

        // 定时数据流
        Flux<ServerSentEvent<String>> intervalFlux = Flux.interval(Duration.ofSeconds(5))
                //.takeWhile(ignored -> clientSinks.containsKey(clientId))
                .map(sequence -> ServerSentEvent.<String>builder()
                        .id(clientId + "-" + sequence)
                        .data("定时数据流模拟[无event]: SSE from WebFlux - " + LocalDateTime.now() + " [Client: " + clientId + "]")
                        .build());

        // 心跳
        Flux<ServerSentEvent<String>> heartbeat = Flux.interval(Duration.ofSeconds(5))
                //.takeWhile(ignored -> clientSinks.containsKey(clientId))
                .map(seq -> ServerSentEvent.<String>builder()
                        .event("heartbeat")
                        .data("ping")
                        .build());

        // 合并流
        Flux<ServerSentEvent<String>> serverSentEventFlux = Flux.merge(intervalFlux, heartbeat, sink.asFlux())
                .doOnSubscribe(sub -> System.out.println("✅ 客户端连接: " + clientId))
                .doOnCancel(() -> {
                    String threadName = Thread.currentThread().getName();
                    System.out.println("🔌 客户端取消连接: " + clientId + " | 线程: " + threadName);

                    // 判断是否为 sse-task 线程
//                    if (threadName.startsWith("sse-task-")) {
//                        System.out.println("⚠️ 取消发生在 sse-task 线程!可能并发清理!");
//                    } else if (threadName.startsWith("http-nio-")) {
//                        System.out.println("✅ 取消发生在请求线程,安全。");
//                    }

                    //cleanupClient(clientId);
                })
                .doFinally(signalType -> {
                    String threadName = Thread.currentThread().getName();
                    System.out.println("🛑 doFinally 触发 [信号: " + signalType + "] | 客户端: " + clientId + " | 线程: " + threadName);

                    // signalType 可能是:
                    // - ON_COMPLETE
                    // - ON_ERROR
                    // - CANCEL
                    if (signalType == SignalType.CANCEL || signalType == SignalType.ON_ERROR || signalType == SignalType.ON_COMPLETE) {
                        cleanupClient(clientId);
                    }
                })
                .doOnTerminate(() -> {
                    System.out.println("🛑 流终止: " + clientId);
                    cleanupClient(clientId);
                })
                .doOnError(err -> {
                    System.err.println("❌ 流错误 for " + clientId + ": " + err.getMessage());
                    cleanupClient(clientId);
                })
                .onErrorResume(throwable -> Flux.empty());

        //模拟定时任务
        mockSendBizDate();
        System.out.println("🧹 清理客户端资源: 当前线程:" + Thread.currentThread().getName() + "]-" + clientId);
        return serverSentEventFlux;

    }

    private void mockSendBizDate() {
        if (!hasInvokeMock) {
            synchronized (SseWebFluxController.class) {
                if (!hasInvokeMock) {
                    scheduler.scheduleWithFixedDelay(() -> {
                        // 创建副本,避免遍历时修改 map
                        List<String> clients = new ArrayList<>(clientSinks.keySet());
                        for (String clientId : clients) {
                            try {
                                pushMessageToClient(clientId, "custom-message", "具体业务event mock " + LocalDateTime.now() + " [Client: " + clientId + "]");
                            } catch (Exception e) {
                                System.err.println("定时任务推送异常: " + e.getMessage());
                                // pushMessageToClient 内部会处理清理
                            }
                        }
                    }, 1, 2, TimeUnit.SECONDS);
                    hasInvokeMock = true;
                }
            }
        }
    }


    private void cleanupClient(String clientId) {
        Sinks.Many<ServerSentEvent<String>> removed = clientSinks.remove(clientId);
        if (removed != null) {
            removed.tryEmitComplete(); // 通知 sink 完成
            System.out.println("🧹 清理客户端资源: 当前线程:[" + Thread.currentThread().getName() + "]-" + clientId);
        }
    }

    // 示例:外部服务可以调用此方法向指定客户端推送消息
    public void pushMessageToClient(String clientId, String event, String eventData) {
        Sinks.Many<ServerSentEvent<String>> sink = clientSinks.get(clientId);
        if (sink == null) {
            System.out.println("⏭️ 客户端不存在或已断开: " + clientId);
            return;
        }
        Sinks.EmitResult result = sink.tryEmitNext(
                ServerSentEvent.<String>builder()
                        .event(event)
                        .data(eventData)
                        .build()
        );
        switch (result) {
            case OK:
                // 发送成功
                break;
            case FAIL_TERMINATED:
            case FAIL_CANCELLED:
            case FAIL_ZERO_SUBSCRIBER:
                // 这些状态表明 sink 已无效,应清理
                System.err.println("❌ 推送失败 [" + clientId + "],原因: " + result + ",正在清理");
                cleanupClient(clientId);
                break;
            case FAIL_OVERFLOW:
                // 缓冲区满,可能是客户端慢或断开,可尝试清理或降级
                System.err.println("⚠️ 推送失败 [" + clientId + "]:缓冲区溢出 (FAIL_OVERFLOW)");
                // 可选:延迟后重试,或直接清理(保守做法)
                // 这里我们选择清理,防止堆积
                cleanupClient(clientId);
                break;
            case FAIL_NON_SERIALIZED:
                // 非法并发调用,属于代码 bug
                System.err.println("🚨 非法并发调用 tryEmitNext: " + result);
                // 不清理 clientSinks,但应修复调用方线程安全问题
                break;
            default:
                // 预防未来新增枚举值
                System.err.println("❓ 未知 EmitResult: " + result);
                break;
        }
    }

    @PreDestroy
    public void shutdown() {
        // 清理所有客户端
        new ArrayList<>(clientSinks.keySet()).forEach(this::cleanupClient);

        // 关闭线程池
        scheduler.shutdown();
        try {
            if (!scheduler.awaitTermination(5, TimeUnit.SECONDS)) {
                scheduler.shutdownNow();
            }
        } catch (InterruptedException e) {
            scheduler.shutdownNow();
            Thread.currentThread().interrupt();
        }
    }

}

3.1 Emitter

SseEmitterController.java

package com.envtoday.ecp.datav.controller;

import org.springframework.http.MediaType;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;
import org.springframework.web.servlet.mvc.method.annotation.SseEmitter;

import javax.annotation.PreDestroy;
import java.io.IOException;
import java.time.LocalDateTime;
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

@RestController
public class SseEmitterController {

    // 存储每个客户端的 SseEmitter(用于反向推送)
    private final Map<String, SseEmitter> clientEmitters = new ConcurrentHashMap<>();
    // 全局共享的调度器,避免每个连接创建线程池
    private final ScheduledExecutorService scheduler =
            Executors.newScheduledThreadPool(3); // 可根据负载调整线程数

    // 防止重复启动定时任务
    private static volatile boolean hasInvokeMock = false;

    /**
     * SSE 流接口
     */
    @GetMapping(value = "/sse-stream-emitter", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
    public SseEmitter streamEvents(@RequestParam String clientId) throws IOException {
        // 创建 SseEmitter,设置超时时间(可选,0 表示永不过期)
        SseEmitter emitter = new SseEmitter(0L); // 0 = no timeout

        // 注册 emitter
        clientEmitters.put(clientId, emitter);

        // 设置生命周期回调(都在 http-nio 线程执行,安全)
        emitter.onCompletion(() -> {
            System.out.println("✅ onCompletion: 客户端断开或完成 | 客户端: " + clientId);
            cleanupClient(clientId);
        });

        emitter.onTimeout(() -> {
            System.out.println("⏰ onTimeout: 客户端连接超时 | 客户端: " + clientId);
            cleanupClient(clientId);
            emitter.complete();
        });

        emitter.onError(throwable -> {
            // Windows 系统常见断开提示:"你的主机中的软件中止了一个已建立的连接。"
            // Linux/Unix 常见:"Connection reset by peer" 或 "Broken pipe"
            String msg = throwable.getMessage();
            if (msg != null && (
                    msg.contains("中止了一个已建立的连接") ||
                            msg.contains("Connection reset by peer") ||
                            msg.contains("Broken pipe") ||
                            msg.contains("Connection timed out"))) {
                // 视为正常断开,不打印错误
            } else {
                // 其他异常,可以打印警告
                System.err.println("⚠️ SSE 连接异常 [非网络断开]: " + clientId + " | " + throwable.getMessage());
            }
            cleanupClient(clientId);
//            emitter.complete();
        });

        // 发送初始连接消息
        try {
            emitter.send(SseEmitter.event()
                    .name("connected")
                    .data("已连接到 SSE 服务 [Client: " + clientId + "] at " + LocalDateTime.now()));
        } catch (IOException e) {
            // 客户端可能已断开
            emitter.complete();
            cleanupClient(clientId);
            return emitter;
        }

        // 启动定时任务(仅首次调用)
        mockSendBizDate();

        // 启动内置定时数据流(每 5 秒)
        startIntervalData(clientId, emitter);

        // 启动心跳(每 5 秒)
        startHeartbeat(clientId, emitter);

        System.out.println("✅ 客户端连接成功: " + clientId);
        return emitter;
    }

    /**
     * 启动每 5 秒的定时数据流
     */
    private void startIntervalData(String clientId, SseEmitter emitter) {
        scheduler.scheduleAtFixedRate(() -> {
            if (!clientEmitters.containsKey(clientId)) {
                return;
            }
            // 2. 检查 emitter 是否已过期或连接断开
            // 方法:尝试 send 一个空数据,看是否抛出异常,或者通过 onCompletion/onError 已触发
            // 但更主动的方式是:我们无法直接获取状态,但可以封装判断逻辑
            try {
                emitter.send(SseEmitter.event()
                        .id(clientId + "-" + System.currentTimeMillis())
                        .data("定时数据流模拟[无event]: SSE from SseEmitter - " + LocalDateTime.now() + " [Client: " + clientId + "]"));
            } catch (IOException e) {
                // 客户端断开连接是正常行为,不打印错误日志
                // 可选:只在 DEBUG 级别打印
                // log.debug("客户端 {} 连接中断,准备清理", clientId);
                cleanupClient(clientId);
            }
        }, 0, 5, TimeUnit.SECONDS);

       // emitter.onCompletion(intervalScheduler::shutdown);
    }

    /**
     * 启动每 5 秒的心跳
     */
    private void startHeartbeat(String clientId, SseEmitter emitter) {
        ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
        scheduler.scheduleAtFixedRate(() -> {
            if (!clientEmitters.containsKey(clientId)) {
                //heartbeatScheduler.shutdown();
                return;
            }
            try {
                emitter.send(SseEmitter.event()
                        .name("heartbeat")
                        .data("ping"));
            } catch (IOException e) {
                // 静默处理断开连接
                cleanupClient(clientId);
               // heartbeatScheduler.shutdown();
            }
        }, 5, 5, TimeUnit.SECONDS);

       // emitter.onCompletion(heartbeatScheduler::shutdown);
    }

    /**
     * 模拟定时业务消息推送(每 2 秒)
     */
    private void mockSendBizDate() {
        if (!hasInvokeMock) {
            synchronized (SseEmitterController.class) {
                if (!hasInvokeMock) {
                    scheduler.scheduleWithFixedDelay(() -> {
                        List<String> clients = new ArrayList<>(clientEmitters.keySet());
                        for (String clientId : clients) {
                            try {
                                pushMessageToClient(clientId, "custom-message", "具体业务event mock " + LocalDateTime.now() + " [Client: " + clientId + "]");
                            } catch (Exception e) {
                                System.err.println("定时任务推送异常: " + e.getMessage());
                            }
                        }
                    }, 1, 2, TimeUnit.SECONDS);
                    hasInvokeMock = true;
                }
            }
        }
    }

    /**
     * 清理客户端资源
     */
    private void cleanupClient(String clientId) {
        SseEmitter removed = clientEmitters.remove(clientId);
        if (removed != null) {
            try {
                removed.complete(); // 确保关闭连接
            } catch (Exception ignored) {}
            System.out.println("🧹 已清理客户端资源: " + clientId + " | 当前线程: [" + Thread.currentThread().getName() + "],时间:"+new Date());
        }

    }

    /**
     * 外部服务可调用此方法向指定客户端推送消息
     */
    public void pushMessageToClient(String clientId, String event, String eventData) {
        SseEmitter emitter = clientEmitters.get(clientId);
        if (emitter == null) {
            // 客户端已断开,不打印日志(避免刷屏)
            return;
        }
        try {
            emitter.send(SseEmitter.event()
                    .name(event)
                    .data(eventData));
        } catch (IOException e) {
            // 客户端连接已断开,静默清理
            // System.out.println(e);
            cleanupClient(clientId);
        }
    }

    // ========================
    // 可选:提供管理接口
    // ========================

    /**
     * 获取当前连接的客户端数(用于监控)
     */
    public int getClientCount() {
        return clientEmitters.size();
    }

    /**
     * 广播消息给所有客户端
     */
    public void broadcast(String event, String data) {
        List<String> failedClients = new ArrayList<>();
        for (Map.Entry<String, SseEmitter> entry : clientEmitters.entrySet()) {
            try {
                entry.getValue().send(SseEmitter.event().name(event).data(data));
            } catch (IOException e) {
                failedClients.add(entry.getKey());
            }
        }
        // 清理失败的客户端
        failedClients.forEach(clientId -> {
            System.err.println("广播失败,清理客户端: " + clientId);
            cleanupClient(clientId);
        });
    }

    @PreDestroy
    public void shutdown() {
        // 清理所有客户端
        new ArrayList<>(clientEmitters.keySet()).forEach(this::cleanupClient);

        // 关闭线程池
        scheduler.shutdown();
        try {
            if (!scheduler.awaitTermination(5, TimeUnit.SECONDS)) {
                scheduler.shutdownNow();
            }
        } catch (InterruptedException e) {
            scheduler.shutdownNow();
            Thread.currentThread().interrupt();
        }
    }
}
正文到此结束
本文目录