Browse Source

Debug abort

tsurumure 5 months ago
parent
commit
4b9c1512af

+ 58 - 19
src/main/java/com/backendsys/modules/sdk/deepseek/service/impl/DeepSeekClientImpl.java

@@ -1,5 +1,7 @@
 package com.backendsys.modules.sdk.deepseek.service.impl;
 
+import cn.hutool.core.util.ObjectUtil;
+import cn.hutool.core.util.StrUtil;
 import cn.hutool.json.JSONArray;
 import cn.hutool.json.JSONObject;
 import cn.hutool.json.JSONUtil;
@@ -7,6 +9,7 @@ import com.backendsys.exception.CustException;
 import com.backendsys.modules.ai.chat.entity.Chat;
 import com.backendsys.modules.ai.chat.entity.ChatResult;
 import com.backendsys.modules.ai.chat.entity.ChatSseMessage;
+import com.backendsys.modules.common.config.redis.utils.RedisUtil;
 import com.backendsys.modules.sdk.deepseek.entity.DSRequest;
 import com.backendsys.modules.sdk.deepseek.entity.DSRequestMessage;
 import com.backendsys.modules.sdk.deepseek.service.DeepSeekClient;
@@ -45,8 +48,14 @@ public class DeepSeekClientImpl implements DeepSeekClient {
     private String API_URL;
     @Value("${deepseek-api.api-key}")
     private String API_KEY;
+
+    @Value("${spring.application.name}")
+    private String APPLICATION_NAME;
+
     @Autowired
     private SseUtil sseUtil;
+    @Autowired
+    private RedisUtil redisUtil;
 
     /**
      * [DeepSeek] 发起对话
@@ -55,7 +64,12 @@ public class DeepSeekClientImpl implements DeepSeekClient {
     @Override
     public ChatResult chatCompletion(Long user_id, String model, String prompt, String history_code, List<Chat> chatList) {
 
-        long replyDuration = 0L;
+        // 定义作用于全局的变量
+        Long replyDuration = 0L;
+        Boolean isThinking = false;
+        String requestOfRedisKey = APPLICATION_NAME + "-chat-history-" + history_code;
+        StringBuilder allReplyContent = new StringBuilder();
+        StringBuilder allThinkContent = new StringBuilder();
 
         ChatResult chatResult = new ChatResult();
         try {
@@ -121,21 +135,29 @@ public class DeepSeekClientImpl implements DeepSeekClient {
                     
                     long thinkStartTime = 0L;                            // 开始思考时间
                     long thinkDuration = 0L;                             // 思考耗时
-                    Boolean isThinking = false;
+
 
                     System.out.println("API 调用耗时: " + apiDuration + " 毫秒");
                     System.out.println("---- 开始流式回答: ------------------------------------");
 
                     // [SSE] 发送消息
-                    ChatSseMessage chatLoadingSseMessage = new ChatSseMessage("LOADING", "正在思考");
+                    ChatSseMessage chatLoadingSseMessage = new ChatSseMessage("LOADING", "正在思考", null, history_code);
                     sseUtil.send(user_id, new SseResponse(SseResponseEnum.DEEPSEEK, chatLoadingSseMessage).toJsonStr());
 
-                    StringBuilder allReplyContent = new StringBuilder();
-                    StringBuilder allThinkContent = new StringBuilder();
+
 
                     String line;
                     while ((line = reader.readLine()) != null) {
 
+                        // 判断是否中止
+                        if (ObjectUtil.isEmpty(redisUtil.getCacheObject(requestOfRedisKey))) {
+                            System.out.println("中止!");
+                            request.abort();
+                            // 流程结束后,删除锁
+                            redisUtil.delete(requestOfRedisKey);
+                            break;
+                        }
+
                         // System.out.println(line);
                         /*
                             ---------------------- line ----------------------
@@ -172,7 +194,7 @@ public class DeepSeekClientImpl implements DeepSeekClient {
                                 System.out.println("reasoning_content: " + reasoning_content);
 
                                 // [SSE] 发送消息
-                                ChatSseMessage chatSseMessage = new ChatSseMessage("THINK", reasoning_content);
+                                ChatSseMessage chatSseMessage = new ChatSseMessage("THINK", reasoning_content, null, history_code);
                                 sseUtil.send(user_id, new SseResponse(SseResponseEnum.DEEPSEEK, chatSseMessage).toJsonStr());
 
                                 // 收集推理内容
@@ -193,13 +215,13 @@ public class DeepSeekClientImpl implements DeepSeekClient {
                                     System.out.println("-----------------------------------------------");
 
                                     // [SSE] 发送消息
-                                    ChatSseMessage chatSseMessage = new ChatSseMessage("THINK", "[DONE][THINK]", thinkDuration);
+                                    ChatSseMessage chatSseMessage = new ChatSseMessage("THINK", "[DONE][THINK]", thinkDuration, history_code);
                                     sseUtil.send(user_id, new SseResponse(SseResponseEnum.DEEPSEEK, chatSseMessage).toJsonStr());
 
                                 }
 
                                 // [SSE] 发送消息
-                                ChatSseMessage chatSseMessage = new ChatSseMessage("REPLY", content);
+                                ChatSseMessage chatSseMessage = new ChatSseMessage("REPLY", content, null, history_code);
                                 sseUtil.send(user_id, new SseResponse(SseResponseEnum.DEEPSEEK, chatSseMessage).toJsonStr());
 
                                 // 收集回答内容
@@ -217,38 +239,55 @@ public class DeepSeekClientImpl implements DeepSeekClient {
                     System.out.println("全部回答: " + allReplyContent);
                     System.out.println("总输出耗时: " + replyDuration + " 毫秒");
 
-//                    // [SSE] 发送消息
-//                    ChatSseMessage chatSseMessage = new ChatSseMessage("REPLY", "[DONE][REPLY]", replyDuration);
-//                    sseUtil.send(user_id, new SseResponse(SseResponseEnum.DEEPSEEK, chatSseMessage).toJsonStr());
-
                     chatResult.setReasoning_content(allThinkContent.toString());
                     chatResult.setReasoning_duration(thinkDuration);
                     chatResult.setContent(allReplyContent.toString());
                     chatResult.setContent_duration(replyDuration);
+
+                    redisUtil.delete(requestOfRedisKey);
+
                     return chatResult;
 
                 }
             } catch (Exception e) {
-                System.out.println(e.getMessage());
+                System.out.println("Exception(1): " + e.getMessage());
+                String message = e.getMessage();
+                if (message.contains("Premature end of chunk coded message body: closing chunk expected")) {
+                    message = "(请求中止)";
+                }
                 // [SSE] 发送消息
-                ChatSseMessage chatSseMessage = new ChatSseMessage("REPLY", e.getMessage(), replyDuration);
+                String contentType = (isThinking ? "THINK_ABORT" : "REPLY_ABORT");
+                ChatSseMessage chatSseMessage = new ChatSseMessage(contentType, e.getMessage(), replyDuration, history_code);
                 sseUtil.send(user_id, new SseResponse(SseResponseEnum.DEEPSEEK, chatSseMessage).toJsonStr());
+                redisUtil.delete(requestOfRedisKey);
+
+                // chatResult.setContent(e.getMessage());
+                // 由于中止导致的错误信息叠加 (一并保存进数据库)
+                if (StrUtil.isNotEmpty(allThinkContent.toString())) {
+                    if (isThinking) {
+                        chatResult.setReasoning_content(allThinkContent.toString() + " " + message);
+                    } else {
+                        chatResult.setReasoning_content(allThinkContent.toString());
+                    }
+                }
+                chatResult.setContent(allReplyContent.toString() + " " + message);
 
-                chatResult.setContent(e.getMessage());
                 return chatResult;
             }
         } catch (Exception e) {
-            System.out.println(e.getMessage());
+            System.out.println("Exception(2): " + e.getMessage());
             // [SSE] 发送消息
-            ChatSseMessage chatSseMessage = new ChatSseMessage("REPLY", e.getMessage(), replyDuration);
+            String contentType = (isThinking ? "THINK_ABORT" : "REPLY_ABORT");
+            ChatSseMessage chatSseMessage = new ChatSseMessage(contentType, e.getMessage(), replyDuration, history_code);
             sseUtil.send(user_id, new SseResponse(SseResponseEnum.DEEPSEEK, chatSseMessage).toJsonStr());
-
+            redisUtil.delete(requestOfRedisKey);
             chatResult.setContent(e.getMessage());
             return chatResult;
         } finally {
             // [SSE] 发送消息
-            ChatSseMessage chatSseMessage = new ChatSseMessage("REPLY", "[DONE][REPLY]", replyDuration);
+            ChatSseMessage chatSseMessage = new ChatSseMessage("REPLY", "[DONE][REPLY]", replyDuration, history_code);
             sseUtil.send(user_id, new SseResponse(SseResponseEnum.DEEPSEEK, chatSseMessage).toJsonStr());
+            redisUtil.delete(requestOfRedisKey);
         }
 
     }

+ 11 - 12
src/main/java/com/backendsys/modules/sdk/deepseek/utils/OllamaUtil.java

@@ -55,7 +55,12 @@ public class OllamaUtil {
      */
     public ChatResult chatCompletion(Long user_id, String model, String prompt, String history_code, List<Chat> chatList) {
 
+        // 定义作用于全局的变量
         Long contentDuration = 0L;
+        Boolean isThinking = false;
+        StringBuilder allReplyContent = new StringBuilder();
+        StringBuilder allThinkContent = new StringBuilder();
+        String requestOfRedisKey = APPLICATION_NAME + "-chat-history-" + history_code;
 
         ChatResult chatResult = new ChatResult();
 //        try {
@@ -87,12 +92,6 @@ public class OllamaUtil {
             });
             System.out.println("---------------------------------------------------------------------");
 
-            // 定义作用于全局的变量
-            Boolean isThinking = false;
-            StringBuilder allReplyContent = new StringBuilder();
-            StringBuilder allThinkContent = new StringBuilder();
-            String requestOfRedisKey = APPLICATION_NAME + "-chat-history-" + history_code;
-
             ObjectMapper objectMapper = new ObjectMapper();
             try (CloseableHttpClient client = HttpClients.createDefault()) {
 
@@ -162,7 +161,6 @@ public class OllamaUtil {
                     String line;
                     while ((line = reader.readLine()) != null) {
 
-
                         // 判断是否中止
                         if (ObjectUtil.isEmpty(redisUtil.getCacheObject(requestOfRedisKey))) {
                             System.out.println("中止!");
@@ -290,11 +288,6 @@ public class OllamaUtil {
                     }
                     chatResult.setContent(allReplyContent.toString());
                     chatResult.setContent_duration(contentDuration);
-
-                    // [SSE] 发送消息 (完成)
-                    ChatSseMessage chatSseMessage = new ChatSseMessage("REPLY", "[DONE][REPLY]", contentDuration, history_code);
-                    sseUtil.send(user_id, new SseResponse(SseResponseEnum.DEEPSEEK, chatSseMessage).toJsonStr());
-
                     return chatResult;
 
                 } catch (Exception e) {
@@ -340,6 +333,12 @@ public class OllamaUtil {
 
                 chatResult.setContent(e.getMessage());
                 return chatResult;
+            } finally {
+
+                // [SSE] 发送消息 (完成)
+                ChatSseMessage chatSseMessage = new ChatSseMessage("REPLY", "[DONE][REPLY]", contentDuration, history_code);
+                sseUtil.send(user_id, new SseResponse(SseResponseEnum.DEEPSEEK, chatSseMessage).toJsonStr());
+
             }
 
 

+ 30 - 9
src/main/java/com/backendsys/modules/sdk/tencentcloud/huanyuan/service/impl/HunYuanClientImpl.java

@@ -1,9 +1,11 @@
 package com.backendsys.modules.sdk.tencentcloud.huanyuan.service.impl;
 
 import cn.hutool.core.util.ArrayUtil;
+import cn.hutool.core.util.ObjectUtil;
 import com.backendsys.modules.ai.chat.entity.Chat;
 import com.backendsys.modules.ai.chat.entity.ChatResult;
 import com.backendsys.modules.ai.chat.entity.ChatSseMessage;
+import com.backendsys.modules.common.config.redis.utils.RedisUtil;
 import com.backendsys.modules.common.config.security.utils.SecurityUtil;
 import com.backendsys.modules.sdk.deepseek.entity.DSRequestMessage;
 import com.backendsys.modules.sdk.tencentcloud.huanyuan.service.HunYuanClient;
@@ -42,8 +44,13 @@ public class HunYuanClientImpl implements HunYuanClient {
     @Value("${tencent.hunyuan.region}")
     private String REGION;
 
+    @Value("${spring.application.name}")
+    private String APPLICATION_NAME;
+
     @Autowired
     private SseUtil sseUtil;
+    @Autowired
+    private RedisUtil redisUtil;
 
     private Message setMessage(String role, String content) {
         Message msg = new Message();
@@ -59,7 +66,10 @@ public class HunYuanClientImpl implements HunYuanClient {
     @Override
     public ChatResult chatCompletion(Long user_id, String prompt, String history_code, List<Chat> chatList) {
 
-        long replyDuration = 0L;
+        // 定义作用于全局的变量
+        Long replyDuration = 0L;
+        String requestOfRedisKey = APPLICATION_NAME + "-chat-history-" + history_code;
+        StringBuilder allReplyContent = new StringBuilder();
 
         ChatResult chatResult = new ChatResult();
         try {
@@ -105,9 +115,10 @@ public class HunYuanClientImpl implements HunYuanClient {
 //            ChatProResponse resp = client.ChatPro(req);     // 发送对话 (专业版)
 
 
-            StringBuilder allReplyContent = new StringBuilder();
             for (SSEResponseModel.SSE e : resp) {
 
+
+
 //                 System.out.println(e.Data);
                 /**
                  *
@@ -120,20 +131,28 @@ public class HunYuanClientImpl implements HunYuanClient {
                  * e.Data:
                  */
 
-//                JSONObject dataObject = JSONUtil.parseObj(e.Data);
-
                 ObjectMapper objectMapper = new ObjectMapper();
                 JsonNode node = objectMapper.readTree(e.Data.toString());
                 JsonNode delta = node.path("Choices").path(0).path("Delta");
                 String content = delta.path("Content").asText("");
 
                 // [SSE] 发送消息
-                ChatSseMessage chatSseMessage = new ChatSseMessage("REPLY", content);
+                ChatSseMessage chatSseMessage = new ChatSseMessage("REPLY", content, null, history_code);
                 sseUtil.send(user_id, new SseResponse(SseResponseEnum.HUNYUAN, chatSseMessage).toJsonStr());
 
                 // 收集回答内容
                 allReplyContent.append(content);
 
+                // 判断是否中止
+                if (ObjectUtil.isEmpty(redisUtil.getCacheObject(requestOfRedisKey))) {
+                    System.out.println("中止!");
+                    allReplyContent.append(" (请求中止)");
+//                    request.abort();
+                    // 流程结束后,删除锁
+                    redisUtil.delete(requestOfRedisKey);
+                    break;
+                }
+
             }
 
             System.out.println("-------------------- 结束流式回答. --------------------");
@@ -149,7 +168,7 @@ public class HunYuanClientImpl implements HunYuanClient {
         } catch (TencentCloudSDKException e) {
             System.out.println("TencentCloudSDKException: " + e.getMessage());
             // [SSE] 发送消息
-            ChatSseMessage chatSseMessage = new ChatSseMessage("REPLY", e.getMessage(), replyDuration);
+            ChatSseMessage chatSseMessage = new ChatSseMessage("REPLY", e.getMessage(), replyDuration, history_code);
             sseUtil.send(user_id, new SseResponse(SseResponseEnum.HUNYUAN, chatSseMessage).toJsonStr());
 
             chatResult.setContent(e.getMessage());
@@ -157,7 +176,7 @@ public class HunYuanClientImpl implements HunYuanClient {
         } catch (JsonMappingException e) {
             System.out.println("JsonMappingException: " + e.getMessage());
             // [SSE] 发送消息
-            ChatSseMessage chatSseMessage = new ChatSseMessage("REPLY", e.getMessage(), replyDuration);
+            ChatSseMessage chatSseMessage = new ChatSseMessage("REPLY", e.getMessage(), replyDuration, history_code);
             sseUtil.send(user_id, new SseResponse(SseResponseEnum.HUNYUAN, chatSseMessage).toJsonStr());
 
             chatResult.setContent(e.getMessage());
@@ -165,15 +184,17 @@ public class HunYuanClientImpl implements HunYuanClient {
         } catch (JsonProcessingException e) {
             System.out.println("JsonProcessingException: " + e.getMessage());
             // [SSE] 发送消息
-            ChatSseMessage chatSseMessage = new ChatSseMessage("REPLY", e.getMessage(), replyDuration);
+            ChatSseMessage chatSseMessage = new ChatSseMessage("REPLY", e.getMessage(), replyDuration, history_code);
             sseUtil.send(user_id, new SseResponse(SseResponseEnum.HUNYUAN, chatSseMessage).toJsonStr());
 
             chatResult.setContent(e.getMessage());
             return chatResult;
         } finally {
             // [SSE] 发送消息
-            ChatSseMessage chatSseMessage = new ChatSseMessage("REPLY", "[DONE][REPLY]", replyDuration);
+            ChatSseMessage chatSseMessage = new ChatSseMessage("REPLY", "[DONE][REPLY]", replyDuration, history_code);
             sseUtil.send(user_id, new SseResponse(SseResponseEnum.HUNYUAN, chatSseMessage).toJsonStr());
+
+            redisUtil.delete(requestOfRedisKey);
         }
 
     }