package com.backendsys.modules.sse.utils; import cn.hutool.core.convert.Convert; import cn.hutool.json.JSONUtil; import com.backendsys.modules.common.config.security.utils.SecurityUtil; import com.backendsys.modules.sse.emitter.SseEmitterManager; import com.backendsys.modules.sse.entity.SseResponse; import com.backendsys.modules.sse.entity.SseResponseEnum; import org.springframework.beans.factory.annotation.Value; import org.springframework.stereotype.Component; import org.springframework.web.servlet.mvc.method.annotation.SseEmitter; import java.io.IOException; import java.util.concurrent.ConcurrentHashMap; @Component public class SseUtil { @Value("${spring.application.name}") private String APPLICATION_NAME; // [SSE] 发送消息 (单个) public void send(String emitterKey, Object data) { SseEmitterManager manager = SseEmitterManager.getInstance(); SseEmitterUTF8 emitter = manager.getEmitter(emitterKey); if (emitter != null) { try { emitter.send(SseEmitter.event().data(data)); } catch (IOException e) { System.out.println(e.getMessage()); manager.removeEmitter(emitter); } } } // [SSE] 发送消息 (单个) (自己) public void send(Object data) { Long user_id = SecurityUtil.getUserId(); SseEmitterManager manager = SseEmitterManager.getInstance(); SseEmitterUTF8 emitter = manager.getEmitter(APPLICATION_NAME + "-userid-" + Convert.toStr(user_id)); if (emitter != null) { try { emitter.send(SseEmitter.event().data(data)); } catch (IOException e) { System.out.println(e.getMessage()); manager.removeEmitter(emitter); } } } // [SSE] 发送消息 (全部) public void sendToAll(Object data) { SseEmitterManager manager = SseEmitterManager.getInstance(); ConcurrentHashMap emitters = manager.getAllEmitters(); for (String key : emitters.keySet()) { SseEmitterUTF8 emitter = emitters.get(key); if (emitter != null) { try { emitter.send(SseEmitter.event().data(data)); } catch (IOException e) { System.out.println("Failed to send message to emitter with key: " + key + ", Error: " + e.getMessage()); manager.removeEmitter(emitter); } } } } // 如果存在,则关闭 (此操作会完全关闭未连接的监听) public void closeIfExist(String emitterKey) { // 如果用户ID已经存在对应的连接,则关闭旧的连接 SseEmitterManager manager = SseEmitterManager.getInstance(); SseEmitterUTF8 oldEmitter = manager.getEmitter(emitterKey); if (oldEmitter != null) { try { String dataStr = (new SseResponse(SseResponseEnum.DISCONNECT)).toJsonStr(); oldEmitter.send(SseEmitter.event().data(dataStr)); oldEmitter.complete(); // 关闭旧的连接 manager.removeEmitter(oldEmitter); // 从管理器中移除旧的连接 } catch (IOException e) { System.out.println(e.getMessage()); manager.removeEmitter(oldEmitter); } } } }