package com.backendsys.modules.sdk.deepseek.utils; import cn.hutool.core.util.ObjectUtil; import cn.hutool.core.util.StrUtil; import com.alibaba.fastjson.JSONObject; import com.backendsys.modules.ai.chat.entity.Chat; import com.backendsys.modules.ai.chat.entity.ChatCompletionParam; import com.backendsys.modules.ai.chat.entity.ChatResult; import com.backendsys.modules.ai.chat.entity.ChatSseMessage; import com.backendsys.modules.common.config.redis.utils.RedisUtil; import com.backendsys.modules.sdk.bocha.entity.BochaParam; import com.backendsys.modules.sdk.bocha.service.BochaService; import com.backendsys.modules.sdk.deepseek.entity.DSRequest; import com.backendsys.modules.sdk.deepseek.entity.DSRequestMessage; import com.backendsys.modules.sse.entity.SseResponse; import com.backendsys.modules.sse.entity.SseResponseEnum; import com.backendsys.modules.sse.utils.SseUtil; import com.fasterxml.jackson.databind.JsonNode; import com.fasterxml.jackson.databind.ObjectMapper; //import org.apache.http.client.methods.CloseableHttpResponse; //import org.apache.http.impl.client.CloseableHttpClient; //import org.apache.http.impl.client.HttpClients; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Value; import org.springframework.stereotype.Component; import java.net.URI; import java.net.http.HttpClient; import java.net.http.HttpRequest; import java.net.http.HttpResponse; import java.nio.charset.StandardCharsets; import java.time.Duration; import java.util.*; import java.util.concurrent.CompletableFuture; import java.util.concurrent.atomic.AtomicReference; /** * ollama run deepseek-r1:1.5b * * Ollama API * https://github.com/ollama/ollama/blob/main/docs/api.md */ @Component public class OllamaUtil { @Autowired private SseUtil sseUtil; @Autowired private RedisUtil redisUtil; @Autowired private BochaService bochaService; @Value("${spring.application.name}") private String APPLICATION_NAME; @Value("${deepseek-r1.domain}") private String DOMAIN; /** * 流式对话 */ public ChatResult chatCompletion(ChatCompletionParam chatCompletionParam) { // 参数化 Long user_id = chatCompletionParam.getUser_id(); String model = chatCompletionParam.getModel(); String prompt = chatCompletionParam.getPrompt(); String history_code = chatCompletionParam.getHistory_code(); List chatList = chatCompletionParam.getChatList(); Boolean internet = chatCompletionParam.getInternet(); // 定义作用于全局的变量 Long contentDuration = 0L; AtomicReference isThinking = new AtomicReference<>(false); StringBuilder allReplyContent = new StringBuilder(); StringBuilder allThinkContent = new StringBuilder(); String requestOfRedisKey = APPLICATION_NAME + "-chat-history-" + history_code; ChatResult chatResult = new ChatResult(); // try { System.out.println("向模型: " + model + " 提问: " + prompt); // 记录请求开始时间 long allStartTime = System.currentTimeMillis(); // 加入上下文历史对话 System.out.println("---- 历史对话 (history_code): " + history_code + " ----"); List messages = new ArrayList<>(); if (chatList != null && !chatList.isEmpty()) { chatList.stream().forEach(chat -> { if (!"THINK".equals(chat.getContent_type())) { messages.add(new DSRequestMessage(chat.getRole(), chat.getContent())); } }); // 反转列表 Collections.reverse(messages); } // 【要把搜索到的内容塞到 'user' 里?】 // -- [博查] Web Search API ---------------------------------------------- if (internet) { // 远程查询、统计接口时间、设置返回参数 long internetStartTime = System.currentTimeMillis(); JsonNode searchResult = bochaService.WebSearch(new BochaParam(prompt)); String context = bochaService.WebSearchToString(searchResult); context = context.replace("\n", "\\n"); long internetEndTime = System.currentTimeMillis(); chatResult.setInternet_duration(internetStartTime - internetEndTime); chatResult.setInternet_content(context); // 将搜索结果作为上下文添加到消息中 messages.add(new DSRequestMessage("system", context)); messages.add(new DSRequestMessage("user", "在回答时引用以上全部数据进行分析")); // 的 "name"、"summary" messages.add(new DSRequestMessage("assistant", "好的")); // [SSE] 发送消息 ChatSseMessage chatSearchSseMessage = new ChatSseMessage("SEARCH", context, null, history_code); sseUtil.send(user_id, new SseResponse(SseResponseEnum.OLLAMA, chatSearchSseMessage).toJsonStr()); } // ----------------------------------------------------------------------- // 新的对话内容 messages.add(new DSRequestMessage("user", prompt)); // 输出全部对话内容 messages.stream().forEach(msg -> { System.out.println("[" + msg.getRole() + "]: " + msg.getContent()); }); System.out.println("---------------------------------------------------------------------"); /* 【/api/generate】 它是一个相对基础的文本生成端点,主要用于根据给定的提示信息生成一段连续的文本。 这个端点会基于输入的提示,按照模型的语言生成能力输出一段完整的内容,更侧重于单纯的文本生成任务。 生成过程不依赖于上下文的历史对话信息,每次请求都是独立的,模型仅依据当前输入的提示进行文本生成。 { "model": "llama2", "prompt": "请描述一下美丽的海滩", "num_predict": 200, "temperature": 0.7 } 【/api/chat】 该端点专为模拟聊天场景设计,具备处理对话上下文的能力。它可以跟踪对话的历史记录,理解对话的上下文信息,从而生成更符合对话逻辑和连贯性的回复。 更注重模拟真实的人机对话交互,能够根据历史对话和当前输入生成合适的回应,适用于构建聊天机器人等交互式应用。 { "model": "deepseek-r1:1.5b", "messages": [ { "role": "system", "content": "你是一个能够理解中文指令并帮助完成任务的智能助手。" }, { "role": "user", "content": "写一个简单的 Python 函数,用于计算两个数的和" } ], "stream": false, // 新增 "context": ["引用1","引用2"] } */ try { ObjectMapper objectMapper = new ObjectMapper(); HttpClient httpClient = HttpClient.newBuilder() .connectTimeout(Duration.ofSeconds(30)) .build(); DSRequest body = new DSRequest(); body.setModel(model); body.setMessages(messages); body.setStream(true); String requestBody = objectMapper.writeValueAsString(body); HttpRequest request = HttpRequest.newBuilder() .uri(URI.create(DOMAIN + "/api/chat")) .header("Content-Type", "application/json") .POST(HttpRequest.BodyPublishers.ofString(requestBody, StandardCharsets.UTF_8)) .build(); long apiDuration = System.currentTimeMillis() - allStartTime; // 接口耗时 System.out.println("API 调用耗时: " + apiDuration + " 毫秒"); System.out.println("---- 开始流式回答: ------------------------------------"); // [SSE] 发送消息 ChatSseMessage chatLoadingSseMessage = new ChatSseMessage("LOADING", "正在思考", null, history_code); sseUtil.send(user_id, new SseResponse(SseResponseEnum.OLLAMA, chatLoadingSseMessage).toJsonStr()); // 使用异步流式处理 Long finalContentDuration = contentDuration; CompletableFuture future = httpClient.sendAsync(request, HttpResponse.BodyHandlers.ofLines()) .thenAccept(resp -> { resp.body().forEach(line -> { if (ObjectUtil.isEmpty(redisUtil.getCacheObject(requestOfRedisKey))) { System.out.println("中止!"); redisUtil.delete(requestOfRedisKey); return; } JSONObject resJson = JSONObject.parseObject(line); if (resJson == null) return; String errJsonMessage = resJson.getString("error"); if (errJsonMessage != null) { ChatSseMessage errMsg = new ChatSseMessage("REPLY", errJsonMessage, finalContentDuration, history_code); sseUtil.send(user_id, new SseResponse(SseResponseEnum.OLLAMA, errMsg).toJsonStr()); sseUtil.send(user_id, new SseResponse(SseResponseEnum.OLLAMA, new ChatSseMessage("REPLY", "[DONE][REPLY]", finalContentDuration, history_code)).toJsonStr()); chatResult.setContent(errJsonMessage); return; } JSONObject resJsonMessage = resJson.getJSONObject("message"); String content = resJsonMessage.getString("content"); // 思考标记 if (content.contains("")) { isThinking.set(true); } if (content.contains("")) { isThinking.set(false); long thinkDuration = System.currentTimeMillis() - allStartTime; if (allThinkContent.length() > 0) { ChatSseMessage thinkMsg = new ChatSseMessage("THINK", "[DONE][THINK]", thinkDuration, history_code); sseUtil.send(user_id, new SseResponse(SseResponseEnum.OLLAMA, thinkMsg).toJsonStr()); } } if (isThinking.get() && !content.contains("") && !content.equals("\n\n") && !content.equals("\n")) { ChatSseMessage thinkMsg = new ChatSseMessage("THINK", content, null, history_code); sseUtil.send(user_id, new SseResponse(SseResponseEnum.OLLAMA, thinkMsg).toJsonStr()); allThinkContent.append(content); } if (!isThinking.get() && !content.contains("") && !content.equals("\n\n")) { Boolean done = resJson.getBoolean("done"); if (!done) { ChatSseMessage replyMsg = new ChatSseMessage("REPLY", content, null, history_code); sseUtil.send(user_id, new SseResponse(SseResponseEnum.OLLAMA, replyMsg).toJsonStr()); allReplyContent.append(content); } } }); }); future.join(); // 阻塞直至流结束 System.out.println("-------------------- 结束流式回答. --------------------"); contentDuration = System.currentTimeMillis() - allStartTime; System.out.println("全部推理: " + allThinkContent); System.out.println("全部回答: " + allReplyContent); System.out.println("总输出耗时: " + contentDuration + " 毫秒"); System.out.println("---------------------------------------------------"); if (StrUtil.isNotEmpty(allThinkContent.toString())) { chatResult.setReasoning_content(allThinkContent.toString()); chatResult.setReasoning_duration(System.currentTimeMillis() - allStartTime); } chatResult.setContent(allReplyContent.toString()); chatResult.setContent_duration(contentDuration); return chatResult; } catch (Exception e) { System.out.println("Exception(2): " + e.getMessage()); // [SSE] 发送消息 String contentType = (isThinking.get() ? "THINK_ABORT" : "REPLY_ABORT"); ChatSseMessage chatSseMessage = new ChatSseMessage(contentType, e.getMessage(), contentDuration, history_code); sseUtil.send(user_id, new SseResponse(SseResponseEnum.OLLAMA, chatSseMessage).toJsonStr()); redisUtil.delete(requestOfRedisKey); chatResult.setContent(e.getMessage()); return chatResult; } finally { // [SSE] 发送消息 (完成) ChatSseMessage chatSseMessage = new ChatSseMessage("REPLY", "[DONE][REPLY]", contentDuration, history_code); sseUtil.send(user_id, new SseResponse(SseResponseEnum.OLLAMA, chatSseMessage).toJsonStr()); } } }