package com.backendsys.modules.sdk.deepseek.utils; import cn.hutool.core.convert.Convert; import cn.hutool.core.util.NumberUtil; import cn.hutool.core.util.ObjectUtil; import cn.hutool.core.util.StrUtil; import com.alibaba.fastjson.JSONObject; import com.backendsys.modules.ai.chat.entity.Chat; import com.backendsys.modules.ai.chat.entity.ChatResult; import com.backendsys.modules.ai.chat.entity.ChatSseMessage; import com.backendsys.modules.common.config.redis.utils.RedisUtil; import com.backendsys.modules.sdk.deepseek.entity.DSRequest; import com.backendsys.modules.sdk.deepseek.entity.DSRequestMessage; import com.backendsys.modules.sse.entity.SseResponse; import com.backendsys.modules.sse.entity.SseResponseEnum; import com.backendsys.modules.sse.utils.SseUtil; import com.fasterxml.jackson.databind.ObjectMapper; import org.apache.http.client.methods.CloseableHttpResponse; import org.apache.http.client.methods.HttpPost; import org.apache.http.entity.StringEntity; import org.apache.http.impl.client.CloseableHttpClient; import org.apache.http.impl.client.HttpClients; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Value; import org.springframework.stereotype.Component; import java.io.BufferedReader; import java.io.InputStreamReader; import java.nio.charset.StandardCharsets; import java.util.*; import java.util.concurrent.CompletableFuture; /** * ollama run deepseek-r1:1.5b * * Ollama API * https://github.com/ollama/ollama/blob/main/docs/api.md */ @Component public class OllamaUtil { @Autowired private SseUtil sseUtil; @Autowired private RedisUtil redisUtil; @Value("${spring.application.name}") private String APPLICATION_NAME; @Value("${deepseek-r1.domain}") private String DOMAIN; /** * 流式对话 */ public ChatResult chatCompletion(Long user_id, String model, String prompt, String history_code, List chatList) { Long contentDuration = 0L; ChatResult chatResult = new ChatResult(); // try { System.out.println("向模型: " + model + " 提问: " + prompt); // 记录请求开始时间 long allStartTime = System.currentTimeMillis(); // 加入上下文历史对话 System.out.println("---- 历史对话 (history_code): " + history_code + " ----"); List messages = new ArrayList<>(); if (chatList != null && !chatList.isEmpty()) { chatList.stream().forEach(chat -> { if (!"THINK".equals(chat.getContent_type())) { messages.add(new DSRequestMessage(chat.getRole(), chat.getContent())); } }); // 反转列表 Collections.reverse(messages); } // 新的对话内容 messages.add(new DSRequestMessage("user", prompt)); // 输出全部对话内容 messages.stream().forEach(msg -> { System.out.println("[" + msg.getRole() + "]: " + msg.getContent()); }); System.out.println("---------------------------------------------------------------------"); // 定义作用于全局的变量 Boolean isThinking = false; StringBuilder allReplyContent = new StringBuilder(); StringBuilder allThinkContent = new StringBuilder(); String requestOfRedisKey = APPLICATION_NAME + "-chat-history-" + history_code; ObjectMapper objectMapper = new ObjectMapper(); try (CloseableHttpClient client = HttpClients.createDefault()) { /* 【/api/generate】 它是一个相对基础的文本生成端点,主要用于根据给定的提示信息生成一段连续的文本。 这个端点会基于输入的提示,按照模型的语言生成能力输出一段完整的内容,更侧重于单纯的文本生成任务。 生成过程不依赖于上下文的历史对话信息,每次请求都是独立的,模型仅依据当前输入的提示进行文本生成。 { "model": "llama2", "prompt": "请描述一下美丽的海滩", "num_predict": 200, "temperature": 0.7 } 【/api/chat】 该端点专为模拟聊天场景设计,具备处理对话上下文的能力。它可以跟踪对话的历史记录,理解对话的上下文信息,从而生成更符合对话逻辑和连贯性的回复。 更注重模拟真实的人机对话交互,能够根据历史对话和当前输入生成合适的回应,适用于构建聊天机器人等交互式应用。 { "model": "deepseek-r1:1.5b", "messages": [ { "role": "system", "content": "你是一个能够理解中文指令并帮助完成任务的智能助手。你的任务是根据用户的需求生成合适的分类任务或生成任务,并准确判断这些任务的类型。请确保你的回答简洁、准确且符合中英文语境。" }, { "role": "user", "content": "写一个简单的 Python 函数,用于计算两个数的和" } ], "stream": false } */ // [Chat] 构建请求体 HttpPost request = new HttpPost(DOMAIN + "/api/chat"); DSRequest body = new DSRequest(); body.setModel(model); body.setMessages(messages); body.setStream(true); String requestBody = objectMapper.writeValueAsString(body); // // [Generate] 构建请求体 // HttpPost request = new HttpPost(DOMAIN + "/api/generate"); // Map requestMap = new HashMap<>(); // requestMap.put("model", model); // requestMap.put("prompt", prompt); // requestMap.put("stream", true); // String requestBody = objectMapper.writeValueAsString(requestMap); request.setEntity(new StringEntity(requestBody, StandardCharsets.UTF_8)); try (CloseableHttpResponse response = client.execute(request); BufferedReader reader = new BufferedReader(new InputStreamReader(response.getEntity().getContent(), StandardCharsets.UTF_8))) { long apiDuration = System.currentTimeMillis() - allStartTime; // 接口耗时 long thinkStartTime = 0L; // 开始思考时间 long thinkDuration = 0L; // 思考耗时 System.out.println("API 调用耗时: " + apiDuration + " 毫秒"); System.out.println("---- 开始流式回答: ------------------------------------"); // [SSE] 发送消息 ChatSseMessage chatLoadingSseMessage = new ChatSseMessage("LOADING", "正在思考", null, history_code); sseUtil.send(user_id, new SseResponse(SseResponseEnum.DEEPSEEK, chatLoadingSseMessage).toJsonStr()); String line; while ((line = reader.readLine()) != null) { // 判断是否中止 if (ObjectUtil.isEmpty(redisUtil.getCacheObject(requestOfRedisKey))) { System.out.println("中止!"); request.abort(); // 流程结束后,删除锁 redisUtil.delete(requestOfRedisKey); break; } // System.out.println(line); /* ---------------------- [Chat] line ---------------------- {"model":"deepseek-r1:1.5b","created_at":"2025-03-18T07:37:06.483163789Z","message":{"role":"assistant","content":"\u003cthink\u003e"},"done":false} ---------------------- [Generate] line ------------------ {"model":"deepseek-r1:1.5b","created_at":"2025-03-05T10:51:17.443189986Z","response":"\u003cthink\u003e","done":false} {"model":"deepseek-r1:1.5b","created_at":"2025-03-06T11:08:30.9219611Z","response":"\n\n","done":false} ---------------------- [Error] line --------------------- {"error":"llama runner process has terminated: error loading model: unable to allocate CUDA0 buffer"} */ // 每行数据可以是一个JSON对象,根据实际情况处理 JSONObject resJson = JSONObject.parseObject(line); // -- 判断内容是否为空 (或报错) -------------------------------------- if (resJson == null) return null; String errJsonMessage = resJson.getString("error"); if (errJsonMessage != null) { // [SSE] 发送消息 sseUtil.send(user_id, new SseResponse(SseResponseEnum.DEEPSEEK, new ChatSseMessage("REPLY", errJsonMessage, contentDuration, history_code)).toJsonStr()); // [SSE] 发送消息 (完成) sseUtil.send(user_id, new SseResponse(SseResponseEnum.DEEPSEEK, new ChatSseMessage("REPLY", "[DONE][REPLY]", contentDuration, history_code)).toJsonStr()); // chatResult.setContent(errJsonMessage); return chatResult; } // -------------------------------------------------------------- // [Chat] JSONObject resJsonMessage = resJson.getJSONObject("message"); String content = resJsonMessage.getString("content"); // // [Generate] // String content = resJson.getString("response"); // -------------------------------------------------------------- // System.out.println("content: " + content); // content: \n\n // content: // content: // 开始思考 if (content.contains("")) { isThinking = true; thinkStartTime = System.currentTimeMillis(); } // 停止思考,并计算思考耗时 if (content.contains("")) { isThinking = false; thinkDuration = thinkStartTime - allStartTime; System.out.println("推理耗时: " + thinkDuration + "毫秒"); System.out.println("-----------------------------------------------------"); if (allThinkContent.length() > 0){ // [SSE] 发送消息 ChatSseMessage chatSseMessage = new ChatSseMessage("THINK", "[DONE][THINK]", thinkDuration, history_code); sseUtil.send(user_id, new SseResponse(SseResponseEnum.DEEPSEEK, chatSseMessage).toJsonStr()); } } // [思考] Think if (isThinking) { if (!content.contains("") && !content.contains("\n\n") && !content.contains("\n")) { // [SSE] 发送消息 ChatSseMessage chatSseMessage = new ChatSseMessage("THINK", content, null, history_code); sseUtil.send(user_id, new SseResponse(SseResponseEnum.DEEPSEEK, chatSseMessage).toJsonStr()); // 收集推理内容 allThinkContent.append(content); } } // [回答] Reply if (!isThinking) { // System.out.println("content: " + content); if (!content.contains("") && !content.contains("\n\n")) { Boolean done = resJson.getBoolean("done"); if (!done) { // [SSE] 发送消息 ChatSseMessage chatSseMessage = new ChatSseMessage("REPLY", content, null, history_code); sseUtil.send(user_id, new SseResponse(SseResponseEnum.DEEPSEEK, chatSseMessage).toJsonStr()); // 收集回答内容 allReplyContent.append(content); } } } } System.out.println("-------------------- 结束流式回答. --------------------"); contentDuration = System.currentTimeMillis() - allStartTime; System.out.println("全部推理: " + allThinkContent); System.out.println("全部回答: " + allReplyContent); System.out.println("总输出耗时: " + contentDuration + " 毫秒"); System.out.println("-----"); // System.out.println("Think content: " + allThinkContent.toString()); // System.out.println("Think content length: " + allThinkContent.toString().length()); // System.out.println("Think content is not empty: " + StrUtil.isNotEmpty(allThinkContent.toString())); // System.out.println("-----"); if (StrUtil.isNotEmpty(allThinkContent.toString())) { chatResult.setReasoning_content(allThinkContent.toString()); chatResult.setReasoning_duration(thinkDuration); } chatResult.setContent(allReplyContent.toString()); chatResult.setContent_duration(contentDuration); // [SSE] 发送消息 (完成) ChatSseMessage chatSseMessage = new ChatSseMessage("REPLY", "[DONE][REPLY]", contentDuration, history_code); sseUtil.send(user_id, new SseResponse(SseResponseEnum.DEEPSEEK, chatSseMessage).toJsonStr()); return chatResult; } catch (Exception e) { System.out.println("Exception(1): " + e.getMessage()); String message = e.getMessage(); if (message.contains("failed to respond")) { message = "(系统繁忙,请稍后再试)"; } if (message.contains("Premature end of chunk coded message body: closing chunk expected")) { message = "(请求中止)"; } // [SSE] 发送消息 String contentType = (isThinking ? "THINK_ABORT" : "REPLY_ABORT"); ChatSseMessage chatSseMessage = new ChatSseMessage(contentType, message, contentDuration, history_code); sseUtil.send(user_id, new SseResponse(SseResponseEnum.DEEPSEEK, chatSseMessage).toJsonStr()); // chatResult.setContent(e.getMessage()); // 由于中止导致的错误信息叠加 (一并保存进数据库) if (StrUtil.isNotEmpty(allThinkContent.toString())) { if (isThinking) { chatResult.setReasoning_content(allThinkContent.toString() + " " + message); } else { chatResult.setReasoning_content(allThinkContent.toString()); } } chatResult.setContent(allReplyContent.toString() + " " + message); redisUtil.delete(requestOfRedisKey); return chatResult; // return chatResult; } } catch (Exception e) { System.out.println("Exception(2): " + e.getMessage()); // [SSE] 发送消息 String contentType = (isThinking ? "THINK_ABORT" : "REPLY_ABORT"); ChatSseMessage chatSseMessage = new ChatSseMessage(contentType, e.getMessage(), contentDuration, history_code); sseUtil.send(user_id, new SseResponse(SseResponseEnum.DEEPSEEK, chatSseMessage).toJsonStr()); redisUtil.delete(requestOfRedisKey); chatResult.setContent(e.getMessage()); return chatResult; } // } catch (Exception e) { // System.out.println("Exception(3): " + e.getMessage()); // // [SSE] 发送消息 // ChatSseMessage chatSseMessage = new ChatSseMessage("REPLY", e.getMessage(), contentDuration); // sseUtil.send(user_id, new SseResponse(SseResponseEnum.DEEPSEEK, chatSseMessage).toJsonStr()); // // chatResult.setContent(e.getMessage()); // return chatResult; // } // } finally { // System.out.println("Finally."); // // [SSE] 发送消息 // ChatSseMessage chatSseMessage = new ChatSseMessage("REPLY", "[DONE][REPLY]", contentDuration); // sseUtil.send(user_id, new SseResponse(SseResponseEnum.DEEPSEEK, chatSseMessage).toJsonStr()); // } } }