tsurumure 7 місяців тому
батько
коміт
811ea7b40f

+ 2 - 2
db/Import/import_win.bat

@@ -7,10 +7,10 @@
 :: Continue? (Press y|Y for Yes, any other key for No) : (y) ::
 
 :: Win10 (Company) ::
-:: set mysql="C:\Program Files\MySQL\MySQL Server 8.0\bin\mysql.exe" ::
+set mysql="C:\Program Files\MySQL\MySQL Server 8.0\bin\mysql.exe"
 
 :: Win11 (Home) ::
-set mysql="D:\Program\MySQL\MySQL Server 8.1\bin\mysql.exe"
+:: set mysql="D:\Program\MySQL\MySQL Server 8.1\bin\mysql.exe" ::
 
 set sql_directory="D:\CodeJava\QuickLaunchSpring\BackendSys\db"
 

+ 22 - 13
src/main/java/com/backendsys/modules/sse/controller/SseController.java

@@ -1,17 +1,19 @@
 package com.backendsys.modules.sse.controller;
 
 import cn.hutool.core.convert.Convert;
+import com.backendsys.modules.common.config.security.utils.SecurityUtil;
 import com.backendsys.modules.sse.emitter.SseEmitterManager;
+import com.backendsys.modules.sse.entity.SseResponse;
+import com.backendsys.modules.sse.entity.SseResponseEnum;
 import com.backendsys.modules.sse.utils.SseEmitterUTF8;
 import com.backendsys.modules.sse.utils.SseUtil;
+import jakarta.servlet.http.HttpServletResponse;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.web.bind.annotation.GetMapping;
 import org.springframework.web.bind.annotation.RestController;
 import org.springframework.web.servlet.mvc.method.annotation.SseEmitter;
 
 import java.io.IOException;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
 
 @RestController
 public class SseController {
@@ -23,21 +25,28 @@ public class SseController {
      * [SSE] 消息监听
      */
     @GetMapping(value = "/api/sse/stream", produces = "text/event-stream;charset=UTF-8")
-    public SseEmitter stream() {
-        String userId = Convert.toStr(1L);
+    public SseEmitter stream(HttpServletResponse response) {
+        // 获得当前用户Id
+        String userId = Convert.toStr(SecurityUtil.getUserId());
+
+        // 如果存在,则关闭
+        sseUtil.closeIfExist(userId);
+
         SseEmitterUTF8 emitter = new SseEmitterUTF8(Long.MAX_VALUE);
         SseEmitterManager manager = SseEmitterManager.getInstance();
         manager.addEmitter(userId, emitter);
         try {
-            // 如果存在,则关闭
-            sseUtil.closeIfExist(userId);
-            // 创建新连接
-            manager.addEmitter(userId, emitter);
-            emitter.send(SseEmitter.event().data("Connected successfully! (连接成功)"));
+            // 设置响应头,指定字符编码
+            response.setCharacterEncoding("UTF-8");
+            response.setContentType("text/event-stream");
+            // CONNECT: 建立连接
+            String dataStr = (new SseResponse(SseResponseEnum.CONNECT)).toJsonStr();
+            emitter.send(SseEmitter.event().data(dataStr));
+
         } catch (IOException e) {
             // 关闭连接
-            // emitter.complete();
-            manager.emitters.remove(emitter);
+            System.out.println("(/api/sse/stream)(controller):" + e.getMessage());
+            manager.removeEmitter(emitter);
         }
         return emitter;
     }
@@ -47,8 +56,8 @@ public class SseController {
      */
     @GetMapping("/api/sse/send")
     public String send() {
-        String message = "{\"message\": \"Hello World\"}";
-        sseUtil.send(1L, message);
+        String dataStr = (new SseResponse("Hello World")).toJsonStr();
+        sseUtil.send(SecurityUtil.getUserId(), dataStr);
         return "success";
     }
 

+ 6 - 7
src/main/java/com/backendsys/modules/sse/emitter/SseEmitterManager.java

@@ -10,7 +10,6 @@ public class SseEmitterManager {
     // 单例实例
     private static final SseEmitterManager INSTANCE = new SseEmitterManager();
     // 存储SseEmitter的线程安全列表
-//    public final CopyOnWriteArrayList<Long, SseEmitter> emitters = new CopyOnWriteArrayList<>();
     public final ConcurrentHashMap<String, SseEmitterUTF8> emitters = new ConcurrentHashMap<>();
 
     // 私有构造函数,防止外部直接实例化
@@ -23,21 +22,21 @@ public class SseEmitterManager {
     public void addEmitter(String userId, SseEmitterUTF8 emitter) {
         this.emitters.put(userId, emitter);
         emitter.onTimeout(() -> {
-            System.out.println("emitter (onTimeout) 超时");
-            this.emitters.remove(emitter);
+            System.out.println("emitter (onTimeout) 超时, userId: " + userId);
+            removeEmitter(emitter);
         });
         emitter.onCompletion(() -> {
-            System.out.println("emitter (onCompletion) 中断");
-            this.emitters.remove(emitter);
+            System.out.println("emitter (onCompletion) 中断, userId: " + userId);
+            removeEmitter(emitter);
         });
     }
     // 公共方法,供外部移除SseEmitter
     public SseEmitterUTF8 getEmitter(String userId) {
-        // 根据用户ID获取 SseEmitter
         return this.emitters.get(userId);
     }
     // 公共方法,供外部移除SseEmitter
     public void removeEmitter(SseEmitterUTF8 emitter) {
-        this.emitters.remove(emitter);
+        this.emitters.values().removeIf(e -> e == emitter); // 安全移除
+        emitter.complete(); // 显式调用complete方法
     }
 }

+ 40 - 0
src/main/java/com/backendsys/modules/sse/emitter/__SseEmitterManager.java

@@ -0,0 +1,40 @@
+//package com.backendsys.modules.sse.emitter;
+//
+//import com.backendsys.modules.sse.utils.SseEmitterUTF8;
+//
+//import java.util.concurrent.ConcurrentHashMap;
+//
+//public class __SseEmitterManager {
+//    // 单例实例
+//    private static final __SseEmitterManager INSTANCE = new __SseEmitterManager();
+//    // 存储SseEmitter的线程安全列表
+//    public final ConcurrentHashMap<String, SseEmitterUTF8> emitters = new ConcurrentHashMap<>();
+//
+//    // 私有构造函数,防止外部直接实例化
+//    private __SseEmitterManager() {}
+//    // 公共静态方法,获取单例实例
+//    public static __SseEmitterManager getInstance() {
+//        return INSTANCE;
+//    }
+//    // 公共方法,供外部添加SseEmitter
+//    public void addEmitter(String userId, SseEmitterUTF8 emitter) {
+//        this.emitters.put(userId, emitter);
+//        emitter.onTimeout(() -> {
+//            System.out.println("emitter (onTimeout) 超时, userId: " + userId);
+//            this.emitters.remove(emitter);
+//        });
+//        emitter.onCompletion(() -> {
+//            System.out.println("emitter (onCompletion) 中断, userId: " + userId);
+//            this.emitters.remove(emitter);
+//        });
+//    }
+//    // 公共方法,供外部移除SseEmitter
+//    public SseEmitterUTF8 getEmitter(String userId) {
+//        // 根据用户ID获取 SseEmitter
+//        return this.emitters.get(userId);
+//    }
+//    // 公共方法,供外部移除SseEmitter
+//    public void removeEmitter(SseEmitterUTF8 emitter) {
+//        this.emitters.remove(emitter);
+//    }
+//}

+ 31 - 0
src/main/java/com/backendsys/modules/sse/entity/SseResponse.java

@@ -0,0 +1,31 @@
+package com.backendsys.modules.sse.entity;
+
+import cn.hutool.json.JSONUtil;
+import lombok.Data;
+
+@Data
+public class SseResponse {
+
+    private String type;
+    private String message;
+
+    // 无参构造函数(可选)
+    public SseResponse() {
+    }
+
+    // 接受SeeResponseEnum的构造函数
+    public SseResponse(SseResponseEnum responseEnum) {
+        this.type = responseEnum.getType();
+        this.message = responseEnum.getMessage();
+    }
+
+    public SseResponse(String message) {
+        this.type = SseResponseEnum.NOTICE.getType();
+        this.message = message;
+    }
+
+    public String toJsonStr() {
+        return JSONUtil.toJsonStr(this);
+    }
+
+}

+ 22 - 0
src/main/java/com/backendsys/modules/sse/entity/SseResponseEnum.java

@@ -0,0 +1,22 @@
+package com.backendsys.modules.sse.entity;
+
+public enum SseResponseEnum {
+
+    CONNECT("connect", "建立连接"),
+    NOTICE("notice", "通知");
+
+    private String type;
+    private String message;
+    public String getType() {
+        return this.type;
+    }
+    public String getMessage() {
+        return this.message;
+    }
+
+    SseResponseEnum(String type, String message) {
+        this.type = type;
+        this.message = message;
+    }
+
+}