Ver código fonte

完成Sse优化

tsurumure 10 meses atrás
pai
commit
996145c0d0

+ 21 - 8
src/main/java/com/backendsys/modules/common/config/security/AnonymousProperties.java

@@ -1,6 +1,7 @@
 package com.backendsys.modules.common.config.security;
 
 import cn.hutool.core.convert.Convert;
+import cn.hutool.core.util.StrUtil;
 import com.backendsys.modules.common.config.security.annotations.Anonymous;
 import org.apache.commons.lang3.RegExUtils;
 import org.springframework.beans.BeansException;
@@ -19,6 +20,7 @@ import java.util.Map;
 import java.util.Optional;
 import java.util.regex.Matcher;
 import java.util.regex.Pattern;
+import cn.hutool.core.util.ReUtil;
 
 @Configuration
 public class AnonymousProperties implements InitializingBean, ApplicationContextAware {
@@ -49,16 +51,27 @@ public class AnonymousProperties implements InitializingBean, ApplicationContext
             Anonymous method = AnnotationUtils.findAnnotation(handlerMethod.getMethod(), Anonymous.class);
 
             if (method != null) {
-                Pattern pattern = Pattern.compile("\\{(\\w+) \\[(.+?)\\]}");
-                // 排除 SSE协议接口 多余的字段
-                String infoContent = info.toString().replace(", produces [text/event-stream]", "");
-                Matcher matcher = pattern.matcher(infoContent);
-//                Matcher matcher = pattern.matcher(info.toString());
-                if (matcher.find()) {
-                    String url = matcher.group(2);
-                    System.out.println("url: " + url);
+//                Pattern pattern = Pattern.compile("\\{(\\w+) \\[(.+?)\\]}");
+//                // 排除 SSE协议接口 多余的字段
+//                String infoContent = info.toString().replace(", produces [text/event-stream]", "");
+//                Matcher matcher = pattern.matcher(infoContent);
+////                System.out.println("matcher1:");
+////                System.out.println(matcher);
+////                Matcher matcher = pattern.matcher(info.toString());
+//                if (matcher.find()) {
+//                    String url = matcher.group(2);
+//                    System.out.println("url: " + url);
+//                    urls.add(url);
+//                }
+
+                // 正则表达式,匹配第一个方括号内的内容
+                Pattern regex = Pattern.compile("\\[(.*?)\\]");
+                String url = ReUtil.getGroup1(regex, info.toString());
+                if (StrUtil.isNotEmpty(url)) {
+                    System.out.println("(@Anonymous) url: " + url);
                     urls.add(url);
                 }
+
             }
 
 ////          ifPresent()方法就是会返回一个boolean类型值,如果对象不为空则为真,如果为空则为false

+ 21 - 16
src/main/java/com/backendsys/modules/log/controller/LogStreamController.java

@@ -2,54 +2,59 @@ package com.backendsys.modules.log.controller;
 
 import cn.hutool.core.convert.Convert;
 import com.backendsys.modules.common.config.security.annotations.Anonymous;
-import com.backendsys.modules.log.emitter.LogStreamEmitterManager;
-import com.backendsys.modules.log.utils.LogStreamUtil;
+import com.backendsys.modules.sse.emitter.SseEmitterManager;
+import com.backendsys.modules.sse.utils.SseEmitterUTF8;
+import com.backendsys.modules.sse.utils.SseUtil;
 import org.springframework.beans.factory.annotation.Autowired;
-import org.springframework.validation.annotation.Validated;
+import org.springframework.http.MediaType;
 import org.springframework.web.bind.annotation.GetMapping;
-import org.springframework.web.bind.annotation.RequestParam;
 import org.springframework.web.bind.annotation.RestController;
 import org.springframework.web.servlet.mvc.method.annotation.SseEmitter;
 
-import javax.validation.constraints.NotNull;
 import java.io.IOException;
+import java.nio.charset.StandardCharsets;
 
 @RestController
 public class LogStreamController {
 
     @Autowired
-    private LogStreamUtil logStreamUtil;
+    private SseUtil sseUtil;
 
     /**
      * [SSE] 消息监听
      */
     @Anonymous
     @GetMapping(value = "/api/log/stream/watch", produces = "text/event-stream")
-    public SseEmitter stream(@RequestParam @NotNull(message = "v 不能为空") String v) {
-
+    public SseEmitter stream() {
         String userId = Convert.toStr(1L);
-        SseEmitter emitter = new SseEmitter(Long.MAX_VALUE);
-        LogStreamEmitterManager manager = LogStreamEmitterManager.getInstance();
-        manager.addEmitter(userId, emitter);
+        SseEmitterUTF8 emitter = new SseEmitterUTF8(Long.MAX_VALUE);
+        SseEmitterManager manager = SseEmitterManager.getInstance();
+
         try {
-            emitter.send(SseEmitter.event().data("success"));
+
+            // 如果存在,则关闭
+            sseUtil.closeIfExist(userId);
+
+            // 创建新连接
+            manager.addEmitter(userId, emitter);
+            emitter.send(SseEmitter.event().data("Connected successfully! (连接成功)"));
         } catch (IOException e) {
-            // 当所有事件发送完毕后,关闭连接
+            // 关闭连接
             // emitter.complete();
-            // emitter.completeWithError(e);
             manager.emitters.remove(emitter);
         }
         return emitter;
     }
 
+
     /**
      * [SSE] 测试发送
      */
     @Anonymous
     @GetMapping("/api/log/stream/send")
     public String send() {
-        String message = "{\"message\": \"Hello World\"}";
-        logStreamUtil.send(message);
+        String message = "{\"message\": \"Hello World 中文\"}";
+        sseUtil.send(message);
         return "success";
     }
 

+ 0 - 35
src/main/java/com/backendsys/modules/log/emitter/LogStreamEmitterManager.java

@@ -1,35 +0,0 @@
-package com.backendsys.modules.log.emitter;
-
-import org.springframework.web.servlet.mvc.method.annotation.SseEmitter;
-
-import java.util.concurrent.ConcurrentHashMap;
-
-public class LogStreamEmitterManager {
-    // 单例实例
-    private static final LogStreamEmitterManager INSTANCE = new LogStreamEmitterManager();
-    // 存储SseEmitter的线程安全列表
-//    public final CopyOnWriteArrayList<Long, SseEmitter> emitters = new CopyOnWriteArrayList<>();
-    public final ConcurrentHashMap<String, SseEmitter> emitters = new ConcurrentHashMap<>();
-
-    // 私有构造函数,防止外部直接实例化
-    private LogStreamEmitterManager() {}
-    // 公共静态方法,获取单例实例
-    public static LogStreamEmitterManager getInstance() {
-        return INSTANCE;
-    }
-    // 公共方法,供外部添加SseEmitter
-    public void addEmitter(String userId, SseEmitter emitter) {
-        this.emitters.put(userId, emitter);
-        emitter.onTimeout(() -> this.emitters.remove(emitter));
-        emitter.onCompletion(() -> this.emitters.remove(emitter));
-    }
-    // 公共方法,供外部移除SseEmitter
-    public SseEmitter getEmitter(String userId) {
-        // 根据用户ID获取 SseEmitter
-        return this.emitters.get(userId);
-    }
-    // 公共方法,供外部移除SseEmitter
-    public void removeEmitter(SseEmitter emitter) {
-        this.emitters.remove(emitter);
-    }
-}

+ 0 - 30
src/main/java/com/backendsys/modules/log/utils/LogStreamUtil.java

@@ -1,30 +0,0 @@
-package com.backendsys.modules.log.utils;
-
-import cn.hutool.core.convert.Convert;
-import com.backendsys.modules.log.emitter.LogStreamEmitterManager;
-import org.springframework.stereotype.Component;
-import org.springframework.web.servlet.mvc.method.annotation.SseEmitter;
-
-import java.io.IOException;
-
-@Component
-public class LogStreamUtil {
-
-    // [SSE] 发送消息
-    public void send(String data) {
-
-        Long userId = 1L;
-
-        LogStreamEmitterManager manager = LogStreamEmitterManager.getInstance();
-        SseEmitter emitter = manager.getEmitter(Convert.toStr(userId));
-        if (emitter != null) {
-            try {
-                emitter.send(SseEmitter.event().data(data));
-            } catch (IOException | IllegalStateException e ) {
-                System.out.println(e.getMessage());
-                manager.removeEmitter(emitter);
-            }
-        }
-    }
-
-}

+ 9 - 5
src/main/java/com/backendsys/modules/sse/controller/SseController.java

@@ -2,6 +2,7 @@ package com.backendsys.modules.sse.controller;
 
 import cn.hutool.core.convert.Convert;
 import com.backendsys.modules.sse.emitter.SseEmitterManager;
+import com.backendsys.modules.sse.utils.SseEmitterUTF8;
 import com.backendsys.modules.sse.utils.SseUtil;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.web.bind.annotation.GetMapping;
@@ -21,18 +22,21 @@ public class SseController {
     /**
      * [SSE] 消息监听
      */
-    @GetMapping(value = "/api/sse/stream", produces = "text/event-stream")
+    @GetMapping(value = "/api/sse/stream", produces = "text/event-stream;charset=UTF-8")
     public SseEmitter stream() {
         String userId = Convert.toStr(1L);
-        SseEmitter emitter = new SseEmitter(Long.MAX_VALUE);
+        SseEmitterUTF8 emitter = new SseEmitterUTF8(Long.MAX_VALUE);
         SseEmitterManager manager = SseEmitterManager.getInstance();
         manager.addEmitter(userId, emitter);
         try {
-            emitter.send(SseEmitter.event().data("success"));
+            // 如果存在,则关闭
+            sseUtil.closeIfExist(userId);
+            // 创建新连接
+            manager.addEmitter(userId, emitter);
+            emitter.send(SseEmitter.event().data("Connected successfully! (连接成功)"));
         } catch (IOException e) {
-            // 当所有事件发送完毕后,关闭连接
+            // 关闭连接
             // emitter.complete();
-            // emitter.completeWithError(e);
             manager.emitters.remove(emitter);
         }
         return emitter;

+ 5 - 4
src/main/java/com/backendsys/modules/sse/emitter/SseEmitterManager.java

@@ -1,5 +1,6 @@
 package com.backendsys.modules.sse.emitter;
 
+import com.backendsys.modules.sse.utils.SseEmitterUTF8;
 import org.springframework.web.servlet.mvc.method.annotation.SseEmitter;
 
 import java.util.concurrent.ConcurrentHashMap;
@@ -10,7 +11,7 @@ public class SseEmitterManager {
     private static final SseEmitterManager INSTANCE = new SseEmitterManager();
     // 存储SseEmitter的线程安全列表
 //    public final CopyOnWriteArrayList<Long, SseEmitter> emitters = new CopyOnWriteArrayList<>();
-    public final ConcurrentHashMap<String, SseEmitter> emitters = new ConcurrentHashMap<>();
+    public final ConcurrentHashMap<String, SseEmitterUTF8> emitters = new ConcurrentHashMap<>();
 
     // 私有构造函数,防止外部直接实例化
     private SseEmitterManager() {}
@@ -19,18 +20,18 @@ public class SseEmitterManager {
         return INSTANCE;
     }
     // 公共方法,供外部添加SseEmitter
-    public void addEmitter(String userId, SseEmitter emitter) {
+    public void addEmitter(String userId, SseEmitterUTF8 emitter) {
         this.emitters.put(userId, emitter);
         emitter.onTimeout(() -> this.emitters.remove(emitter));
         emitter.onCompletion(() -> this.emitters.remove(emitter));
     }
     // 公共方法,供外部移除SseEmitter
-    public SseEmitter getEmitter(String userId) {
+    public SseEmitterUTF8 getEmitter(String userId) {
         // 根据用户ID获取 SseEmitter
         return this.emitters.get(userId);
     }
     // 公共方法,供外部移除SseEmitter
-    public void removeEmitter(SseEmitter emitter) {
+    public void removeEmitter(SseEmitterUTF8 emitter) {
         this.emitters.remove(emitter);
     }
 }

+ 22 - 0
src/main/java/com/backendsys/modules/sse/utils/SseEmitterUTF8.java

@@ -0,0 +1,22 @@
+package com.backendsys.modules.sse.utils;
+
+import org.springframework.http.HttpHeaders;
+import org.springframework.http.MediaType;
+import org.springframework.http.server.ServerHttpResponse;
+import org.springframework.web.servlet.mvc.method.annotation.SseEmitter;
+
+import java.nio.charset.StandardCharsets;
+
+public class SseEmitterUTF8 extends SseEmitter {
+
+    public SseEmitterUTF8(Long timeout) {
+        super(timeout);
+    }
+
+    @Override
+    protected void extendResponse(ServerHttpResponse outputMessage) {
+        super.extendResponse(outputMessage);
+        HttpHeaders headers = outputMessage.getHeaders();
+        headers.setContentType(new MediaType(MediaType.TEXT_EVENT_STREAM, StandardCharsets.UTF_8));
+    }
+}

+ 20 - 4
src/main/java/com/backendsys/modules/sse/utils/SseUtil.java

@@ -2,6 +2,7 @@ package com.backendsys.modules.sse.utils;
 
 import cn.hutool.core.convert.Convert;
 import com.backendsys.modules.sse.emitter.SseEmitterManager;
+import org.springframework.http.MediaType;
 import org.springframework.stereotype.Component;
 import org.springframework.web.servlet.mvc.method.annotation.SseEmitter;
 
@@ -12,14 +13,12 @@ public class SseUtil {
 
     // [SSE] 发送消息
     public void send(String data) {
-
         Long userId = 1L;
-
         SseEmitterManager manager = SseEmitterManager.getInstance();
-        SseEmitter emitter = manager.getEmitter(Convert.toStr(userId));
+        SseEmitterUTF8 emitter = manager.getEmitter(Convert.toStr(userId));
         if (emitter != null) {
             try {
-                emitter.send(SseEmitter.event().data(data));
+                emitter.send(SseEmitter.event().data(data, MediaType.TEXT_PLAIN));
             } catch (IOException e) {
                 System.out.println(e.getMessage());
                 manager.removeEmitter(emitter);
@@ -27,4 +26,21 @@ public class SseUtil {
         }
     }
 
+    // 如果存在,则关闭
+    public void closeIfExist(String userId) {
+        // 如果用户ID已经存在对应的连接,则关闭旧的连接
+        SseEmitterManager manager = SseEmitterManager.getInstance();
+        SseEmitterUTF8 oldEmitter = manager.getEmitter(userId);
+        if (oldEmitter != null) {
+            try {
+                oldEmitter.send(SseEmitter.event().data("Disconnected! (连接中断)", MediaType.TEXT_PLAIN));
+                oldEmitter.complete();              // 关闭旧的连接
+                manager.removeEmitter(oldEmitter);  // 从管理器中移除旧的连接
+            } catch (IOException e) {
+                System.out.println(e.getMessage());
+                manager.removeEmitter(oldEmitter);
+            }
+        }
+    }
+
 }

+ 2 - 0
src/main/resources/application.yml

@@ -46,6 +46,8 @@ mybatis-plus:
 server:
   servlet:
     context-path: /
+    encoding:
+      charset: UTF-8
 #  tomcat:
 #    threads:
 #      max: 200