SseUtil.java 3.4 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586
  1. package com.backendsys.modules.sse.utils;
  2. import cn.hutool.core.convert.Convert;
  3. import cn.hutool.json.JSONUtil;
  4. import com.backendsys.modules.common.config.security.utils.SecurityUtil;
  5. import com.backendsys.modules.sse.emitter.SseEmitterManager;
  6. import com.backendsys.modules.sse.entity.SseResponse;
  7. import com.backendsys.modules.sse.entity.SseResponseEnum;
  8. import org.springframework.beans.factory.annotation.Value;
  9. import org.springframework.stereotype.Component;
  10. import org.springframework.web.servlet.mvc.method.annotation.SseEmitter;
  11. import java.io.IOException;
  12. import java.util.concurrent.ConcurrentHashMap;
  13. @Component
  14. public class SseUtil {
  15. @Value("${spring.application.name}")
  16. private String APPLICATION_NAME;
  17. // [SSE] 发送消息 (单个)
  18. public void send(String emitterKey, Object data) {
  19. SseEmitterManager manager = SseEmitterManager.getInstance();
  20. SseEmitterUTF8 emitter = manager.getEmitter(emitterKey);
  21. if (emitter != null) {
  22. try {
  23. emitter.send(SseEmitter.event().data(data));
  24. } catch (IOException e) {
  25. System.out.println(e.getMessage());
  26. manager.removeEmitter(emitter);
  27. }
  28. }
  29. }
  30. // [SSE] 发送消息 (单个) (自己)
  31. public void send(Object data) {
  32. Long user_id = SecurityUtil.getUserId();
  33. SseEmitterManager manager = SseEmitterManager.getInstance();
  34. SseEmitterUTF8 emitter = manager.getEmitter(APPLICATION_NAME + "-userid-" + Convert.toStr(user_id));
  35. if (emitter != null) {
  36. try {
  37. emitter.send(SseEmitter.event().data(data));
  38. } catch (IOException e) {
  39. System.out.println(e.getMessage());
  40. manager.removeEmitter(emitter);
  41. }
  42. }
  43. }
  44. // [SSE] 发送消息 (全部)
  45. public void sendToAll(Object data) {
  46. SseEmitterManager manager = SseEmitterManager.getInstance();
  47. ConcurrentHashMap<String, SseEmitterUTF8> emitters = manager.getAllEmitters();
  48. for (String key : emitters.keySet()) {
  49. SseEmitterUTF8 emitter = emitters.get(key);
  50. if (emitter != null) {
  51. try {
  52. emitter.send(SseEmitter.event().data(data));
  53. } catch (IOException e) {
  54. System.out.println("Failed to send message to emitter with key: " + key + ", Error: " + e.getMessage());
  55. manager.removeEmitter(emitter);
  56. }
  57. }
  58. }
  59. }
  60. // 如果存在,则关闭 (此操作会完全关闭未连接的监听)
  61. public void closeIfExist(String emitterKey) {
  62. // 如果用户ID已经存在对应的连接,则关闭旧的连接
  63. SseEmitterManager manager = SseEmitterManager.getInstance();
  64. SseEmitterUTF8 oldEmitter = manager.getEmitter(emitterKey);
  65. if (oldEmitter != null) {
  66. try {
  67. String dataStr = (new SseResponse(SseResponseEnum.DISCONNECT)).toJsonStr();
  68. oldEmitter.send(SseEmitter.event().data(dataStr));
  69. oldEmitter.complete(); // 关闭旧的连接
  70. manager.removeEmitter(oldEmitter); // 从管理器中移除旧的连接
  71. } catch (IOException e) {
  72. System.out.println(e.getMessage());
  73. manager.removeEmitter(oldEmitter);
  74. }
  75. }
  76. }
  77. }