Bladeren bron

调整sse

tsurumure 7 maanden geleden
bovenliggende
commit
b7779bbaf4

+ 4 - 4
configuration/nginx/acme.md

@@ -14,15 +14,15 @@ $ acme.sh -v
 
 2.登录
 ```
-$ acme.sh --set-default-ca --server letsencrypt
 $ acme.sh --register-account -m 405348097@qq.com
+$ acme.sh --set-default-ca --server letsencrypt
 ```
 
 3.api接口需要增加以下 nginx 配置
 ```
 server {
     listen          80;
-    server_name     duanju2.api.daoguyujia.com;
+    server_name     duanju2.api.styujia.com;
     
     location ^~ /.well-known/acme-challenge/ {
         root /home/webroot;
@@ -36,8 +36,8 @@ $ mkdir -p /home/webroot/.well-known/acme-challenge
 
 4.生成证书
 ```
-$ acme.sh --issue -d ai.manage.daoguyujia.com -w /home/FrontendSys/dist/
-$ acme.sh --issue -d ai.api.daoguyujia.com -w /home/webroot/
+$ acme.sh --issue -d duanju2.manage.styujia.com -w /home/YujiaDuanjuAdmin/dist/
+$ acme.sh --issue -d duanju2.api.styujia.com -w /home/webroot/
 ```
 
 5.设置自动更新

+ 0 - 0
configuration/nginx/ai.api.conf → configuration/nginx/conf.d/ai.api.conf


+ 0 - 0
configuration/nginx/ai.manage.conf → configuration/nginx/conf.d/ai.manage.conf


+ 0 - 0
configuration/nginx/drone.conf → configuration/nginx/conf.d/drone.conf


+ 0 - 0
configuration/nginx/gogs.conf → configuration/nginx/conf.d/gogs.conf


+ 6 - 1
src/main/java/com/backendsys/modules/log/controller/LogStreamController.java

@@ -3,6 +3,7 @@ package com.backendsys.modules.log.controller;
 import cn.hutool.core.convert.Convert;
 import cn.hutool.core.util.StrUtil;
 import com.backendsys.modules.common.config.security.annotations.Anonymous;
+import com.backendsys.modules.common.config.security.utils.SecurityUtil;
 import com.backendsys.modules.common.utils.Result;
 import com.backendsys.modules.dingtalk.utils.DingTalkUtil;
 import com.backendsys.modules.sse.emitter.SseEmitterManager;
@@ -23,6 +24,9 @@ import java.io.InputStreamReader;
 @RestController
 public class LogStreamController {
 
+    @Value("${spring.application.name}")
+    private String APPLICATION_NAME;
+
     @Autowired
     private SseUtil sseUtil;
 
@@ -99,7 +103,8 @@ public class LogStreamController {
     @GetMapping("/api/log/stream/send")
     public String send() {
         String message = "{\"message\": \"Hello World 中文\"}";
-        sseUtil.send(1L, message);
+        // 获得当前用户Id
+        sseUtil.send(APPLICATION_NAME + "-1", message);
         return "success";
     }
 

+ 8 - 2
src/main/java/com/backendsys/modules/sdk/tencentcloud/cos/service/impl/TencentCosServiceImpl.java

@@ -4,6 +4,7 @@ import cn.hutool.core.convert.Convert;
 import cn.hutool.core.util.StrUtil;
 import com.backendsys.exception.CustException;
 import com.backendsys.modules.common.config.security.utils.HttpRequestUtil;
+import com.backendsys.modules.common.config.security.utils.SecurityUtil;
 import com.backendsys.modules.common.utils.CommonUtil;
 import com.backendsys.modules.sdk.tencentcloud.cos.entity.Progress;
 import com.backendsys.modules.sdk.tencentcloud.cos.entity.ProgressData;
@@ -48,6 +49,9 @@ public class TencentCosServiceImpl implements TencentCosService {
     @Autowired
     private SseUtil sseUtil;
 
+
+    @Value("${spring.application.name}")
+    private String APPLICATION_NAME;
     @Value("${tencent.cos.max-size}")
     private long MAX_SIZE;
     @Value("${tencent.cos.region}")
@@ -77,7 +81,9 @@ public class TencentCosServiceImpl implements TencentCosService {
 
         Progress progress = new Progress();
         progress.setState("Init");
-        sseUtil.send(httpRequestUtil.getUserId(), progress);
+
+        String emitterKey = APPLICATION_NAME + "-userid-" + Convert.toStr(SecurityUtil.getUserId());
+        sseUtil.send(emitterKey, progress);
 
         // 查询上传是否已经完成
         while (transfer.isDone() == false) {
@@ -99,7 +105,7 @@ public class TencentCosServiceImpl implements TencentCosService {
 
             progress.setState(state);
             progress.setData(progressData);
-            sseUtil.send(httpRequestUtil.getUserId(), progress);
+            sseUtil.send(emitterKey, progress);
 
             // state: (完成 Completed, 失败 Failed)
             // System.out.println(transfer.getState());

+ 27 - 11
src/main/java/com/backendsys/modules/sse/controller/SseController.java

@@ -9,6 +9,7 @@ import com.backendsys.modules.sse.utils.SseEmitterUTF8;
 import com.backendsys.modules.sse.utils.SseUtil;
 import jakarta.servlet.http.HttpServletResponse;
 import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.beans.factory.annotation.Value;
 import org.springframework.web.bind.annotation.GetMapping;
 import org.springframework.web.bind.annotation.RestController;
 import org.springframework.web.servlet.mvc.method.annotation.SseEmitter;
@@ -18,23 +19,27 @@ import java.io.IOException;
 @RestController
 public class SseController {
 
+    @Value("${spring.application.name}")
+    private String APPLICATION_NAME;
+
     @Autowired
     private SseUtil sseUtil;
 
     /**
-     * [SSE] 消息监听
+     * [SSE] 消息监听 (拼接键值: 应用名称-用户ID)
      */
-    @GetMapping(value = "/api/sse/stream", produces = "text/event-stream;charset=UTF-8")
+    @GetMapping(value = "/api/sse/listenStream", produces = "text/event-stream;charset=UTF-8")
     public SseEmitter stream(HttpServletResponse response) {
-        // 获得当前用户Id
-        String userId = Convert.toStr(SecurityUtil.getUserId());
+
+        // 拼接键值: 应用名称-用户ID
+        String emitterKey = APPLICATION_NAME + "-userid-" + Convert.toStr(SecurityUtil.getUserId());
 
         // 如果存在,则关闭
-        sseUtil.closeIfExist(userId);
+        sseUtil.closeIfExist(emitterKey);
 
         SseEmitterUTF8 emitter = new SseEmitterUTF8(Long.MAX_VALUE);
         SseEmitterManager manager = SseEmitterManager.getInstance();
-        manager.addEmitter(userId, emitter);
+        manager.addEmitter(emitterKey, emitter);
         try {
             // 设置响应头,指定字符编码
             response.setCharacterEncoding("UTF-8");
@@ -45,19 +50,30 @@ public class SseController {
 
         } catch (IOException e) {
             // 关闭连接
-            System.out.println("(/api/sse/stream)(controller):" + e.getMessage());
+            System.out.println("(listenStream):" + e.getMessage());
             manager.removeEmitter(emitter);
         }
         return emitter;
     }
 
     /**
-     * [SSE] 测试发送
+     * [SSE] 测试发送 (单个)
      */
-    @GetMapping("/api/sse/send")
-    public String send() {
+    @GetMapping("/api/sse/sendHello")
+    public String sendHelloWorld() {
         String dataStr = (new SseResponse("Hello World")).toJsonStr();
-        sseUtil.send(SecurityUtil.getUserId(), dataStr);
+        String emitterKey = APPLICATION_NAME + "-userid-" + Convert.toStr(SecurityUtil.getUserId());
+        sseUtil.send(emitterKey, dataStr);
+        return "success";
+    }
+
+    /**
+     * [SSE] 测试发送 (全部)
+     */
+    @GetMapping("/api/sse/sendHelloToAll")
+    public String sendHelloWorldToAll() {
+        String dataStr = (new SseResponse("Hello World Everyone")).toJsonStr();
+        sseUtil.sendToAll(dataStr);
         return "success";
     }
 

+ 10 - 6
src/main/java/com/backendsys/modules/sse/emitter/SseEmitterManager.java

@@ -19,24 +19,28 @@ public class SseEmitterManager {
         return INSTANCE;
     }
     // 公共方法,供外部添加SseEmitter
-    public void addEmitter(String userId, SseEmitterUTF8 emitter) {
-        this.emitters.put(userId, emitter);
+    public void addEmitter(String emitterKey, SseEmitterUTF8 emitter) {
+        this.emitters.put(emitterKey, emitter);
         emitter.onTimeout(() -> {
-            System.out.println("emitter (onTimeout) 超时, userId: " + userId);
+            System.out.println("emitter (onTimeout) 超时, emitterKey: " + emitterKey);
             removeEmitter(emitter);
         });
         emitter.onCompletion(() -> {
-            System.out.println("emitter (onCompletion) 中断, userId: " + userId);
+            System.out.println("emitter (onCompletion) 中断, emitterKey: " + emitterKey);
             removeEmitter(emitter);
         });
     }
     // 公共方法,供外部移除SseEmitter
-    public SseEmitterUTF8 getEmitter(String userId) {
-        return this.emitters.get(userId);
+    public SseEmitterUTF8 getEmitter(String emitterKey) {
+        return this.emitters.get(emitterKey);
     }
     // 公共方法,供外部移除SseEmitter
     public void removeEmitter(SseEmitterUTF8 emitter) {
         this.emitters.values().removeIf(e -> e == emitter); // 安全移除
         emitter.complete(); // 显式调用complete方法
     }
+    // 公共方法,获取所有SseEmitter实例
+    public ConcurrentHashMap<String, SseEmitterUTF8> getAllEmitters() {
+        return this.emitters;
+    }
 }

+ 22 - 5
src/main/java/com/backendsys/modules/sse/utils/SseUtil.java

@@ -7,14 +7,15 @@ import org.springframework.stereotype.Component;
 import org.springframework.web.servlet.mvc.method.annotation.SseEmitter;
 
 import java.io.IOException;
+import java.util.concurrent.ConcurrentHashMap;
 
 @Component
 public class SseUtil {
 
-    // [SSE] 发送消息
-    public void send(Long userId, Object data) {
+    // [SSE] 发送消息 (单个)
+    public void send(String emitterKey, Object data) {
         SseEmitterManager manager = SseEmitterManager.getInstance();
-        SseEmitterUTF8 emitter = manager.getEmitter(Convert.toStr(userId));
+        SseEmitterUTF8 emitter = manager.getEmitter(emitterKey);
         if (emitter != null) {
             try {
                 emitter.send(SseEmitter.event().data(data));
@@ -24,12 +25,28 @@ public class SseUtil {
             }
         }
     }
+    // [SSE] 发送消息 (全部)
+    public void sendToAll(Object data) {
+        SseEmitterManager manager = SseEmitterManager.getInstance();
+        ConcurrentHashMap<String, SseEmitterUTF8> emitters = manager.getAllEmitters();
+        for (String key : emitters.keySet()) {
+            SseEmitterUTF8 emitter = emitters.get(key);
+            if (emitter != null) {
+                try {
+                    emitter.send(SseEmitter.event().data(data));
+                } catch (IOException e) {
+                    System.out.println("Failed to send message to emitter with key: " + key + ", Error: " + e.getMessage());
+                    manager.removeEmitter(emitter);
+                }
+            }
+        }
+    }
 
     // 如果存在,则关闭 (此操作会完全关闭未连接的监听)
-    public void closeIfExist(String userId) {
+    public void closeIfExist(String emitterKey) {
         // 如果用户ID已经存在对应的连接,则关闭旧的连接
         SseEmitterManager manager = SseEmitterManager.getInstance();
-        SseEmitterUTF8 oldEmitter = manager.getEmitter(userId);
+        SseEmitterUTF8 oldEmitter = manager.getEmitter(emitterKey);
         if (oldEmitter != null) {
             try {
                 oldEmitter.send(SseEmitter.event().data("Disconnected! (连接中断)"));