123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364 |
- package com.backendsys.modules.sdk.deepseek.utils;
- import cn.hutool.core.convert.Convert;
- import cn.hutool.core.util.NumberUtil;
- import cn.hutool.core.util.ObjectUtil;
- import cn.hutool.core.util.StrUtil;
- import com.alibaba.fastjson.JSONObject;
- import com.backendsys.modules.ai.chat.entity.Chat;
- import com.backendsys.modules.ai.chat.entity.ChatResult;
- import com.backendsys.modules.ai.chat.entity.ChatSseMessage;
- import com.backendsys.modules.common.config.redis.utils.RedisUtil;
- import com.backendsys.modules.sdk.deepseek.entity.DSRequest;
- import com.backendsys.modules.sdk.deepseek.entity.DSRequestMessage;
- import com.backendsys.modules.sse.entity.SseResponse;
- import com.backendsys.modules.sse.entity.SseResponseEnum;
- import com.backendsys.modules.sse.utils.SseUtil;
- import com.fasterxml.jackson.databind.ObjectMapper;
- import org.apache.http.client.methods.CloseableHttpResponse;
- import org.apache.http.client.methods.HttpPost;
- import org.apache.http.entity.StringEntity;
- import org.apache.http.impl.client.CloseableHttpClient;
- import org.apache.http.impl.client.HttpClients;
- import org.springframework.beans.factory.annotation.Autowired;
- import org.springframework.beans.factory.annotation.Value;
- import org.springframework.stereotype.Component;
- import java.io.BufferedReader;
- import java.io.InputStreamReader;
- import java.nio.charset.StandardCharsets;
- import java.util.*;
- import java.util.concurrent.CompletableFuture;
- /**
- * ollama run deepseek-r1:1.5b
- *
- * Ollama API
- * https://github.com/ollama/ollama/blob/main/docs/api.md
- */
- @Component
- public class OllamaUtil {
- @Autowired
- private SseUtil sseUtil;
- @Autowired
- private RedisUtil redisUtil;
- @Value("${spring.application.name}")
- private String APPLICATION_NAME;
- @Value("${deepseek-r1.domain}")
- private String DOMAIN;
- /**
- * 流式对话
- */
- public ChatResult chatCompletion(Long user_id, String model, String prompt, String history_code, List<Chat> chatList) {
- Long contentDuration = 0L;
- ChatResult chatResult = new ChatResult();
- // try {
- System.out.println("向模型: " + model + " 提问: " + prompt);
- // 记录请求开始时间
- long allStartTime = System.currentTimeMillis();
- // 加入上下文历史对话
- System.out.println("---- 历史对话 (history_code): " + history_code + " ----");
- List<DSRequestMessage> messages = new ArrayList<>();
- if (chatList != null && !chatList.isEmpty()) {
- chatList.stream().forEach(chat -> {
- if (!"THINK".equals(chat.getContent_type())) {
- messages.add(new DSRequestMessage(chat.getRole(), chat.getContent()));
- }
- });
- // 反转列表
- Collections.reverse(messages);
- }
- // 新的对话内容
- messages.add(new DSRequestMessage("user", prompt));
- // 输出全部对话内容
- messages.stream().forEach(msg -> {
- System.out.println("[" + msg.getRole() + "]: " + msg.getContent());
- });
- System.out.println("---------------------------------------------------------------------");
- // 定义作用于全局的变量
- Boolean isThinking = false;
- StringBuilder allReplyContent = new StringBuilder();
- StringBuilder allThinkContent = new StringBuilder();
- String requestOfRedisKey = APPLICATION_NAME + "-chat-history-" + history_code;
- ObjectMapper objectMapper = new ObjectMapper();
- try (CloseableHttpClient client = HttpClients.createDefault()) {
- /*
- 【/api/generate】
- 它是一个相对基础的文本生成端点,主要用于根据给定的提示信息生成一段连续的文本。
- 这个端点会基于输入的提示,按照模型的语言生成能力输出一段完整的内容,更侧重于单纯的文本生成任务。
- 生成过程不依赖于上下文的历史对话信息,每次请求都是独立的,模型仅依据当前输入的提示进行文本生成。
- {
- "model": "llama2", "prompt": "请描述一下美丽的海滩", "num_predict": 200, "temperature": 0.7
- }
- 【/api/chat】
- 该端点专为模拟聊天场景设计,具备处理对话上下文的能力。它可以跟踪对话的历史记录,理解对话的上下文信息,从而生成更符合对话逻辑和连贯性的回复。
- 更注重模拟真实的人机对话交互,能够根据历史对话和当前输入生成合适的回应,适用于构建聊天机器人等交互式应用。
- {
- "model": "deepseek-r1:1.5b",
- "messages": [
- {
- "role": "system",
- "content": "你是一个能够理解中文指令并帮助完成任务的智能助手。你的任务是根据用户的需求生成合适的分类任务或生成任务,并准确判断这些任务的类型。请确保你的回答简洁、准确且符合中英文语境。"
- },
- {
- "role": "user",
- "content": "写一个简单的 Python 函数,用于计算两个数的和"
- }
- ],
- "stream": false
- }
- */
- // [Chat] 构建请求体
- HttpPost request = new HttpPost(DOMAIN + "/api/chat");
- DSRequest body = new DSRequest();
- body.setModel(model);
- body.setMessages(messages);
- body.setStream(true);
- String requestBody = objectMapper.writeValueAsString(body);
- // // [Generate] 构建请求体
- // HttpPost request = new HttpPost(DOMAIN + "/api/generate");
- // Map<String, Object> requestMap = new HashMap<>();
- // requestMap.put("model", model);
- // requestMap.put("prompt", prompt);
- // requestMap.put("stream", true);
- // String requestBody = objectMapper.writeValueAsString(requestMap);
- request.setEntity(new StringEntity(requestBody, StandardCharsets.UTF_8));
- try (CloseableHttpResponse response = client.execute(request);
- BufferedReader reader = new BufferedReader(new InputStreamReader(response.getEntity().getContent(), StandardCharsets.UTF_8))) {
- long apiDuration = System.currentTimeMillis() - allStartTime; // 接口耗时
- long thinkStartTime = 0L; // 开始思考时间
- long thinkDuration = 0L; // 思考耗时
- System.out.println("API 调用耗时: " + apiDuration + " 毫秒");
- System.out.println("---- 开始流式回答: ------------------------------------");
- // [SSE] 发送消息
- ChatSseMessage chatLoadingSseMessage = new ChatSseMessage("LOADING", "正在思考", null, history_code);
- sseUtil.send(user_id, new SseResponse(SseResponseEnum.DEEPSEEK, chatLoadingSseMessage).toJsonStr());
- String line;
- while ((line = reader.readLine()) != null) {
- // 判断是否中止
- if (ObjectUtil.isEmpty(redisUtil.getCacheObject(requestOfRedisKey))) {
- System.out.println("中止!");
- request.abort();
- // 流程结束后,删除锁
- redisUtil.delete(requestOfRedisKey);
- break;
- }
- // System.out.println(line);
- /*
- ---------------------- [Chat] line ----------------------
- {"model":"deepseek-r1:1.5b","created_at":"2025-03-18T07:37:06.483163789Z","message":{"role":"assistant","content":"\u003cthink\u003e"},"done":false}
- ---------------------- [Generate] line ------------------
- {"model":"deepseek-r1:1.5b","created_at":"2025-03-05T10:51:17.443189986Z","response":"\u003cthink\u003e","done":false}
- {"model":"deepseek-r1:1.5b","created_at":"2025-03-06T11:08:30.9219611Z","response":"\n\n","done":false}
- ---------------------- [Error] line ---------------------
- {"error":"llama runner process has terminated: error loading model: unable to allocate CUDA0 buffer"}
- */
- // 每行数据可以是一个JSON对象,根据实际情况处理
- JSONObject resJson = JSONObject.parseObject(line);
- // -- 判断内容是否为空 (或报错) --------------------------------------
- if (resJson == null) return null;
- String errJsonMessage = resJson.getString("error");
- if (errJsonMessage != null) {
- // [SSE] 发送消息
- sseUtil.send(user_id, new SseResponse(SseResponseEnum.DEEPSEEK, new ChatSseMessage("REPLY", errJsonMessage, contentDuration, history_code)).toJsonStr());
- // [SSE] 发送消息 (完成)
- sseUtil.send(user_id, new SseResponse(SseResponseEnum.DEEPSEEK, new ChatSseMessage("REPLY", "[DONE][REPLY]", contentDuration, history_code)).toJsonStr());
- //
- chatResult.setContent(errJsonMessage);
- return chatResult;
- }
- // --------------------------------------------------------------
- // [Chat]
- JSONObject resJsonMessage = resJson.getJSONObject("message");
- String content = resJsonMessage.getString("content");
- // // [Generate]
- // String content = resJson.getString("response");
- // --------------------------------------------------------------
- // System.out.println("content: " + content);
- // content: \n\n
- // content: <think>
- // content: </think>
- // 开始思考
- if (content.contains("<think>")) {
- isThinking = true;
- thinkStartTime = System.currentTimeMillis();
- }
- // 停止思考,并计算思考耗时
- if (content.contains("</think>")) {
- isThinking = false;
- thinkDuration = thinkStartTime - allStartTime;
- System.out.println("推理耗时: " + thinkDuration + "毫秒");
- System.out.println("-----------------------------------------------------");
- if (allThinkContent.length() > 0){
- // [SSE] 发送消息
- ChatSseMessage chatSseMessage = new ChatSseMessage("THINK", "[DONE][THINK]", thinkDuration, history_code);
- sseUtil.send(user_id, new SseResponse(SseResponseEnum.DEEPSEEK, chatSseMessage).toJsonStr());
- }
- }
- // [思考] Think
- if (isThinking) {
- if (!content.contains("<think>") && !content.contains("\n\n") && !content.contains("\n")) {
- // [SSE] 发送消息
- ChatSseMessage chatSseMessage = new ChatSseMessage("THINK", content, null, history_code);
- sseUtil.send(user_id, new SseResponse(SseResponseEnum.DEEPSEEK, chatSseMessage).toJsonStr());
- // 收集推理内容
- allThinkContent.append(content);
- }
- }
- // [回答] Reply
- if (!isThinking) {
- // System.out.println("content: " + content);
- if (!content.contains("</think>") && !content.contains("\n\n")) {
- Boolean done = resJson.getBoolean("done");
- if (!done) {
- // [SSE] 发送消息
- ChatSseMessage chatSseMessage = new ChatSseMessage("REPLY", content, null, history_code);
- sseUtil.send(user_id, new SseResponse(SseResponseEnum.DEEPSEEK, chatSseMessage).toJsonStr());
- // 收集回答内容
- allReplyContent.append(content);
- }
- }
- }
- }
- System.out.println("-------------------- 结束流式回答. --------------------");
- contentDuration = System.currentTimeMillis() - allStartTime;
- System.out.println("全部推理: " + allThinkContent);
- System.out.println("全部回答: " + allReplyContent);
- System.out.println("总输出耗时: " + contentDuration + " 毫秒");
- System.out.println("-----");
- // System.out.println("Think content: " + allThinkContent.toString());
- // System.out.println("Think content length: " + allThinkContent.toString().length());
- // System.out.println("Think content is not empty: " + StrUtil.isNotEmpty(allThinkContent.toString()));
- // System.out.println("-----");
- if (StrUtil.isNotEmpty(allThinkContent.toString())) {
- chatResult.setReasoning_content(allThinkContent.toString());
- chatResult.setReasoning_duration(thinkDuration);
- }
- chatResult.setContent(allReplyContent.toString());
- chatResult.setContent_duration(contentDuration);
- // [SSE] 发送消息 (完成)
- ChatSseMessage chatSseMessage = new ChatSseMessage("REPLY", "[DONE][REPLY]", contentDuration, history_code);
- sseUtil.send(user_id, new SseResponse(SseResponseEnum.DEEPSEEK, chatSseMessage).toJsonStr());
- return chatResult;
- } catch (Exception e) {
- System.out.println("Exception(1): " + e.getMessage());
- String message = e.getMessage();
- if (message.contains("failed to respond")) {
- message = "(系统繁忙,请稍后再试)";
- }
- if (message.contains("Premature end of chunk coded message body: closing chunk expected")) {
- message = "(请求中止)";
- }
- // [SSE] 发送消息
- String contentType = (isThinking ? "THINK_ABORT" : "REPLY_ABORT");
- ChatSseMessage chatSseMessage = new ChatSseMessage(contentType, message, contentDuration, history_code);
- sseUtil.send(user_id, new SseResponse(SseResponseEnum.DEEPSEEK, chatSseMessage).toJsonStr());
- // chatResult.setContent(e.getMessage());
- // 由于中止导致的错误信息叠加 (一并保存进数据库)
- if (StrUtil.isNotEmpty(allThinkContent.toString())) {
- if (isThinking) {
- chatResult.setReasoning_content(allThinkContent.toString() + " " + message);
- } else {
- chatResult.setReasoning_content(allThinkContent.toString());
- }
- }
- chatResult.setContent(allReplyContent.toString() + " " + message);
- redisUtil.delete(requestOfRedisKey);
- return chatResult;
- // return chatResult;
- }
- } catch (Exception e) {
- System.out.println("Exception(2): " + e.getMessage());
- // [SSE] 发送消息
- String contentType = (isThinking ? "THINK_ABORT" : "REPLY_ABORT");
- ChatSseMessage chatSseMessage = new ChatSseMessage(contentType, e.getMessage(), contentDuration, history_code);
- sseUtil.send(user_id, new SseResponse(SseResponseEnum.DEEPSEEK, chatSseMessage).toJsonStr());
- redisUtil.delete(requestOfRedisKey);
- chatResult.setContent(e.getMessage());
- return chatResult;
- }
- // } catch (Exception e) {
- // System.out.println("Exception(3): " + e.getMessage());
- // // [SSE] 发送消息
- // ChatSseMessage chatSseMessage = new ChatSseMessage("REPLY", e.getMessage(), contentDuration);
- // sseUtil.send(user_id, new SseResponse(SseResponseEnum.DEEPSEEK, chatSseMessage).toJsonStr());
- //
- // chatResult.setContent(e.getMessage());
- // return chatResult;
- // }
- // } finally {
- // System.out.println("Finally.");
- // // [SSE] 发送消息
- // ChatSseMessage chatSseMessage = new ChatSseMessage("REPLY", "[DONE][REPLY]", contentDuration);
- // sseUtil.send(user_id, new SseResponse(SseResponseEnum.DEEPSEEK, chatSseMessage).toJsonStr());
- // }
- }
- }
|