1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586 |
- package com.backendsys.modules.sse.utils;
- import cn.hutool.core.convert.Convert;
- import cn.hutool.json.JSONUtil;
- import com.backendsys.modules.common.config.security.utils.SecurityUtil;
- import com.backendsys.modules.sse.emitter.SseEmitterManager;
- import com.backendsys.modules.sse.entity.SseResponse;
- import com.backendsys.modules.sse.entity.SseResponseEnum;
- import org.springframework.beans.factory.annotation.Value;
- import org.springframework.stereotype.Component;
- import org.springframework.web.servlet.mvc.method.annotation.SseEmitter;
- import java.io.IOException;
- import java.util.concurrent.ConcurrentHashMap;
- @Component
- public class SseUtil {
- @Value("${spring.application.name}")
- private String APPLICATION_NAME;
- // [SSE] 发送消息 (单个)
- public void send(String emitterKey, Object data) {
- SseEmitterManager manager = SseEmitterManager.getInstance();
- SseEmitterUTF8 emitter = manager.getEmitter(emitterKey);
- if (emitter != null) {
- try {
- emitter.send(SseEmitter.event().data(data));
- } catch (IOException e) {
- System.out.println(e.getMessage());
- manager.removeEmitter(emitter);
- }
- }
- }
- // [SSE] 发送消息 (单个) (自己)
- public void send(Object data) {
- Long user_id = SecurityUtil.getUserId();
- SseEmitterManager manager = SseEmitterManager.getInstance();
- SseEmitterUTF8 emitter = manager.getEmitter(APPLICATION_NAME + "-userid-" + Convert.toStr(user_id));
- if (emitter != null) {
- try {
- emitter.send(SseEmitter.event().data(data));
- } catch (IOException e) {
- System.out.println(e.getMessage());
- manager.removeEmitter(emitter);
- }
- }
- }
- // [SSE] 发送消息 (全部)
- public void sendToAll(Object data) {
- SseEmitterManager manager = SseEmitterManager.getInstance();
- ConcurrentHashMap<String, SseEmitterUTF8> emitters = manager.getAllEmitters();
- for (String key : emitters.keySet()) {
- SseEmitterUTF8 emitter = emitters.get(key);
- if (emitter != null) {
- try {
- emitter.send(SseEmitter.event().data(data));
- } catch (IOException e) {
- System.out.println("Failed to send message to emitter with key: " + key + ", Error: " + e.getMessage());
- manager.removeEmitter(emitter);
- }
- }
- }
- }
- // 如果存在,则关闭 (此操作会完全关闭未连接的监听)
- public void closeIfExist(String emitterKey) {
- // 如果用户ID已经存在对应的连接,则关闭旧的连接
- SseEmitterManager manager = SseEmitterManager.getInstance();
- SseEmitterUTF8 oldEmitter = manager.getEmitter(emitterKey);
- if (oldEmitter != null) {
- try {
- String dataStr = (new SseResponse(SseResponseEnum.DISCONNECT)).toJsonStr();
- oldEmitter.send(SseEmitter.event().data(dataStr));
- oldEmitter.complete(); // 关闭旧的连接
- manager.removeEmitter(oldEmitter); // 从管理器中移除旧的连接
- } catch (IOException e) {
- System.out.println(e.getMessage());
- manager.removeEmitter(oldEmitter);
- }
- }
- }
- }
|