Browse Source

comfyui + rabbitmq 跑通了(多用户、多队列未测)(代码未优化)

tsurumure 1 tháng trước cách đây
mục cha
commit
eb7c9bff71

+ 2 - 1
src/main/java/com/backendsys/modules/sdk/comfyui/rabbitmq/ComfyuiRabbitListener.java

@@ -66,7 +66,8 @@ public class ComfyuiRabbitListener {
             System.out.println("[Comfyui][发起任务] response = " + response);
 
             // 2. 暂存 deliveryTag
-            deliveryTagHolder.put(client_id, deliveryTag);
+            System.out.println("put client_id = " + client_id);
+            deliveryTagHolder.put(client_id, deliveryTag, channel);
 
             /*
                 外部调用ACK (确认消息):

+ 7 - 6
src/main/java/com/backendsys/modules/sdk/comfyui/rabbitmq/DeliveryTagHolder.java

@@ -1,5 +1,6 @@
 package com.backendsys.modules.sdk.comfyui.rabbitmq;
 
+import com.rabbitmq.client.Channel;
 import org.springframework.stereotype.Component;
 
 import java.util.Map;
@@ -7,14 +8,14 @@ import java.util.concurrent.ConcurrentHashMap;
 
 @Component
 public class DeliveryTagHolder {
-    // key = userId(或 sessionId),value = deliveryTag
-    private final Map<String, Long> map = new ConcurrentHashMap<>();
+    private final Map<String, TagCtx> map = new ConcurrentHashMap<>();
 
-    public void put(String key, Long tag) {
-        map.put(key, tag);
+    public void put(String key, long tag, Channel ch) {
+        map.put(key, new TagCtx(tag, ch));
     }
 
-    public Long take(String key) {
-        return map.remove(key);   // 拿出来后自动删除
+    public TagCtx take(String key) {
+        return map.remove(key);
     }
 }
+

+ 10 - 0
src/main/java/com/backendsys/modules/sdk/comfyui/rabbitmq/TagCtx.java

@@ -0,0 +1,10 @@
+package com.backendsys.modules.sdk.comfyui.rabbitmq;
+
+import com.rabbitmq.client.Channel;
+import lombok.Data;
+
+@Data
+public class TagCtx {
+    private final long deliveryTag;
+    private final Channel channel;
+}

+ 10 - 8
src/main/java/com/backendsys/modules/sdk/comfyui/service/impl/ComfyuiSocketServiceImpl.java

@@ -13,6 +13,7 @@ import com.backendsys.modules.sdk.comfyui.enums.TaskStatusEnums;
 import com.backendsys.modules.sdk.comfyui.rabbitmq.ComfyuiQueueConfig;
 import com.backendsys.modules.sdk.comfyui.rabbitmq.ComfyuiRabbitListener;
 import com.backendsys.modules.sdk.comfyui.rabbitmq.DeliveryTagHolder;
+import com.backendsys.modules.sdk.comfyui.rabbitmq.TagCtx;
 import com.backendsys.modules.sdk.comfyui.service.ComfyuiSocketService;
 import com.backendsys.modules.sdk.comfyui.service.ComfyuiTaskService;
 import com.backendsys.modules.sdk.comfyui.utils.ComfyuiUtil;
@@ -201,29 +202,30 @@ public class ComfyuiSocketServiceImpl implements ComfyuiSocketService {
                                 // -- [RabbitMQ][完成当前队列任务,放行下一个队列] ----------------------------------------
                                 try {
                                     System.out.println("-- 准备放行队列 --");
-                                    Connection connection = rabbitTemplate.getConnectionFactory().createConnection();
-                                    Channel channel = connection.createChannel(false);
+//                                    Connection connection = rabbitTemplate.getConnectionFactory().createConnection();
+//                                    Channel channel = connection.createChannel(false);
 //                                    GetResponse resp = channel.basicGet(config.QUEUE, false);
 
                                     // 已经被 Listener 的这里拿不到?
                                     // 这里是空的?
-                                    Long tag = deliveryTagHolder.take(client_id);
+                                    TagCtx tag = deliveryTagHolder.take(client_id);
+                                    Channel channel = tag.getChannel();
 
                                     // 1) 手动确认消息 (ACK)
                                     System.out.println("-- 手动确认消息 (ACK) ----------");
 //                                    channel.basicAck(resp.getEnvelope().getDeliveryTag(), false);
-                                    channel.basicAck(tag, false);
-
+                                    System.out.println("client_id = " + client_id + ", tag = " + tag);
+                                    channel.basicAck(tag.getDeliveryTag(), false);
+                                    channel.close();
 
 
                                     // 2) 清除队列锁 (QUEUE_LOCK_KEY)
-
                                     String QUEUE_LOCK_KEY = APPLICATION_NAME + ":comfyui:queue:lock:user:" + Convert.toStr(user_id);
                                     System.out.println("-- 清除队列锁 (QUEUE_LOCK_KEY): " + QUEUE_LOCK_KEY);
                                     redisUtil.delete(QUEUE_LOCK_KEY);
 
-                                    channel.close();
-                                    connection.close();
+
+
                                 } catch (Exception e) {
                                     System.out.println(e.getMessage());
                                 }