|
@@ -1,20 +1,29 @@
|
|
|
package com.backendsys.modules.sdk.comfyui.service.impl;
|
|
|
|
|
|
import cn.hutool.core.convert.Convert;
|
|
|
+import cn.hutool.core.map.MapUtil;
|
|
|
import cn.hutool.json.JSONArray;
|
|
|
import cn.hutool.json.JSONObject;
|
|
|
import cn.hutool.json.JSONUtil;
|
|
|
+import com.backendsys.modules.common.config.redis.utils.RedisUtil;
|
|
|
import com.backendsys.modules.common.config.security.utils.SecurityUtil;
|
|
|
import com.backendsys.modules.crt.dao.CrtGenerateImageDao;
|
|
|
import com.backendsys.modules.crt.entity.CrtGenerateImage;
|
|
|
import com.backendsys.modules.sdk.comfyui.enums.TaskStatusEnums;
|
|
|
+import com.backendsys.modules.sdk.comfyui.rabbitmq.ComfyuiQueueConfig;
|
|
|
+import com.backendsys.modules.sdk.comfyui.rabbitmq.ComfyuiRabbitListener;
|
|
|
+import com.backendsys.modules.sdk.comfyui.rabbitmq.DeliveryTagHolder;
|
|
|
import com.backendsys.modules.sdk.comfyui.service.ComfyuiSocketService;
|
|
|
+import com.backendsys.modules.sdk.comfyui.service.ComfyuiTaskService;
|
|
|
import com.backendsys.modules.sdk.comfyui.utils.ComfyuiUtil;
|
|
|
import com.backendsys.modules.sse.entity.SseResponse;
|
|
|
import com.backendsys.modules.sse.entity.SseResponseEnum;
|
|
|
import com.backendsys.modules.sse.utils.SseUtil;
|
|
|
import com.backendsys.modules.upload.entity.SysFileResult;
|
|
|
import com.backendsys.modules.upload.service.SysFileService;
|
|
|
+import com.rabbitmq.client.Channel;
|
|
|
+import com.rabbitmq.client.GetResponse;
|
|
|
+import org.springframework.amqp.rabbit.core.RabbitTemplate;
|
|
|
import org.springframework.beans.factory.annotation.Autowired;
|
|
|
import org.springframework.beans.factory.annotation.Value;
|
|
|
import org.springframework.stereotype.Service;
|
|
@@ -29,24 +38,42 @@ import reactor.netty.http.client.HttpClient;
|
|
|
import java.net.URI;
|
|
|
import java.time.Duration;
|
|
|
import java.util.ArrayList;
|
|
|
+import java.util.HashMap;
|
|
|
import java.util.List;
|
|
|
import java.util.Map;
|
|
|
import java.util.concurrent.CompletableFuture;
|
|
|
import java.util.concurrent.ConcurrentHashMap;
|
|
|
|
|
|
+import org.springframework.amqp.rabbit.connection.Connection;
|
|
|
+
|
|
|
@Service
|
|
|
public class ComfyuiSocketServiceImpl implements ComfyuiSocketService {
|
|
|
|
|
|
@Autowired
|
|
|
private SseUtil sseUtil;
|
|
|
|
|
|
+ @Autowired
|
|
|
+ private RedisUtil redisUtil;
|
|
|
+ @Autowired
|
|
|
+ private RabbitTemplate rabbitTemplate;
|
|
|
+
|
|
|
+
|
|
|
+ @Autowired
|
|
|
+ private ComfyuiQueueConfig config;
|
|
|
@Autowired
|
|
|
private ComfyuiUtil comfyuiUtil;
|
|
|
@Autowired
|
|
|
+ private ComfyuiTaskService comfyuiTaskService;
|
|
|
+ @Autowired
|
|
|
private SysFileService sysFileService;
|
|
|
@Autowired
|
|
|
private CrtGenerateImageDao crtGenerateImageDao;
|
|
|
+ @Autowired
|
|
|
+ private DeliveryTagHolder deliveryTagHolder;
|
|
|
+
|
|
|
|
|
|
+ @Value("${spring.application.name}")
|
|
|
+ private String APPLICATION_NAME;
|
|
|
@Value("${comfyui.host}")
|
|
|
private String COMFYUI_HOST;
|
|
|
@Value("${comfyui.token}")
|
|
@@ -121,9 +148,14 @@ public class ComfyuiSocketServiceImpl implements ComfyuiSocketService {
|
|
|
}
|
|
|
@Override
|
|
|
public Mono<Void> connectToSse(String client_id, Integer port, Boolean is_save, Map<String, Object> params) {
|
|
|
+ return connectToSse(client_id, port, is_save, params, null);
|
|
|
+ }
|
|
|
+ // 如果是在 websocket 周期,则需要手动输入 user_id
|
|
|
+ @Override
|
|
|
+ public Mono<Void> connectToSse(String client_id, Integer port, Boolean is_save, Map<String, Object> params, Long input_user_id) {
|
|
|
|
|
|
// 由于 Websocket 获取不到上下文信息,所以 user_id 不能在 socket 周期获取
|
|
|
- Long user_id = SecurityUtil.getUserId();
|
|
|
+ Long user_id = input_user_id != null ? input_user_id : SecurityUtil.getUserId();
|
|
|
|
|
|
String wsUrl = "ws://" + COMFYUI_HOST + ":" + port + "/ws";
|
|
|
return Mono.defer(() -> {
|
|
@@ -132,12 +164,11 @@ public class ComfyuiSocketServiceImpl implements ComfyuiSocketService {
|
|
|
}
|
|
|
// 动态创建带有认证头的客户端
|
|
|
WebSocketClient clientWithAuth = createWebSocketClientWithToken(COMFYUI_TOKEN);
|
|
|
- return clientWithAuth.execute(URI.create(wsUrl + "?clientId=" + client_id), session -> {
|
|
|
+ String wsUrlFull = wsUrl + "?clientId=" + client_id + "&userId=" + user_id;
|
|
|
+ System.out.println("Listen: " + wsUrlFull);
|
|
|
+ return clientWithAuth.execute(URI.create(wsUrlFull), session -> {
|
|
|
// 保存会话
|
|
|
sessions.put(client_id, session);
|
|
|
- // 接收消息
|
|
|
- System.out.println("------ wsUrl: " + wsUrl + " ------");
|
|
|
- System.out.println("------ connectToSse client_id: " + client_id + ", user_id: " + user_id + " ------");
|
|
|
|
|
|
Flux<String> incomingMessages = session.receive()
|
|
|
.map(WebSocketMessage::getPayloadAsText)
|
|
@@ -151,21 +182,82 @@ public class ComfyuiSocketServiceImpl implements ComfyuiSocketService {
|
|
|
dataChildren.put("drama_project_storyboard_id", drama_project_storyboard_id);
|
|
|
data.put("data", dataChildren);
|
|
|
|
|
|
- // == [任务执行完成] =======================================================
|
|
|
+ // == [任务执行完成] =======================================================================
|
|
|
// { "type": "executed", .. }
|
|
|
String type = Convert.toStr(data.get("type"));
|
|
|
if (TaskStatusEnums.EXECUTED.getValue().equals(type)) {
|
|
|
|
|
|
+ // 在任务结束后(SSE)(ComfyuiSocketService.connectToSse):
|
|
|
+ // 1) 手动确认消息(ACK)
|
|
|
+ // 2) 清除队列锁 (QUEUE_LOCK_KEY)
|
|
|
+ // 3) 尝试把下一条任务推进主队列
|
|
|
+
|
|
|
+
|
|
|
+
|
|
|
+
|
|
|
+
|
|
|
+
|
|
|
+
|
|
|
+ // -- [RabbitMQ][完成当前队列任务,放行下一个队列] ----------------------------------------
|
|
|
+ try {
|
|
|
+ System.out.println("-- 准备放行队列 --");
|
|
|
+ Connection connection = rabbitTemplate.getConnectionFactory().createConnection();
|
|
|
+ Channel channel = connection.createChannel(false);
|
|
|
+// GetResponse resp = channel.basicGet(config.QUEUE, false);
|
|
|
+
|
|
|
+ // 已经被 Listener 的这里拿不到?
|
|
|
+ // 这里是空的?
|
|
|
+ Long tag = deliveryTagHolder.take(client_id);
|
|
|
+
|
|
|
+ // 1) 手动确认消息 (ACK)
|
|
|
+ System.out.println("-- 手动确认消息 (ACK) ----------");
|
|
|
+// channel.basicAck(resp.getEnvelope().getDeliveryTag(), false);
|
|
|
+ channel.basicAck(tag, false);
|
|
|
+
|
|
|
+
|
|
|
+
|
|
|
+ // 2) 清除队列锁 (QUEUE_LOCK_KEY)
|
|
|
+
|
|
|
+ String QUEUE_LOCK_KEY = APPLICATION_NAME + ":comfyui:queue:lock:user:" + Convert.toStr(user_id);
|
|
|
+ System.out.println("-- 清除队列锁 (QUEUE_LOCK_KEY): " + QUEUE_LOCK_KEY);
|
|
|
+ redisUtil.delete(QUEUE_LOCK_KEY);
|
|
|
+
|
|
|
+ channel.close();
|
|
|
+ connection.close();
|
|
|
+ } catch (Exception e) {
|
|
|
+ System.out.println(e.getMessage());
|
|
|
+ }
|
|
|
+ // ----------------------------------------------------------------------------------
|
|
|
+
|
|
|
+
|
|
|
+
|
|
|
+
|
|
|
+
|
|
|
+
|
|
|
+
|
|
|
+
|
|
|
JSONObject output = JSONUtil.parseObj(dataChildren.get("output"));
|
|
|
|
|
|
- // -- [生成图片 -> 转存图片 -> 新增记录] ------------------------------------------------------
|
|
|
+ // -- [生成图片 -> 转存图片 -> 新增记录] ------------------------------------------------
|
|
|
// 由于图片地址不是公开的,需要加 Token 访问,因此不能公开返回原始图片地址,比如:
|
|
|
// http://43.128.1.201:8000/api/view?filename=fenjing_1_00012_.png&token=$2b$12$.MR4qGaFetN1FPQzbfyIrehsyjnPJ12xAZhR/l7KZpLkUPQTCG4gy
|
|
|
// 因此,直接在 ComfyUI 的物理目录上构建 nginx 静态资源访问目录
|
|
|
Object imagesObj = output.get("images");
|
|
|
if (imagesObj != null) {
|
|
|
- String prompt_id = Convert.toStr(dataChildren.get("prompt_id"));
|
|
|
|
|
|
+ // 3) 尝试把下一条任务推进主队列 (生图) ----------------------------------------------
|
|
|
+ System.out.println("-- 尝试把下一条任务推进主队列 (生图) --");
|
|
|
+ Map<String, Object> map = new HashMap<>();
|
|
|
+ map.put("drama_project_storyboard_id", drama_project_storyboard_id);
|
|
|
+ map.put("user_id", user_id);
|
|
|
+ String custom_params = JSONUtil.toJsonStr(map);
|
|
|
+
|
|
|
+ System.out.println("custom_params = " + custom_params);
|
|
|
+
|
|
|
+ comfyuiTaskService.tryPushNext(user_id, custom_params);
|
|
|
+ // -----------------------------------------------------------------------------
|
|
|
+
|
|
|
+ String prompt_id = Convert.toStr(dataChildren.get("prompt_id"));
|
|
|
|
|
|
// 临时图片相对路径
|
|
|
JSONArray images = JSONUtil.parseArray(imagesObj);
|
|
@@ -186,9 +278,9 @@ public class ComfyuiSocketServiceImpl implements ComfyuiSocketService {
|
|
|
dataChildren.put("output", output);
|
|
|
data.put("data", dataChildren);
|
|
|
|
|
|
- // [DB] 执行任务
|
|
|
+ // [DB] 更新任务状态
|
|
|
CompletableFuture.runAsync(() -> {
|
|
|
- comfyuiUtil.executeComfyuiTask(prompt_id, JSONUtil.toJsonStr(dataChildren), 2);
|
|
|
+ comfyuiUtil.updateComfyuiTask(prompt_id, JSONUtil.toJsonStr(dataChildren), 2);
|
|
|
});
|
|
|
|
|
|
}
|