123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286 |
- package com.backendsys.modules.sdk.deepseek.utils;
- import cn.hutool.core.util.ObjectUtil;
- import cn.hutool.core.util.StrUtil;
- import com.alibaba.fastjson.JSONObject;
- import com.backendsys.modules.ai.chat.entity.Chat;
- import com.backendsys.modules.ai.chat.entity.ChatCompletionParam;
- import com.backendsys.modules.ai.chat.entity.ChatResult;
- import com.backendsys.modules.ai.chat.entity.ChatSseMessage;
- import com.backendsys.modules.common.config.redis.utils.RedisUtil;
- import com.backendsys.modules.sdk.bocha.entity.BochaParam;
- import com.backendsys.modules.sdk.bocha.service.BochaService;
- import com.backendsys.modules.sdk.deepseek.entity.DSRequest;
- import com.backendsys.modules.sdk.deepseek.entity.DSRequestMessage;
- import com.backendsys.modules.sse.entity.SseResponse;
- import com.backendsys.modules.sse.entity.SseResponseEnum;
- import com.backendsys.modules.sse.utils.SseUtil;
- import com.fasterxml.jackson.databind.JsonNode;
- import com.fasterxml.jackson.databind.ObjectMapper;
- //import org.apache.http.client.methods.CloseableHttpResponse;
- //import org.apache.http.impl.client.CloseableHttpClient;
- //import org.apache.http.impl.client.HttpClients;
- import org.springframework.beans.factory.annotation.Autowired;
- import org.springframework.beans.factory.annotation.Value;
- import org.springframework.stereotype.Component;
- import java.net.URI;
- import java.net.http.HttpClient;
- import java.net.http.HttpRequest;
- import java.net.http.HttpResponse;
- import java.nio.charset.StandardCharsets;
- import java.time.Duration;
- import java.util.*;
- import java.util.concurrent.CompletableFuture;
- import java.util.concurrent.atomic.AtomicReference;
- /**
- * ollama run deepseek-r1:1.5b
- *
- * Ollama API
- * https://github.com/ollama/ollama/blob/main/docs/api.md
- */
- @Component
- public class OllamaUtil {
- @Autowired
- private SseUtil sseUtil;
- @Autowired
- private RedisUtil redisUtil;
- @Autowired
- private BochaService bochaService;
- @Value("${spring.application.name}")
- private String APPLICATION_NAME;
- @Value("${deepseek-r1.domain}")
- private String DOMAIN;
- /**
- * 流式对话
- */
- public ChatResult chatCompletion(ChatCompletionParam chatCompletionParam) {
- // 参数化
- Long user_id = chatCompletionParam.getUser_id();
- String model = chatCompletionParam.getModel();
- String prompt = chatCompletionParam.getPrompt();
- String history_code = chatCompletionParam.getHistory_code();
- List<Chat> chatList = chatCompletionParam.getChatList();
- Boolean internet = chatCompletionParam.getInternet();
- // 定义作用于全局的变量
- Long contentDuration = 0L;
- AtomicReference<Boolean> isThinking = new AtomicReference<>(false);
- StringBuilder allReplyContent = new StringBuilder();
- StringBuilder allThinkContent = new StringBuilder();
- String requestOfRedisKey = APPLICATION_NAME + "-chat-history-" + history_code;
- ChatResult chatResult = new ChatResult();
- // try {
- System.out.println("向模型: " + model + " 提问: " + prompt);
- // 记录请求开始时间
- long allStartTime = System.currentTimeMillis();
- // 加入上下文历史对话
- System.out.println("---- 历史对话 (history_code): " + history_code + " ----");
- List<DSRequestMessage> messages = new ArrayList<>();
- if (chatList != null && !chatList.isEmpty()) {
- chatList.stream().forEach(chat -> {
- if (!"THINK".equals(chat.getContent_type())) {
- messages.add(new DSRequestMessage(chat.getRole(), chat.getContent()));
- }
- });
- // 反转列表
- Collections.reverse(messages);
- }
- // 【要把搜索到的内容塞到 'user' 里?】
- // -- [博查] Web Search API ----------------------------------------------
- if (internet) {
- // 远程查询、统计接口时间、设置返回参数
- long internetStartTime = System.currentTimeMillis();
- JsonNode searchResult = bochaService.WebSearch(new BochaParam(prompt));
- String context = bochaService.WebSearchToString(searchResult);
- context = context.replace("\n", "\\n");
- long internetEndTime = System.currentTimeMillis();
- chatResult.setInternet_duration(internetStartTime - internetEndTime);
- chatResult.setInternet_content(context);
- // 将搜索结果作为上下文添加到消息中
- messages.add(new DSRequestMessage("system", context));
- messages.add(new DSRequestMessage("user", "在回答时引用以上全部数据进行分析")); // 的 "name"、"summary"
- messages.add(new DSRequestMessage("assistant", "好的"));
- // [SSE] 发送消息
- ChatSseMessage chatSearchSseMessage = new ChatSseMessage("SEARCH", context, null, history_code);
- sseUtil.send(user_id, new SseResponse(SseResponseEnum.OLLAMA, chatSearchSseMessage).toJsonStr());
- }
- // -----------------------------------------------------------------------
- // 新的对话内容
- messages.add(new DSRequestMessage("user", prompt));
- // 输出全部对话内容
- messages.stream().forEach(msg -> {
- System.out.println("[" + msg.getRole() + "]: " + msg.getContent());
- });
- System.out.println("---------------------------------------------------------------------");
- /*
- 【/api/generate】
- 它是一个相对基础的文本生成端点,主要用于根据给定的提示信息生成一段连续的文本。
- 这个端点会基于输入的提示,按照模型的语言生成能力输出一段完整的内容,更侧重于单纯的文本生成任务。
- 生成过程不依赖于上下文的历史对话信息,每次请求都是独立的,模型仅依据当前输入的提示进行文本生成。
- {
- "model": "llama2", "prompt": "请描述一下美丽的海滩", "num_predict": 200, "temperature": 0.7
- }
- 【/api/chat】
- 该端点专为模拟聊天场景设计,具备处理对话上下文的能力。它可以跟踪对话的历史记录,理解对话的上下文信息,从而生成更符合对话逻辑和连贯性的回复。
- 更注重模拟真实的人机对话交互,能够根据历史对话和当前输入生成合适的回应,适用于构建聊天机器人等交互式应用。
- {
- "model": "deepseek-r1:1.5b",
- "messages": [
- { "role": "system", "content": "你是一个能够理解中文指令并帮助完成任务的智能助手。" },
- { "role": "user", "content": "写一个简单的 Python 函数,用于计算两个数的和" }
- ],
- "stream": false,
- // 新增
- "context": ["引用1","引用2"]
- }
- */
- try {
- ObjectMapper objectMapper = new ObjectMapper();
- HttpClient httpClient = HttpClient.newBuilder()
- .connectTimeout(Duration.ofSeconds(30))
- .build();
- DSRequest body = new DSRequest();
- body.setModel(model);
- body.setMessages(messages);
- body.setStream(true);
- String requestBody = objectMapper.writeValueAsString(body);
- HttpRequest request = HttpRequest.newBuilder()
- .uri(URI.create(DOMAIN + "/api/chat"))
- .header("Content-Type", "application/json")
- .POST(HttpRequest.BodyPublishers.ofString(requestBody, StandardCharsets.UTF_8))
- .build();
- long apiDuration = System.currentTimeMillis() - allStartTime; // 接口耗时
- System.out.println("API 调用耗时: " + apiDuration + " 毫秒");
- System.out.println("---- 开始流式回答: ------------------------------------");
- // [SSE] 发送消息
- ChatSseMessage chatLoadingSseMessage = new ChatSseMessage("LOADING", "正在思考", null, history_code);
- sseUtil.send(user_id, new SseResponse(SseResponseEnum.OLLAMA, chatLoadingSseMessage).toJsonStr());
- // 使用异步流式处理
- Long finalContentDuration = contentDuration;
- CompletableFuture<Void> future = httpClient.sendAsync(request, HttpResponse.BodyHandlers.ofLines())
- .thenAccept(resp -> {
- resp.body().forEach(line -> {
- if (ObjectUtil.isEmpty(redisUtil.getCacheObject(requestOfRedisKey))) {
- System.out.println("中止!");
- redisUtil.delete(requestOfRedisKey);
- return;
- }
- JSONObject resJson = JSONObject.parseObject(line);
- if (resJson == null) return;
- String errJsonMessage = resJson.getString("error");
- if (errJsonMessage != null) {
- ChatSseMessage errMsg = new ChatSseMessage("REPLY", errJsonMessage, finalContentDuration, history_code);
- sseUtil.send(user_id, new SseResponse(SseResponseEnum.OLLAMA, errMsg).toJsonStr());
- sseUtil.send(user_id,
- new SseResponse(SseResponseEnum.OLLAMA,
- new ChatSseMessage("REPLY", "[DONE][REPLY]", finalContentDuration, history_code)).toJsonStr());
- chatResult.setContent(errJsonMessage);
- return;
- }
- JSONObject resJsonMessage = resJson.getJSONObject("message");
- String content = resJsonMessage.getString("content");
- // 思考标记
- if (content.contains("<think>")) {
- isThinking.set(true);
- }
- if (content.contains("</think>")) {
- isThinking.set(false);
- long thinkDuration = System.currentTimeMillis() - allStartTime;
- if (allThinkContent.length() > 0) {
- ChatSseMessage thinkMsg = new ChatSseMessage("THINK", "[DONE][THINK]", thinkDuration, history_code);
- sseUtil.send(user_id, new SseResponse(SseResponseEnum.OLLAMA, thinkMsg).toJsonStr());
- }
- }
- if (isThinking.get() && !content.contains("<think>") && !content.equals("\n\n") && !content.equals("\n")) {
- ChatSseMessage thinkMsg = new ChatSseMessage("THINK", content, null, history_code);
- sseUtil.send(user_id, new SseResponse(SseResponseEnum.OLLAMA, thinkMsg).toJsonStr());
- allThinkContent.append(content);
- }
- if (!isThinking.get() && !content.contains("</think>") && !content.equals("\n\n")) {
- Boolean done = resJson.getBoolean("done");
- if (!done) {
- ChatSseMessage replyMsg = new ChatSseMessage("REPLY", content, null, history_code);
- sseUtil.send(user_id, new SseResponse(SseResponseEnum.OLLAMA, replyMsg).toJsonStr());
- allReplyContent.append(content);
- }
- }
- });
- });
- future.join(); // 阻塞直至流结束
- System.out.println("-------------------- 结束流式回答. --------------------");
- contentDuration = System.currentTimeMillis() - allStartTime;
- System.out.println("全部推理: " + allThinkContent);
- System.out.println("全部回答: " + allReplyContent);
- System.out.println("总输出耗时: " + contentDuration + " 毫秒");
- System.out.println("---------------------------------------------------");
- if (StrUtil.isNotEmpty(allThinkContent.toString())) {
- chatResult.setReasoning_content(allThinkContent.toString());
- chatResult.setReasoning_duration(System.currentTimeMillis() - allStartTime);
- }
- chatResult.setContent(allReplyContent.toString());
- chatResult.setContent_duration(contentDuration);
- return chatResult;
- } catch (Exception e) {
- System.out.println("Exception(2): " + e.getMessage());
- // [SSE] 发送消息
- String contentType = (isThinking.get() ? "THINK_ABORT" : "REPLY_ABORT");
- ChatSseMessage chatSseMessage = new ChatSseMessage(contentType, e.getMessage(), contentDuration, history_code);
- sseUtil.send(user_id, new SseResponse(SseResponseEnum.OLLAMA, chatSseMessage).toJsonStr());
- redisUtil.delete(requestOfRedisKey);
- chatResult.setContent(e.getMessage());
- return chatResult;
- } finally {
- // [SSE] 发送消息 (完成)
- ChatSseMessage chatSseMessage = new ChatSseMessage("REPLY", "[DONE][REPLY]", contentDuration, history_code);
- sseUtil.send(user_id, new SseResponse(SseResponseEnum.OLLAMA, chatSseMessage).toJsonStr());
- }
- }
- }
|