OllamaUtil.java 14 KB


  1. package com.backendsys.modules.sdk.deepseek.utils;
  2. import cn.hutool.core.util.ObjectUtil;
  3. import cn.hutool.core.util.StrUtil;
  4. import com.alibaba.fastjson.JSONObject;
  5. import com.backendsys.modules.ai.chat.entity.Chat;
  6. import com.backendsys.modules.ai.chat.entity.ChatCompletionParam;
  7. import com.backendsys.modules.ai.chat.entity.ChatResult;
  8. import com.backendsys.modules.ai.chat.entity.ChatSseMessage;
  9. import com.backendsys.modules.common.config.redis.utils.RedisUtil;
  10. import com.backendsys.modules.sdk.bocha.entity.BochaParam;
  11. import com.backendsys.modules.sdk.bocha.service.BochaService;
  12. import com.backendsys.modules.sdk.deepseek.entity.DSRequest;
  13. import com.backendsys.modules.sdk.deepseek.entity.DSRequestMessage;
  14. import com.backendsys.modules.sse.entity.SseResponse;
  15. import com.backendsys.modules.sse.entity.SseResponseEnum;
  16. import com.backendsys.modules.sse.utils.SseUtil;
  17. import com.fasterxml.jackson.databind.JsonNode;
  18. import com.fasterxml.jackson.databind.ObjectMapper;
  19. //import org.apache.http.client.methods.CloseableHttpResponse;
  20. //import org.apache.http.impl.client.CloseableHttpClient;
  21. //import org.apache.http.impl.client.HttpClients;
  22. import org.springframework.beans.factory.annotation.Autowired;
  23. import org.springframework.beans.factory.annotation.Value;
  24. import org.springframework.stereotype.Component;
  25. import java.net.URI;
  26. import java.net.http.HttpClient;
  27. import java.net.http.HttpRequest;
  28. import java.net.http.HttpResponse;
  29. import java.nio.charset.StandardCharsets;
  30. import java.time.Duration;
  31. import java.util.*;
  32. import java.util.concurrent.CompletableFuture;
  33. import java.util.concurrent.atomic.AtomicReference;
  34. /**
  35. * ollama run deepseek-r1:1.5b
  36. *
  37. * Ollama API
  38. * https://github.com/ollama/ollama/blob/main/docs/api.md
  39. */
  40. @Component
  41. public class OllamaUtil {
  42. @Autowired
  43. private SseUtil sseUtil;
  44. @Autowired
  45. private RedisUtil redisUtil;
  46. @Autowired
  47. private BochaService bochaService;
  48. @Value("${spring.application.name}")
  49. private String APPLICATION_NAME;
  50. @Value("${deepseek-r1.domain}")
  51. private String DOMAIN;
  52. /**
  53. * 流式对话
  54. */
  55. public ChatResult chatCompletion(ChatCompletionParam chatCompletionParam) {
  56. // 参数化
  57. Long user_id = chatCompletionParam.getUser_id();
  58. String model = chatCompletionParam.getModel();
  59. String prompt = chatCompletionParam.getPrompt();
  60. String history_code = chatCompletionParam.getHistory_code();
  61. List<Chat> chatList = chatCompletionParam.getChatList();
  62. Boolean internet = chatCompletionParam.getInternet();
  63. // 定义作用于全局的变量
  64. Long contentDuration = 0L;
  65. AtomicReference<Boolean> isThinking = new AtomicReference<>(false);
  66. StringBuilder allReplyContent = new StringBuilder();
  67. StringBuilder allThinkContent = new StringBuilder();
  68. String requestOfRedisKey = APPLICATION_NAME + "-chat-history-" + history_code;
  69. ChatResult chatResult = new ChatResult();
  70. // try {
  71. System.out.println("向模型: " + model + " 提问: " + prompt);
  72. // 记录请求开始时间
  73. long allStartTime = System.currentTimeMillis();
  74. // 加入上下文历史对话
  75. System.out.println("---- 历史对话 (history_code): " + history_code + " ----");
  76. List<DSRequestMessage> messages = new ArrayList<>();
  77. if (chatList != null && !chatList.isEmpty()) {
  78. chatList.stream().forEach(chat -> {
  79. if (!"THINK".equals(chat.getContent_type())) {
  80. messages.add(new DSRequestMessage(chat.getRole(), chat.getContent()));
  81. }
  82. });
  83. // 反转列表
  84. Collections.reverse(messages);
  85. }
  86. // 【要把搜索到的内容塞到 'user' 里?】
  87. // -- [博查] Web Search API ----------------------------------------------
  88. if (internet) {
  89. // 远程查询、统计接口时间、设置返回参数
  90. long internetStartTime = System.currentTimeMillis();
  91. JsonNode searchResult = bochaService.WebSearch(new BochaParam(prompt));
  92. String context = bochaService.WebSearchToString(searchResult);
  93. context = context.replace("\n", "\\n");
  94. long internetEndTime = System.currentTimeMillis();
  95. chatResult.setInternet_duration(internetStartTime - internetEndTime);
  96. chatResult.setInternet_content(context);
  97. // 将搜索结果作为上下文添加到消息中
  98. messages.add(new DSRequestMessage("system", context));
  99. messages.add(new DSRequestMessage("user", "在回答时引用以上全部数据进行分析")); // 的 "name"、"summary"
  100. messages.add(new DSRequestMessage("assistant", "好的"));
  101. // [SSE] 发送消息
  102. ChatSseMessage chatSearchSseMessage = new ChatSseMessage("SEARCH", context, null, history_code);
  103. sseUtil.send(user_id, new SseResponse(SseResponseEnum.OLLAMA, chatSearchSseMessage).toJsonStr());
  104. }
  105. // -----------------------------------------------------------------------
  106. // 新的对话内容
  107. messages.add(new DSRequestMessage("user", prompt));
  108. // 输出全部对话内容
  109. messages.stream().forEach(msg -> {
  110. System.out.println("[" + msg.getRole() + "]: " + msg.getContent());
  111. });
  112. System.out.println("---------------------------------------------------------------------");
  113. /*
  114. 【/api/generate】
  115. 它是一个相对基础的文本生成端点,主要用于根据给定的提示信息生成一段连续的文本。
  116. 这个端点会基于输入的提示,按照模型的语言生成能力输出一段完整的内容,更侧重于单纯的文本生成任务。
  117. 生成过程不依赖于上下文的历史对话信息,每次请求都是独立的,模型仅依据当前输入的提示进行文本生成。
  118. {
  119. "model": "llama2", "prompt": "请描述一下美丽的海滩", "num_predict": 200, "temperature": 0.7
  120. }
  121. 【/api/chat】
  122. 该端点专为模拟聊天场景设计,具备处理对话上下文的能力。它可以跟踪对话的历史记录,理解对话的上下文信息,从而生成更符合对话逻辑和连贯性的回复。
  123. 更注重模拟真实的人机对话交互,能够根据历史对话和当前输入生成合适的回应,适用于构建聊天机器人等交互式应用。
  124. {
  125. "model": "deepseek-r1:1.5b",
  126. "messages": [
  127. { "role": "system", "content": "你是一个能够理解中文指令并帮助完成任务的智能助手。" },
  128. { "role": "user", "content": "写一个简单的 Python 函数,用于计算两个数的和" }
  129. ],
  130. "stream": false,
  131. // 新增
  132. "context": ["引用1","引用2"]
  133. }
  134. */
  135. try {
  136. ObjectMapper objectMapper = new ObjectMapper();
  137. HttpClient httpClient = HttpClient.newBuilder()
  138. .connectTimeout(Duration.ofSeconds(30))
  139. .build();
  140. DSRequest body = new DSRequest();
  141. body.setModel(model);
  142. body.setMessages(messages);
  143. body.setStream(true);
  144. String requestBody = objectMapper.writeValueAsString(body);
  145. HttpRequest request = HttpRequest.newBuilder()
  146. .uri(URI.create(DOMAIN + "/api/chat"))
  147. .header("Content-Type", "application/json")
  148. .POST(HttpRequest.BodyPublishers.ofString(requestBody, StandardCharsets.UTF_8))
  149. .build();
  150. long apiDuration = System.currentTimeMillis() - allStartTime; // 接口耗时
  151. System.out.println("API 调用耗时: " + apiDuration + " 毫秒");
  152. System.out.println("---- 开始流式回答: ------------------------------------");
  153. // [SSE] 发送消息
  154. ChatSseMessage chatLoadingSseMessage = new ChatSseMessage("LOADING", "正在思考", null, history_code);
  155. sseUtil.send(user_id, new SseResponse(SseResponseEnum.OLLAMA, chatLoadingSseMessage).toJsonStr());
  156. // 使用异步流式处理
  157. Long finalContentDuration = contentDuration;
  158. CompletableFuture<Void> future = httpClient.sendAsync(request, HttpResponse.BodyHandlers.ofLines())
  159. .thenAccept(resp -> {
  160. resp.body().forEach(line -> {
  161. if (ObjectUtil.isEmpty(redisUtil.getCacheObject(requestOfRedisKey))) {
  162. System.out.println("中止!");
  163. redisUtil.delete(requestOfRedisKey);
  164. return;
  165. }
  166. JSONObject resJson = JSONObject.parseObject(line);
  167. if (resJson == null) return;
  168. String errJsonMessage = resJson.getString("error");
  169. if (errJsonMessage != null) {
  170. ChatSseMessage errMsg = new ChatSseMessage("REPLY", errJsonMessage, finalContentDuration, history_code);
  171. sseUtil.send(user_id, new SseResponse(SseResponseEnum.OLLAMA, errMsg).toJsonStr());
  172. sseUtil.send(user_id,
  173. new SseResponse(SseResponseEnum.OLLAMA,
  174. new ChatSseMessage("REPLY", "[DONE][REPLY]", finalContentDuration, history_code)).toJsonStr());
  175. chatResult.setContent(errJsonMessage);
  176. return;
  177. }
  178. JSONObject resJsonMessage = resJson.getJSONObject("message");
  179. String content = resJsonMessage.getString("content");
  180. // 思考标记
  181. if (content.contains("<think>")) {
  182. isThinking.set(true);
  183. }
  184. if (content.contains("</think>")) {
  185. isThinking.set(false);
  186. long thinkDuration = System.currentTimeMillis() - allStartTime;
  187. if (allThinkContent.length() > 0) {
  188. ChatSseMessage thinkMsg = new ChatSseMessage("THINK", "[DONE][THINK]", thinkDuration, history_code);
  189. sseUtil.send(user_id, new SseResponse(SseResponseEnum.OLLAMA, thinkMsg).toJsonStr());
  190. }
  191. }
  192. if (isThinking.get() && !content.contains("<think>") && !content.equals("\n\n") && !content.equals("\n")) {
  193. ChatSseMessage thinkMsg = new ChatSseMessage("THINK", content, null, history_code);
  194. sseUtil.send(user_id, new SseResponse(SseResponseEnum.OLLAMA, thinkMsg).toJsonStr());
  195. allThinkContent.append(content);
  196. }
  197. if (!isThinking.get() && !content.contains("</think>") && !content.equals("\n\n")) {
  198. Boolean done = resJson.getBoolean("done");
  199. if (!done) {
  200. ChatSseMessage replyMsg = new ChatSseMessage("REPLY", content, null, history_code);
  201. sseUtil.send(user_id, new SseResponse(SseResponseEnum.OLLAMA, replyMsg).toJsonStr());
  202. allReplyContent.append(content);
  203. }
  204. }
  205. });
  206. });
  207. future.join(); // 阻塞直至流结束
  208. System.out.println("-------------------- 结束流式回答. --------------------");
  209. contentDuration = System.currentTimeMillis() - allStartTime;
  210. System.out.println("全部推理: " + allThinkContent);
  211. System.out.println("全部回答: " + allReplyContent);
  212. System.out.println("总输出耗时: " + contentDuration + " 毫秒");
  213. System.out.println("---------------------------------------------------");
  214. if (StrUtil.isNotEmpty(allThinkContent.toString())) {
  215. chatResult.setReasoning_content(allThinkContent.toString());
  216. chatResult.setReasoning_duration(System.currentTimeMillis() - allStartTime);
  217. }
  218. chatResult.setContent(allReplyContent.toString());
  219. chatResult.setContent_duration(contentDuration);
  220. return chatResult;
  221. } catch (Exception e) {
  222. System.out.println("Exception(2): " + e.getMessage());
  223. // [SSE] 发送消息
  224. String contentType = (isThinking.get() ? "THINK_ABORT" : "REPLY_ABORT");
  225. ChatSseMessage chatSseMessage = new ChatSseMessage(contentType, e.getMessage(), contentDuration, history_code);
  226. sseUtil.send(user_id, new SseResponse(SseResponseEnum.OLLAMA, chatSseMessage).toJsonStr());
  227. redisUtil.delete(requestOfRedisKey);
  228. chatResult.setContent(e.getMessage());
  229. return chatResult;
  230. } finally {
  231. // [SSE] 发送消息 (完成)
  232. ChatSseMessage chatSseMessage = new ChatSseMessage("REPLY", "[DONE][REPLY]", contentDuration, history_code);
  233. sseUtil.send(user_id, new SseResponse(SseResponseEnum.OLLAMA, chatSseMessage).toJsonStr());
  234. }
  235. }
  236. }