StringBoot-SSE和WebFlux方式消息实时推送-默认单向-可增加交互接口
1. 效果演示
2. 共同需要
WebMvcConfig.java
package cn.jiangjiesheng.config;
import cn.hutool.json.JSONUtil;
import com.envtoday.ecp.datav.common.ResponseBean;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.task.AsyncTaskExecutor;
import org.springframework.http.HttpHeaders;
import org.springframework.http.HttpStatus;
import org.springframework.http.MediaType;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import org.springframework.web.HttpMediaTypeNotAcceptableException;
import org.springframework.web.servlet.HandlerExceptionResolver;
import org.springframework.web.servlet.ModelAndView;
import org.springframework.web.servlet.config.annotation.AsyncSupportConfigurer;
import org.springframework.web.servlet.config.annotation.CorsRegistry;
import org.springframework.web.servlet.config.annotation.WebMvcConfigurer;
import javax.servlet.http.HttpServletResponse;
import java.util.List;
import java.util.concurrent.ThreadPoolExecutor;
@Configuration
public class WebMvcConfig implements WebMvcConfigurer {
@Override
public void addCorsMappings(CorsRegistry registry) {
registry.addMapping("/**");
}
//SseWebFluxController方案必要
@Override
public void configureAsyncSupport(AsyncSupportConfigurer configurer) {
configurer.setTaskExecutor(mvcTaskExecutor());
configurer.setDefaultTimeout(30000); // 可选:设置默认超时(毫秒)
}
//SseWebFluxController方案必要
@Bean
public AsyncTaskExecutor mvcTaskExecutor() {
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
executor.setThreadNamePrefix("sse-task-");
executor.setCorePoolSize(10);
executor.setMaxPoolSize(50);
executor.setQueueCapacity(100);
executor.setKeepAliveSeconds(60);
executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
executor.initialize();
return executor;
}
//SseWebFluxController || SseEmitterController方案都必要
@Override
public void configureHandlerExceptionResolvers(List<HandlerExceptionResolver> resolvers) {
resolvers.add((request, response, handler, ex) -> {
//这是自定义的业务返回类
ResponseBean<Object> result = new ResponseBean<>();
// 处理event-stream接口断开时打印"java.io.IOException: 你的主机中的软件中止了一个已建立的连接。"(浏览器窗口关闭或刷新)
boolean isSseRequest = MediaType.TEXT_EVENT_STREAM_VALUE.equals(request.getHeader("Accept"));
if (isSseRequest) {
String msg = ex.getMessage();
if (msg != null) {
msg = msg.toLowerCase();
if (msg.contains("reset by peer") ||
msg.contains("aborted") ||
msg.contains("broken pipe") ||
msg.contains("软件中止") ||
//后面这两个都是HttpMediaTypeNotAcceptableException 判断,本质上是MediaType.TEXT_EVENT_STREAM_VALUE的头却返回了error页面
ex instanceof HttpMediaTypeNotAcceptableException || msg.contains("could not find acceptable representation")
) {
return writeResponse(HttpStatus.OK.value(), "沉默处理这个错误", response, result);
}
}
// 其他 IOException 不处理,交给后续 resolver
return null;
} else {
// 其他特殊处理
// ex instanceof xxException
}
//如果不想有当前方法处理异常,那就返回null
return writeResponse(HttpStatus.INTERNAL_SERVER_ERROR.value(), ex.getMessage(), response, result);
});
}
private ModelAndView writeResponse(int code, String msg, HttpServletResponse response, ResponseBean<Object> result) {
result.setCode(code);
result.setMsg(msg);
//开始写返回
response.setCharacterEncoding("UTF-8");
response.setHeader(HttpHeaders.CONTENT_TYPE, MediaType.APPLICATION_JSON.toString());
//实际可以直接写200
response.setStatus(code == HttpStatus.OK.value() ? HttpStatus.OK.value() : HttpStatus.INTERNAL_SERVER_ERROR.value());
try {
response.getWriter().write(JSONUtil.toJsonStr(result));
} catch (IllegalStateException ignore) {
// 客户端中断类的不要打印
} catch (Exception e) {
System.out.println("responseResult出错了:" + e.getMessage());
}
return new ModelAndView();
}
}
test-sse.html
<!DOCTYPE html>
<html lang="en">
<head>
<title>Robust SSE Client</title>
<meta charset="UTF-8">
<style>
body {
font-family: Arial, sans-serif;
margin: 20px;
}
#status {
padding: 10px;
margin: 10px 0;
border-radius: 4px;
}
.connected {
background-color: #d4edda;
color: #155724;
}
.disconnected {
background-color: #f8d7da;
color: #721c24;
}
.reconnecting {
background-color: #fff3cd;
color: #856404;
}
#messages {
border: 1px solid #ccc;
padding: 10px;
height: 400px;
overflow-y: auto;
}
.message {
margin: 5px 0;
padding: 5px;
background: #f0f0f0;
border-radius: 3px;
}
</style>
</head>
<body>
<h1>📡 Server-Sent Events (SSE) Client</h1>
<div id="status" class="disconnected">Status: Disconnected</div>
<div id="dataStatus" class="connected">dataStatus: 等待连接</div>
<div class="connected">说明: 列表最多加载50条,避免dom结构崩溃,支持滚动列表暂停(列表长度超过窗口),并在滚动到最后或者离开列表区域后加载新数据</div>
<div class="connected">\t 如果需要交互,需要新增一个接口,带上clientId和业务参数</div>
<div id="messages"></div>
<script>
// 生成客户端唯一标识(可用 UUID,简单示例用时间戳+随机数)
// 生成客户端唯一标识(可用 UUID,简单示例用时间戳+随机数)
const clientId = 'client-' + Date.now() + '-' + Math.random().toString(36).slice(2, 11);
console.log('Client ID:', clientId);
//const url = urlConstant.url_SSE + `?clientId=${encodeURIComponent(clientId)}`;
// 创建带 clientId 的连接(和服务端配合)
// 两种后端方案:
//const url = `http://192.168.4.143:8803/datav/sse-stream-emitter?clientId=${encodeURIComponent(clientId)}`;
const url = `http://192.168.4.143:8803/datav/sse-stream-webFlux?clientId=${encodeURIComponent(clientId)}`;
let eventSource = null;
let reconnectTimer = null;
let reconnectAttempts = 0;
const MAX_RECONNECT_ATTEMPTS = 100; // 最大重试次数(可选)
const BASE_RECONNECT_DELAY = 3000; // 初始重连延迟(3秒)
const MAX_RECONNECT_DELAY = 30000; // 最大延迟(30秒)
const messagesDiv = document.getElementById('messages');
let isUserInView = false; // 鼠标是否在 messagesDiv 内
let isUserManuallyScrolledUp = false; // 用户是否手动向上滚动(未到底)
// 暂存新消息
let pendingMessages = [];
function connect() {
// 避免重复创建
if (eventSource) {
eventSource.close();
}
// 显示连接中状态
updateStatus('Connecting...', 'reconnecting');
updateDataStatus("等待数据...")
eventSource = new EventSource(url);
eventSource.onopen = function (event) {
console.log('SSE connection opened');
updateStatus(`Connected [ID: ${clientId}]`, 'connected');
updateDataStatus("等待连接...")
reconnectAttempts = 0; // 重置重试计数
};
//后端没有指定event,处理 普通数据流,比如日志、聊天消息、实时价格等【本实】
eventSource.onmessage = function (event) {
const data = event.data;
const time = new Date().toLocaleTimeString();
const message = document.createElement('div');
message.className = 'message';
message.textContent = `[${time}] ${data}`;
addToMessageDiv(message)
};
// 后端指定event:处理 特定类型的通知,比如告警、系统事件、用户提醒等
eventSource.addEventListener('custom-message', function (event) {
const data = event.data;
const time = new Date().toLocaleTimeString();
const message = document.createElement('div');
message.className = 'message';
message.style.backgroundColor = '#cce5ff';
message.textContent = `[${time}] 🔔 Custom: ${data}`;
addToMessageDiv(message)
});
// 监听特定事件类型(如 heartbeat, custom-message)
eventSource.addEventListener('heartbeat', function (event) {
//发布时不要打印
//console.log('SSE Heartbeat received:', event.data);
});
eventSource.onerror = function (event) {
console.error('SSE Error:', event);
// readyState: 0=CONNECTING, 1=OPEN, 2=CLOSED
if (eventSource.readyState === EventSource.CLOSED) {
console.log('Connection closed. Attempting to reconnect...');
scheduleReconnect();
} else if (eventSource.readyState === EventSource.CONNECTING) {
updateStatus('Reconnecting...', 'reconnecting');
}
};
//当鼠标在messagesDiv 时,就不要滚动
initMouseEnterOrScroll();
}
function scheduleReconnect() {
if (reconnectAttempts >= MAX_RECONNECT_ATTEMPTS) {
updateStatus('Max retries reached. Stopped.', 'disconnected');
return;
}
reconnectAttempts++;
const delay = Math.min(BASE_RECONNECT_DELAY * Math.pow(1.5, reconnectAttempts), MAX_RECONNECT_DELAY);
updateStatus(`Reconnecting in ${delay / 1000}s... (Attempt ${reconnectAttempts})`, 'reconnecting');
clearTimeout(reconnectTimer);
reconnectTimer = setTimeout(() => {
connect();
}, delay);
}
function updateStatus(text, className) {
const statusDiv = document.getElementById('status');
statusDiv.textContent = 'Status: ' + text;
statusDiv.className = ''; // 清除旧类
statusDiv.classList.add(className);
}
function updateDataStatus(text) {
const dataStatusDiv = document.getElementById('dataStatus');
dataStatusDiv.textContent = 'dataStatus: ' + text;
}
// 页面加载完成后连接
// 通用跨浏览器事件绑定工具
function on(element, event, handler) {
if (element.addEventListener) {
element.addEventListener(event, handler, false);
} else if (element.attachEvent) {
element.attachEvent('on' + event, handler);
}
}
//当鼠标在messagesDiv 时,就不要滚动
function initMouseEnterOrScroll() {
if (!messagesDiv) {
console.error('未找到 id 为 "messages" 的元素');
}
// 使用 on() 绑定事件,兼容老浏览器
if (messagesDiv) {
on(messagesDiv, 'mouseenter', function () {
isUserInView = true;
console.log("🖱️ 进入消息区域");
updateDataStatus("鼠标进入消息区域,暂定滚动,缓存消息")
});
on(messagesDiv, 'mouseleave', function () {
isUserInView = false;
console.log("🖱️ 离开消息区域");
updateDataStatus("鼠标离开消息区域,恢复滚动,释放消息")
// 如果离开时有暂存消息,立即释放
if (pendingMessages.length > 0) {
flushPendingMessages();
}
});
// 可选:监听滚动,智能判断是否恢复自动滚动
on(messagesDiv, 'scroll', function () {
updateScrollState();
// 如果回到底部,释放暂存消息
if (!isUserManuallyScrolledUp && pendingMessages.length > 0) {
flushPendingMessages();
}
});
// ========== 初始化 ==========
updateScrollState();
}
}
function addToMessageDiv(message) {
// 🔴 关键判断:只要用户在区域内 且 不在底部,就暂存
if (isUserInView && isUserManuallyScrolledUp) {
console.log("📌 消息暂存:用户在查看历史");
updateDataStatus("消息暂存:用户在查看历史")
pendingMessages.push(message);
//showNewMessageBadge(pendingMessages.length);
return;
}
// 添加新消息
messagesDiv.appendChild(message);
console.log("✅ 直接添加消息");
updateDataStatus("直接添加消息")
keepMaxRowInMessageDiv();
// 滚动到底部
messagesDiv.scrollTop = messagesDiv.scrollHeight;
}
function keepMaxRowInMessageDiv() {
// 限制最大消息数量为 50 避免页面无限堆积、影响性能
const maxMessages = 50;
const messageElements = messagesDiv.querySelectorAll('.message');
if (messageElements.length > maxMessages) {
// 移除最早的一条(即第一个 .message 元素)
messagesDiv.removeChild(messageElements[0]);
}
}
// ========== 滚动判断函数 ==========
function updateScrollState() {
const isAtBottom =
messagesDiv.scrollHeight - messagesDiv.clientHeight <= messagesDiv.scrollTop + 10;
isUserManuallyScrolledUp = !isAtBottom;
}
// ========== 释放所有暂存消息 ==========
function flushPendingMessages() {
if (pendingMessages.length === 0) return;
console.log(`⬇️ 释放 ${pendingMessages.length} 条暂存消息`);
updateDataStatus(`释放 ${pendingMessages.length} 条暂存消息`)
pendingMessages.forEach(msg => {
messagesDiv.appendChild(msg);
keepMaxRowInMessageDiv();
});
// 清空暂存
pendingMessages = [];
//hideNewMessageBadge();
// 强制滚动到底部
messagesDiv.scrollTop = messagesDiv.scrollHeight;
}
// 页面加载完成后连接
on(window, 'load', function () {
connect();
});
// 页面卸载时关闭连接
on(window, 'beforeunload', function () {
if (eventSource) {
eventSource.close();
}
if (reconnectTimer) {
clearTimeout(reconnectTimer);
}
});
</script>
</body>
</html>
3. 两种方案
3.1 webflux+reactor
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-webflux</artifactId>
</dependency>
<dependency>
<groupId>io.projectreactor</groupId>
<artifactId>reactor-core</artifactId>
<version>3.5.0</version>
</dependency>
SseWebFluxController.java
package cn.jiangjiesheng.controller;
import org.springframework.http.MediaType;
import org.springframework.http.codec.ServerSentEvent;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;
import reactor.core.publisher.Flux;
import reactor.core.publisher.SignalType;
import reactor.core.publisher.Sinks;
import javax.annotation.PreDestroy;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.TimeUnit;
import java.time.Duration;
import java.time.LocalDateTime;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
@RestController
public class SseWebFluxController {
// 使用 ConcurrentHashMap 存储每个客户端的标识和其 FluxSink(可选:用于反向推送)
private final Map<String, Sinks.Many<ServerSentEvent<String>>> clientSinks = new ConcurrentHashMap<>();
private final ScheduledExecutorService scheduler =
Executors.newScheduledThreadPool(10); // 可根据负载调整线程数
private static volatile boolean hasInvokeMock = false;
// /datav/sse-stream
@GetMapping(value = "/sse-stream-webFlux", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
public Flux<ServerSentEvent<String>> streamEvents(@RequestParam String clientId) {
// 最多缓存 100 条消息 防止浏览器挤压
Sinks.Many<ServerSentEvent<String>> sink = Sinks.many().multicast().onBackpressureBuffer(100);
// 注册 sink
clientSinks.put(clientId, sink);
// 定时数据流
Flux<ServerSentEvent<String>> intervalFlux = Flux.interval(Duration.ofSeconds(5))
//.takeWhile(ignored -> clientSinks.containsKey(clientId))
.map(sequence -> ServerSentEvent.<String>builder()
.id(clientId + "-" + sequence)
.data("定时数据流模拟[无event]: SSE from WebFlux - " + LocalDateTime.now() + " [Client: " + clientId + "]")
.build());
// 心跳
Flux<ServerSentEvent<String>> heartbeat = Flux.interval(Duration.ofSeconds(5))
//.takeWhile(ignored -> clientSinks.containsKey(clientId))
.map(seq -> ServerSentEvent.<String>builder()
.event("heartbeat")
.data("ping")
.build());
// 合并流
Flux<ServerSentEvent<String>> serverSentEventFlux = Flux.merge(intervalFlux, heartbeat, sink.asFlux())
.doOnSubscribe(sub -> System.out.println("✅ 客户端连接: " + clientId))
.doOnCancel(() -> {
String threadName = Thread.currentThread().getName();
System.out.println("🔌 客户端取消连接: " + clientId + " | 线程: " + threadName);
// 判断是否为 sse-task 线程
// if (threadName.startsWith("sse-task-")) {
// System.out.println("⚠️ 取消发生在 sse-task 线程!可能并发清理!");
// } else if (threadName.startsWith("http-nio-")) {
// System.out.println("✅ 取消发生在请求线程,安全。");
// }
//cleanupClient(clientId);
})
.doFinally(signalType -> {
String threadName = Thread.currentThread().getName();
System.out.println("🛑 doFinally 触发 [信号: " + signalType + "] | 客户端: " + clientId + " | 线程: " + threadName);
// signalType 可能是:
// - ON_COMPLETE
// - ON_ERROR
// - CANCEL
if (signalType == SignalType.CANCEL || signalType == SignalType.ON_ERROR || signalType == SignalType.ON_COMPLETE) {
cleanupClient(clientId);
}
})
.doOnTerminate(() -> {
System.out.println("🛑 流终止: " + clientId);
cleanupClient(clientId);
})
.doOnError(err -> {
System.err.println("❌ 流错误 for " + clientId + ": " + err.getMessage());
cleanupClient(clientId);
})
.onErrorResume(throwable -> Flux.empty());
//模拟定时任务
mockSendBizDate();
System.out.println("🧹 清理客户端资源: 当前线程:" + Thread.currentThread().getName() + "]-" + clientId);
return serverSentEventFlux;
}
private void mockSendBizDate() {
if (!hasInvokeMock) {
synchronized (SseWebFluxController.class) {
if (!hasInvokeMock) {
scheduler.scheduleWithFixedDelay(() -> {
// 创建副本,避免遍历时修改 map
List<String> clients = new ArrayList<>(clientSinks.keySet());
for (String clientId : clients) {
try {
pushMessageToClient(clientId, "custom-message", "具体业务event mock " + LocalDateTime.now() + " [Client: " + clientId + "]");
} catch (Exception e) {
System.err.println("定时任务推送异常: " + e.getMessage());
// pushMessageToClient 内部会处理清理
}
}
}, 1, 2, TimeUnit.SECONDS);
hasInvokeMock = true;
}
}
}
}
private void cleanupClient(String clientId) {
Sinks.Many<ServerSentEvent<String>> removed = clientSinks.remove(clientId);
if (removed != null) {
removed.tryEmitComplete(); // 通知 sink 完成
System.out.println("🧹 清理客户端资源: 当前线程:[" + Thread.currentThread().getName() + "]-" + clientId);
}
}
// 示例:外部服务可以调用此方法向指定客户端推送消息
public void pushMessageToClient(String clientId, String event, String eventData) {
Sinks.Many<ServerSentEvent<String>> sink = clientSinks.get(clientId);
if (sink == null) {
System.out.println("⏭️ 客户端不存在或已断开: " + clientId);
return;
}
Sinks.EmitResult result = sink.tryEmitNext(
ServerSentEvent.<String>builder()
.event(event)
.data(eventData)
.build()
);
switch (result) {
case OK:
// 发送成功
break;
case FAIL_TERMINATED:
case FAIL_CANCELLED:
case FAIL_ZERO_SUBSCRIBER:
// 这些状态表明 sink 已无效,应清理
System.err.println("❌ 推送失败 [" + clientId + "],原因: " + result + ",正在清理");
cleanupClient(clientId);
break;
case FAIL_OVERFLOW:
// 缓冲区满,可能是客户端慢或断开,可尝试清理或降级
System.err.println("⚠️ 推送失败 [" + clientId + "]:缓冲区溢出 (FAIL_OVERFLOW)");
// 可选:延迟后重试,或直接清理(保守做法)
// 这里我们选择清理,防止堆积
cleanupClient(clientId);
break;
case FAIL_NON_SERIALIZED:
// 非法并发调用,属于代码 bug
System.err.println("🚨 非法并发调用 tryEmitNext: " + result);
// 不清理 clientSinks,但应修复调用方线程安全问题
break;
default:
// 预防未来新增枚举值
System.err.println("❓ 未知 EmitResult: " + result);
break;
}
}
@PreDestroy
public void shutdown() {
// 清理所有客户端
new ArrayList<>(clientSinks.keySet()).forEach(this::cleanupClient);
// 关闭线程池
scheduler.shutdown();
try {
if (!scheduler.awaitTermination(5, TimeUnit.SECONDS)) {
scheduler.shutdownNow();
}
} catch (InterruptedException e) {
scheduler.shutdownNow();
Thread.currentThread().interrupt();
}
}
}
3.1 Emitter
SseEmitterController.java
package com.envtoday.ecp.datav.controller;
import org.springframework.http.MediaType;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;
import org.springframework.web.servlet.mvc.method.annotation.SseEmitter;
import javax.annotation.PreDestroy;
import java.io.IOException;
import java.time.LocalDateTime;
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
@RestController
public class SseEmitterController {
// 存储每个客户端的 SseEmitter(用于反向推送)
private final Map<String, SseEmitter> clientEmitters = new ConcurrentHashMap<>();
// 全局共享的调度器,避免每个连接创建线程池
private final ScheduledExecutorService scheduler =
Executors.newScheduledThreadPool(3); // 可根据负载调整线程数
// 防止重复启动定时任务
private static volatile boolean hasInvokeMock = false;
/**
* SSE 流接口
*/
@GetMapping(value = "/sse-stream-emitter", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
public SseEmitter streamEvents(@RequestParam String clientId) throws IOException {
// 创建 SseEmitter,设置超时时间(可选,0 表示永不过期)
SseEmitter emitter = new SseEmitter(0L); // 0 = no timeout
// 注册 emitter
clientEmitters.put(clientId, emitter);
// 设置生命周期回调(都在 http-nio 线程执行,安全)
emitter.onCompletion(() -> {
System.out.println("✅ onCompletion: 客户端断开或完成 | 客户端: " + clientId);
cleanupClient(clientId);
});
emitter.onTimeout(() -> {
System.out.println("⏰ onTimeout: 客户端连接超时 | 客户端: " + clientId);
cleanupClient(clientId);
emitter.complete();
});
emitter.onError(throwable -> {
// Windows 系统常见断开提示:"你的主机中的软件中止了一个已建立的连接。"
// Linux/Unix 常见:"Connection reset by peer" 或 "Broken pipe"
String msg = throwable.getMessage();
if (msg != null && (
msg.contains("中止了一个已建立的连接") ||
msg.contains("Connection reset by peer") ||
msg.contains("Broken pipe") ||
msg.contains("Connection timed out"))) {
// 视为正常断开,不打印错误
} else {
// 其他异常,可以打印警告
System.err.println("⚠️ SSE 连接异常 [非网络断开]: " + clientId + " | " + throwable.getMessage());
}
cleanupClient(clientId);
// emitter.complete();
});
// 发送初始连接消息
try {
emitter.send(SseEmitter.event()
.name("connected")
.data("已连接到 SSE 服务 [Client: " + clientId + "] at " + LocalDateTime.now()));
} catch (IOException e) {
// 客户端可能已断开
emitter.complete();
cleanupClient(clientId);
return emitter;
}
// 启动定时任务(仅首次调用)
mockSendBizDate();
// 启动内置定时数据流(每 5 秒)
startIntervalData(clientId, emitter);
// 启动心跳(每 5 秒)
startHeartbeat(clientId, emitter);
System.out.println("✅ 客户端连接成功: " + clientId);
return emitter;
}
/**
* 启动每 5 秒的定时数据流
*/
private void startIntervalData(String clientId, SseEmitter emitter) {
scheduler.scheduleAtFixedRate(() -> {
if (!clientEmitters.containsKey(clientId)) {
return;
}
// 2. 检查 emitter 是否已过期或连接断开
// 方法:尝试 send 一个空数据,看是否抛出异常,或者通过 onCompletion/onError 已触发
// 但更主动的方式是:我们无法直接获取状态,但可以封装判断逻辑
try {
emitter.send(SseEmitter.event()
.id(clientId + "-" + System.currentTimeMillis())
.data("定时数据流模拟[无event]: SSE from SseEmitter - " + LocalDateTime.now() + " [Client: " + clientId + "]"));
} catch (IOException e) {
// 客户端断开连接是正常行为,不打印错误日志
// 可选:只在 DEBUG 级别打印
// log.debug("客户端 {} 连接中断,准备清理", clientId);
cleanupClient(clientId);
}
}, 0, 5, TimeUnit.SECONDS);
// emitter.onCompletion(intervalScheduler::shutdown);
}
/**
* 启动每 5 秒的心跳
*/
private void startHeartbeat(String clientId, SseEmitter emitter) {
ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
scheduler.scheduleAtFixedRate(() -> {
if (!clientEmitters.containsKey(clientId)) {
//heartbeatScheduler.shutdown();
return;
}
try {
emitter.send(SseEmitter.event()
.name("heartbeat")
.data("ping"));
} catch (IOException e) {
// 静默处理断开连接
cleanupClient(clientId);
// heartbeatScheduler.shutdown();
}
}, 5, 5, TimeUnit.SECONDS);
// emitter.onCompletion(heartbeatScheduler::shutdown);
}
/**
* 模拟定时业务消息推送(每 2 秒)
*/
private void mockSendBizDate() {
if (!hasInvokeMock) {
synchronized (SseEmitterController.class) {
if (!hasInvokeMock) {
scheduler.scheduleWithFixedDelay(() -> {
List<String> clients = new ArrayList<>(clientEmitters.keySet());
for (String clientId : clients) {
try {
pushMessageToClient(clientId, "custom-message", "具体业务event mock " + LocalDateTime.now() + " [Client: " + clientId + "]");
} catch (Exception e) {
System.err.println("定时任务推送异常: " + e.getMessage());
}
}
}, 1, 2, TimeUnit.SECONDS);
hasInvokeMock = true;
}
}
}
}
/**
* 清理客户端资源
*/
private void cleanupClient(String clientId) {
SseEmitter removed = clientEmitters.remove(clientId);
if (removed != null) {
try {
removed.complete(); // 确保关闭连接
} catch (Exception ignored) {}
System.out.println("🧹 已清理客户端资源: " + clientId + " | 当前线程: [" + Thread.currentThread().getName() + "],时间:"+new Date());
}
}
/**
* 外部服务可调用此方法向指定客户端推送消息
*/
public void pushMessageToClient(String clientId, String event, String eventData) {
SseEmitter emitter = clientEmitters.get(clientId);
if (emitter == null) {
// 客户端已断开,不打印日志(避免刷屏)
return;
}
try {
emitter.send(SseEmitter.event()
.name(event)
.data(eventData));
} catch (IOException e) {
// 客户端连接已断开,静默清理
// System.out.println(e);
cleanupClient(clientId);
}
}
// ========================
// 可选:提供管理接口
// ========================
/**
* 获取当前连接的客户端数(用于监控)
*/
public int getClientCount() {
return clientEmitters.size();
}
/**
* 广播消息给所有客户端
*/
public void broadcast(String event, String data) {
List<String> failedClients = new ArrayList<>();
for (Map.Entry<String, SseEmitter> entry : clientEmitters.entrySet()) {
try {
entry.getValue().send(SseEmitter.event().name(event).data(data));
} catch (IOException e) {
failedClients.add(entry.getKey());
}
}
// 清理失败的客户端
failedClients.forEach(clientId -> {
System.err.println("广播失败,清理客户端: " + clientId);
cleanupClient(clientId);
});
}
@PreDestroy
public void shutdown() {
// 清理所有客户端
new ArrayList<>(clientEmitters.keySet()).forEach(this::cleanupClient);
// 关闭线程池
scheduler.shutdown();
try {
if (!scheduler.awaitTermination(5, TimeUnit.SECONDS)) {
scheduler.shutdownNow();
}
} catch (InterruptedException e) {
scheduler.shutdownNow();
Thread.currentThread().interrupt();
}
}
}
正文到此结束
- 本文标签: Java Spring Spring Boot
- 本文链接: https://code.jiangjiesheng.cn/article/376
- 版权声明: 本文由小江同学原创发布,转载请先联系本站长,谢谢。