OllamaUtil.java 18 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364
  1. package com.backendsys.modules.sdk.deepseek.utils;
  2. import cn.hutool.core.convert.Convert;
  3. import cn.hutool.core.util.NumberUtil;
  4. import cn.hutool.core.util.ObjectUtil;
  5. import cn.hutool.core.util.StrUtil;
  6. import com.alibaba.fastjson.JSONObject;
  7. import com.backendsys.modules.ai.chat.entity.Chat;
  8. import com.backendsys.modules.ai.chat.entity.ChatResult;
  9. import com.backendsys.modules.ai.chat.entity.ChatSseMessage;
  10. import com.backendsys.modules.common.config.redis.utils.RedisUtil;
  11. import com.backendsys.modules.sdk.deepseek.entity.DSRequest;
  12. import com.backendsys.modules.sdk.deepseek.entity.DSRequestMessage;
  13. import com.backendsys.modules.sse.entity.SseResponse;
  14. import com.backendsys.modules.sse.entity.SseResponseEnum;
  15. import com.backendsys.modules.sse.utils.SseUtil;
  16. import com.fasterxml.jackson.databind.ObjectMapper;
  17. import org.apache.http.client.methods.CloseableHttpResponse;
  18. import org.apache.http.client.methods.HttpPost;
  19. import org.apache.http.entity.StringEntity;
  20. import org.apache.http.impl.client.CloseableHttpClient;
  21. import org.apache.http.impl.client.HttpClients;
  22. import org.springframework.beans.factory.annotation.Autowired;
  23. import org.springframework.beans.factory.annotation.Value;
  24. import org.springframework.stereotype.Component;
  25. import java.io.BufferedReader;
  26. import java.io.InputStreamReader;
  27. import java.nio.charset.StandardCharsets;
  28. import java.util.*;
  29. import java.util.concurrent.CompletableFuture;
  30. /**
  31. * ollama run deepseek-r1:1.5b
  32. *
  33. * Ollama API
  34. * https://github.com/ollama/ollama/blob/main/docs/api.md
  35. */
  36. @Component
  37. public class OllamaUtil {
  38. @Autowired
  39. private SseUtil sseUtil;
  40. @Autowired
  41. private RedisUtil redisUtil;
  42. @Value("${spring.application.name}")
  43. private String APPLICATION_NAME;
  44. @Value("${deepseek-r1.domain}")
  45. private String DOMAIN;
  46. /**
  47. * 流式对话
  48. */
  49. public ChatResult chatCompletion(Long user_id, String model, String prompt, String history_code, List<Chat> chatList) {
  50. Long contentDuration = 0L;
  51. ChatResult chatResult = new ChatResult();
  52. // try {
  53. System.out.println("向模型: " + model + " 提问: " + prompt);
  54. // 记录请求开始时间
  55. long allStartTime = System.currentTimeMillis();
  56. // 加入上下文历史对话
  57. System.out.println("---- 历史对话 (history_code): " + history_code + " ----");
  58. List<DSRequestMessage> messages = new ArrayList<>();
  59. if (chatList != null && !chatList.isEmpty()) {
  60. chatList.stream().forEach(chat -> {
  61. if (!"THINK".equals(chat.getContent_type())) {
  62. messages.add(new DSRequestMessage(chat.getRole(), chat.getContent()));
  63. }
  64. });
  65. // 反转列表
  66. Collections.reverse(messages);
  67. }
  68. // 新的对话内容
  69. messages.add(new DSRequestMessage("user", prompt));
  70. // 输出全部对话内容
  71. messages.stream().forEach(msg -> {
  72. System.out.println("[" + msg.getRole() + "]: " + msg.getContent());
  73. });
  74. System.out.println("---------------------------------------------------------------------");
  75. // 定义作用于全局的变量
  76. Boolean isThinking = false;
  77. StringBuilder allReplyContent = new StringBuilder();
  78. StringBuilder allThinkContent = new StringBuilder();
  79. String requestOfRedisKey = APPLICATION_NAME + "-chat-history-" + history_code;
  80. ObjectMapper objectMapper = new ObjectMapper();
  81. try (CloseableHttpClient client = HttpClients.createDefault()) {
  82. /*
  83. 【/api/generate】
  84. 它是一个相对基础的文本生成端点,主要用于根据给定的提示信息生成一段连续的文本。
  85. 这个端点会基于输入的提示,按照模型的语言生成能力输出一段完整的内容,更侧重于单纯的文本生成任务。
  86. 生成过程不依赖于上下文的历史对话信息,每次请求都是独立的,模型仅依据当前输入的提示进行文本生成。
  87. {
  88. "model": "llama2", "prompt": "请描述一下美丽的海滩", "num_predict": 200, "temperature": 0.7
  89. }
  90. 【/api/chat】
  91. 该端点专为模拟聊天场景设计,具备处理对话上下文的能力。它可以跟踪对话的历史记录,理解对话的上下文信息,从而生成更符合对话逻辑和连贯性的回复。
  92. 更注重模拟真实的人机对话交互,能够根据历史对话和当前输入生成合适的回应,适用于构建聊天机器人等交互式应用。
  93. {
  94. "model": "deepseek-r1:1.5b",
  95. "messages": [
  96. {
  97. "role": "system",
  98. "content": "你是一个能够理解中文指令并帮助完成任务的智能助手。你的任务是根据用户的需求生成合适的分类任务或生成任务,并准确判断这些任务的类型。请确保你的回答简洁、准确且符合中英文语境。"
  99. },
  100. {
  101. "role": "user",
  102. "content": "写一个简单的 Python 函数,用于计算两个数的和"
  103. }
  104. ],
  105. "stream": false
  106. }
  107. */
  108. // [Chat] 构建请求体
  109. HttpPost request = new HttpPost(DOMAIN + "/api/chat");
  110. DSRequest body = new DSRequest();
  111. body.setModel(model);
  112. body.setMessages(messages);
  113. body.setStream(true);
  114. String requestBody = objectMapper.writeValueAsString(body);
  115. // // [Generate] 构建请求体
  116. // HttpPost request = new HttpPost(DOMAIN + "/api/generate");
  117. // Map<String, Object> requestMap = new HashMap<>();
  118. // requestMap.put("model", model);
  119. // requestMap.put("prompt", prompt);
  120. // requestMap.put("stream", true);
  121. // String requestBody = objectMapper.writeValueAsString(requestMap);
  122. request.setEntity(new StringEntity(requestBody, StandardCharsets.UTF_8));
  123. try (CloseableHttpResponse response = client.execute(request);
  124. BufferedReader reader = new BufferedReader(new InputStreamReader(response.getEntity().getContent(), StandardCharsets.UTF_8))) {
  125. long apiDuration = System.currentTimeMillis() - allStartTime; // 接口耗时
  126. long thinkStartTime = 0L; // 开始思考时间
  127. long thinkDuration = 0L; // 思考耗时
  128. System.out.println("API 调用耗时: " + apiDuration + " 毫秒");
  129. System.out.println("---- 开始流式回答: ------------------------------------");
  130. // [SSE] 发送消息
  131. ChatSseMessage chatLoadingSseMessage = new ChatSseMessage("LOADING", "正在思考", null, history_code);
  132. sseUtil.send(user_id, new SseResponse(SseResponseEnum.DEEPSEEK, chatLoadingSseMessage).toJsonStr());
  133. String line;
  134. while ((line = reader.readLine()) != null) {
  135. // 判断是否中止
  136. if (ObjectUtil.isEmpty(redisUtil.getCacheObject(requestOfRedisKey))) {
  137. System.out.println("中止!");
  138. request.abort();
  139. // 流程结束后,删除锁
  140. redisUtil.delete(requestOfRedisKey);
  141. break;
  142. }
  143. // System.out.println(line);
  144. /*
  145. ---------------------- [Chat] line ----------------------
  146. {"model":"deepseek-r1:1.5b","created_at":"2025-03-18T07:37:06.483163789Z","message":{"role":"assistant","content":"\u003cthink\u003e"},"done":false}
  147. ---------------------- [Generate] line ------------------
  148. {"model":"deepseek-r1:1.5b","created_at":"2025-03-05T10:51:17.443189986Z","response":"\u003cthink\u003e","done":false}
  149. {"model":"deepseek-r1:1.5b","created_at":"2025-03-06T11:08:30.9219611Z","response":"\n\n","done":false}
  150. ---------------------- [Error] line ---------------------
  151. {"error":"llama runner process has terminated: error loading model: unable to allocate CUDA0 buffer"}
  152. */
  153. // 每行数据可以是一个JSON对象,根据实际情况处理
  154. JSONObject resJson = JSONObject.parseObject(line);
  155. // -- 判断内容是否为空 (或报错) --------------------------------------
  156. if (resJson == null) return null;
  157. String errJsonMessage = resJson.getString("error");
  158. if (errJsonMessage != null) {
  159. // [SSE] 发送消息
  160. sseUtil.send(user_id, new SseResponse(SseResponseEnum.DEEPSEEK, new ChatSseMessage("REPLY", errJsonMessage, contentDuration, history_code)).toJsonStr());
  161. // [SSE] 发送消息 (完成)
  162. sseUtil.send(user_id, new SseResponse(SseResponseEnum.DEEPSEEK, new ChatSseMessage("REPLY", "[DONE][REPLY]", contentDuration, history_code)).toJsonStr());
  163. //
  164. chatResult.setContent(errJsonMessage);
  165. return chatResult;
  166. }
  167. // --------------------------------------------------------------
  168. // [Chat]
  169. JSONObject resJsonMessage = resJson.getJSONObject("message");
  170. String content = resJsonMessage.getString("content");
  171. // // [Generate]
  172. // String content = resJson.getString("response");
  173. // --------------------------------------------------------------
  174. // System.out.println("content: " + content);
  175. // content: \n\n
  176. // content: <think>
  177. // content: </think>
  178. // 开始思考
  179. if (content.contains("<think>")) {
  180. isThinking = true;
  181. thinkStartTime = System.currentTimeMillis();
  182. }
  183. // 停止思考,并计算思考耗时
  184. if (content.contains("</think>")) {
  185. isThinking = false;
  186. thinkDuration = thinkStartTime - allStartTime;
  187. System.out.println("推理耗时: " + thinkDuration + "毫秒");
  188. System.out.println("-----------------------------------------------------");
  189. if (allThinkContent.length() > 0){
  190. // [SSE] 发送消息
  191. ChatSseMessage chatSseMessage = new ChatSseMessage("THINK", "[DONE][THINK]", thinkDuration, history_code);
  192. sseUtil.send(user_id, new SseResponse(SseResponseEnum.DEEPSEEK, chatSseMessage).toJsonStr());
  193. }
  194. }
  195. // [思考] Think
  196. if (isThinking) {
  197. if (!content.contains("<think>") && !content.contains("\n\n") && !content.contains("\n")) {
  198. // [SSE] 发送消息
  199. ChatSseMessage chatSseMessage = new ChatSseMessage("THINK", content, null, history_code);
  200. sseUtil.send(user_id, new SseResponse(SseResponseEnum.DEEPSEEK, chatSseMessage).toJsonStr());
  201. // 收集推理内容
  202. allThinkContent.append(content);
  203. }
  204. }
  205. // [回答] Reply
  206. if (!isThinking) {
  207. // System.out.println("content: " + content);
  208. if (!content.contains("</think>") && !content.contains("\n\n")) {
  209. Boolean done = resJson.getBoolean("done");
  210. if (!done) {
  211. // [SSE] 发送消息
  212. ChatSseMessage chatSseMessage = new ChatSseMessage("REPLY", content, null, history_code);
  213. sseUtil.send(user_id, new SseResponse(SseResponseEnum.DEEPSEEK, chatSseMessage).toJsonStr());
  214. // 收集回答内容
  215. allReplyContent.append(content);
  216. }
  217. }
  218. }
  219. }
  220. System.out.println("-------------------- 结束流式回答. --------------------");
  221. contentDuration = System.currentTimeMillis() - allStartTime;
  222. System.out.println("全部推理: " + allThinkContent);
  223. System.out.println("全部回答: " + allReplyContent);
  224. System.out.println("总输出耗时: " + contentDuration + " 毫秒");
  225. System.out.println("-----");
  226. // System.out.println("Think content: " + allThinkContent.toString());
  227. // System.out.println("Think content length: " + allThinkContent.toString().length());
  228. // System.out.println("Think content is not empty: " + StrUtil.isNotEmpty(allThinkContent.toString()));
  229. // System.out.println("-----");
  230. if (StrUtil.isNotEmpty(allThinkContent.toString())) {
  231. chatResult.setReasoning_content(allThinkContent.toString());
  232. chatResult.setReasoning_duration(thinkDuration);
  233. }
  234. chatResult.setContent(allReplyContent.toString());
  235. chatResult.setContent_duration(contentDuration);
  236. // [SSE] 发送消息 (完成)
  237. ChatSseMessage chatSseMessage = new ChatSseMessage("REPLY", "[DONE][REPLY]", contentDuration, history_code);
  238. sseUtil.send(user_id, new SseResponse(SseResponseEnum.DEEPSEEK, chatSseMessage).toJsonStr());
  239. return chatResult;
  240. } catch (Exception e) {
  241. System.out.println("Exception(1): " + e.getMessage());
  242. String message = e.getMessage();
  243. if (message.contains("failed to respond")) {
  244. message = "(系统繁忙,请稍后再试)";
  245. }
  246. if (message.contains("Premature end of chunk coded message body: closing chunk expected")) {
  247. message = "(请求中止)";
  248. }
  249. // [SSE] 发送消息
  250. String contentType = (isThinking ? "THINK_ABORT" : "REPLY_ABORT");
  251. ChatSseMessage chatSseMessage = new ChatSseMessage(contentType, message, contentDuration, history_code);
  252. sseUtil.send(user_id, new SseResponse(SseResponseEnum.DEEPSEEK, chatSseMessage).toJsonStr());
  253. // chatResult.setContent(e.getMessage());
  254. // 由于中止导致的错误信息叠加 (一并保存进数据库)
  255. if (StrUtil.isNotEmpty(allThinkContent.toString())) {
  256. if (isThinking) {
  257. chatResult.setReasoning_content(allThinkContent.toString() + " " + message);
  258. } else {
  259. chatResult.setReasoning_content(allThinkContent.toString());
  260. }
  261. }
  262. chatResult.setContent(allReplyContent.toString() + " " + message);
  263. redisUtil.delete(requestOfRedisKey);
  264. return chatResult;
  265. // return chatResult;
  266. }
  267. } catch (Exception e) {
  268. System.out.println("Exception(2): " + e.getMessage());
  269. // [SSE] 发送消息
  270. String contentType = (isThinking ? "THINK_ABORT" : "REPLY_ABORT");
  271. ChatSseMessage chatSseMessage = new ChatSseMessage(contentType, e.getMessage(), contentDuration, history_code);
  272. sseUtil.send(user_id, new SseResponse(SseResponseEnum.DEEPSEEK, chatSseMessage).toJsonStr());
  273. redisUtil.delete(requestOfRedisKey);
  274. chatResult.setContent(e.getMessage());
  275. return chatResult;
  276. }
  277. // } catch (Exception e) {
  278. // System.out.println("Exception(3): " + e.getMessage());
  279. // // [SSE] 发送消息
  280. // ChatSseMessage chatSseMessage = new ChatSseMessage("REPLY", e.getMessage(), contentDuration);
  281. // sseUtil.send(user_id, new SseResponse(SseResponseEnum.DEEPSEEK, chatSseMessage).toJsonStr());
  282. //
  283. // chatResult.setContent(e.getMessage());
  284. // return chatResult;
  285. // }
  286. // } finally {
  287. // System.out.println("Finally.");
  288. // // [SSE] 发送消息
  289. // ChatSseMessage chatSseMessage = new ChatSseMessage("REPLY", "[DONE][REPLY]", contentDuration);
  290. // sseUtil.send(user_id, new SseResponse(SseResponseEnum.DEEPSEEK, chatSseMessage).toJsonStr());
  291. // }
  292. }
  293. }