Ver Fonte

新增comfyui主队列

tsurumure há 1 mês atrás
pai
commit
825a516473
17 ficheiros alterados com 300 adições e 109 exclusões
  1. 3 1
      db/comfyui_task.sql
  2. 4 4
      src/main/java/com/backendsys/modules/TestController.java
  3. 7 7
      src/main/java/com/backendsys/modules/common/config/rabbitmq/DemoRabbitListener.java
  4. 45 0
      src/main/java/com/backendsys/modules/common/config/rabbitmq/DemoRabbitListenerRunner.java
  5. 0 41
      src/main/java/com/backendsys/modules/common/config/rabbitmq/RabbitListenerRunner.java
  6. 30 0
      src/main/java/com/backendsys/modules/common/config/rabbitmq/queue/DemoDlxQueueConfig.java
  7. 8 8
      src/main/java/com/backendsys/modules/common/config/rabbitmq/queue/DemoQueueConfig.java
  8. 0 30
      src/main/java/com/backendsys/modules/common/config/rabbitmq/queue/QueueDlxConfig.java
  9. 1 0
      src/main/java/com/backendsys/modules/sdk/comfyui/entity/ComfyuiTask.java
  10. 46 0
      src/main/java/com/backendsys/modules/sdk/comfyui/rabbitmq/ComfyuiQueueConfig.java
  11. 30 0
      src/main/java/com/backendsys/modules/sdk/comfyui/rabbitmq/ComfyuiQueueDlxConfig.java
  12. 36 0
      src/main/java/com/backendsys/modules/sdk/comfyui/rabbitmq/ComfyuiRabbitListener.java
  13. 8 0
      src/main/java/com/backendsys/modules/sdk/comfyui/service/ComfyuiTaskService.java
  14. 3 3
      src/main/java/com/backendsys/modules/sdk/comfyui/service/impl/ComfyuiSocketServiceImpl.java
  15. 74 0
      src/main/java/com/backendsys/modules/sdk/comfyui/service/impl/ComfyuiTaskServiceImpl.java
  16. 4 14
      src/main/java/com/backendsys/modules/sdk/comfyui/service/impl/ComfyuiText2ImageServiceImpl.java
  17. 1 1
      src/main/java/com/backendsys/modules/sdk/comfyui/utils/ComfyuiUtil.java

+ 3 - 1
db/comfyui_task.sql

@@ -13,7 +13,9 @@ CREATE TABLE `comfyui_task` (
     `task_type` VARCHAR(255) NOT NULL COMMENT '任务类型 (Text2Image, ..)',
     `task_status` TINYINT DEFAULT '-1' COMMENT '任务状态 (-1:未提交, 1:已提交)',
     `generate_request` TEXT COMMENT '任务请求原始参数 (JSONString)',
+    `in_master` TINYINT DEFAULT '-1' COMMENT '队列状态 (-1:未进主队列, 1:已进主队列)',
     `create_time` DATETIME DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间',
     `update_time` DATETIME DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '更新时间',
-    INDEX `idx_user_id` (`user_id`)
+    INDEX `idx_user_id` (`user_id`),
+    INDEX `idx_in_master` (`in_master`)
 ) ENGINE=INNODB AUTO_INCREMENT=1 DEFAULT CHARSET=utf8mb4 COMMENT='任务表';

+ 4 - 4
src/main/java/com/backendsys/modules/TestController.java

@@ -112,9 +112,9 @@ public class TestController {
 
         SysUser sysUser = new SysUser();
         sysUser.setUsername(UUID.randomUUID().toString());
-        System.out.println("【RabbitMQ-发送】:" + sysUser);
+        System.out.println("[Demo][RabbitMQ-发送]:" + sysUser);
 //        rabbitTemplate.convertAndSend("", "demo.queue", sysUser);
-        rabbitTemplate.convertAndSend("order.exchange", "order.create", sysUser);
+        rabbitTemplate.convertAndSend("demo.exchange", "demo.create", sysUser);
 
         return "ok";
     }
@@ -127,7 +127,7 @@ public class TestController {
         Channel channel = connection.createChannel(false);
 
         // 2. 手动拉消息,第二个参数传 false 表示“不要自动 ack”
-        GetResponse resp = channel.basicGet("order.queue", false);
+        GetResponse resp = channel.basicGet("demo.queue", false);
         if (resp == null) {
             channel.close();
             connection.close();
@@ -135,7 +135,7 @@ public class TestController {
         }
 
         String body = new String(resp.getBody(), StandardCharsets.UTF_8);
-        System.out.println("【RabbitMQ-手动处理】:" + body);
+        System.out.println("[Demo][RabbitMQ-手动处理]:" + body);
 
         // 3. 用同一个 channel 去 ack
         channel.basicAck(resp.getEnvelope().getDeliveryTag(), false);

+ 7 - 7
src/main/java/com/backendsys/modules/common/config/rabbitmq/RabbitListener.java → src/main/java/com/backendsys/modules/common/config/rabbitmq/DemoRabbitListener.java

@@ -12,17 +12,17 @@ import java.io.IOException;
  */
 @Component
 @Lazy(false)
-public class RabbitListener {
+public class DemoRabbitListener {
 
 //    // [监听] 自动 ACK
-//    @RabbitListener(id = "demoContainer", queues = "demo.queue", autoStartup = "false" )
+//    @ComfyuiRabbitListener(id = "demoContainer", queues = "demo.queue", autoStartup = "false" )
 //    public void receive(SysUser sysUser) {
 //        System.out.println("收到消息: " + sysUser);
 //    }
 
 
 //    // [监听] 手动 ACK
-//    @RabbitListener(id = "demoContainer", queues = "demo.queue", ackMode = "MANUAL")
+//    @ComfyuiRabbitListener(id = "demoContainer", queues = "demo.queue", ackMode = "MANUAL")
 //    public void receive(Message message, Channel channel) throws IOException, InterruptedException {
 //        try {
 //            // 1. 模拟耗时业务
@@ -42,9 +42,9 @@ public class RabbitListener {
 //    }
 
     // [监听] 死信队列
-    @org.springframework.amqp.rabbit.annotation.RabbitListener(id = "dlxContainer", queues = "dlx.queue", ackMode = "MANUAL")
-    public void handleDlx(Message message, Channel channel) throws IOException {
-        System.out.println("【RabbitMQ-死信】收到:" + new String(message.getBody()));
+    @org.springframework.amqp.rabbit.annotation.RabbitListener(id = "demoDlxContainer", queues = "demo.dlx.queue", ackMode = "MANUAL")
+    public void handleDemoDlx(Message message, Channel channel) throws IOException {
+        System.out.println("[Demo][RabbitMQ-死信队列]收到:" + new String(message.getBody()));
 
         // 业务:记录日志 / 重发 / 报警 / 人工补偿
         channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
@@ -53,7 +53,7 @@ public class RabbitListener {
 
     // 4. 并发消费(一条队列多线程)
     /*
-    @RabbitListener(queues = "demo.queue", concurrency = "5-10")
+    @ComfyuiRabbitListener(queues = "demo.queue", concurrency = "5-10")
     public void receive(String msg) { ... }
      */
 

+ 45 - 0
src/main/java/com/backendsys/modules/common/config/rabbitmq/DemoRabbitListenerRunner.java

@@ -0,0 +1,45 @@
+//package com.backendsys.modules.common.config.rabbitmq;
+//
+//import lombok.RequiredArgsConstructor;
+//import lombok.extern.slf4j.Slf4j;
+//import org.springframework.amqp.rabbit.listener.MessageListenerContainer;
+//import org.springframework.amqp.rabbit.listener.RabbitListenerEndpointRegistry;
+//import org.springframework.boot.CommandLineRunner;
+//import org.springframework.stereotype.Component;
+//
+///**
+// * 手动启动监听器
+// * 由于 RabbitMQ 的监听器默认是懒加载,所以需要手动启动监听器
+// */
+//@Slf4j
+//@Component
+//@RequiredArgsConstructor
+//public class DemoRabbitListenerRunner implements CommandLineRunner {
+//
+//    private final RabbitListenerEndpointRegistry registry;
+//
+////    @Override
+////    public void run(String... args) {
+////        List<String> MANUAL_START_IDS = List.of("demoContainer", "demoDlxContainer");
+////        MANUAL_START_IDS.forEach(id -> {
+////            MessageListenerContainer c = registry.getListenerContainer(id);
+////            if (c != null && !c.isRunning()) {
+////                c.start();
+////                System.out.printf("-- ComfyuiRabbitListener '{}' 已手动启动 --", id);
+////            }
+////        });
+////    }
+//
+//    public void run(String... args) {
+//        MessageListenerContainer container = registry.getListenerContainer("demoDlxContainer");
+//
+//        System.out.println("-- DemoRabbitListenerRunner container = " + container);
+//        System.out.println("container.isRunning() = " + container.isRunning());
+//
+//        if (container != null && !container.isRunning()) {
+//            container.start();     // 关键:真正启动监听
+//            System.out.println("-- demoDlxContainer Listener 已手动启动 --");
+//        }
+//    }
+//
+//}

+ 0 - 41
src/main/java/com/backendsys/modules/common/config/rabbitmq/RabbitListenerRunner.java

@@ -1,41 +0,0 @@
-package com.backendsys.modules.common.config.rabbitmq;
-
-import lombok.RequiredArgsConstructor;
-import lombok.extern.slf4j.Slf4j;
-import org.springframework.amqp.rabbit.listener.MessageListenerContainer;
-import org.springframework.amqp.rabbit.listener.RabbitListenerEndpointRegistry;
-import org.springframework.boot.CommandLineRunner;
-import org.springframework.stereotype.Component;
-
-/**
- * 手动启动监听器
- * 由于 RabbitMQ 的监听器默认是懒加载,所以需要手动启动监听器
- */
-@Slf4j
-@Component
-@RequiredArgsConstructor
-public class RabbitListenerRunner implements CommandLineRunner {
-
-    private final RabbitListenerEndpointRegistry registry;
-
-//    @Override
-//    public void run(String... args) {
-//        List<String> MANUAL_START_IDS = List.of("demoContainer", "dlxContainer");
-//        MANUAL_START_IDS.forEach(id -> {
-//            MessageListenerContainer c = registry.getListenerContainer(id);
-//            if (c != null && !c.isRunning()) {
-//                c.start();
-//                System.out.printf("-- RabbitListener '{}' 已手动启动 --", id);
-//            }
-//        });
-//    }
-
-    public void run(String... args) {
-        MessageListenerContainer container = registry.getListenerContainer("dlxContainer");
-        if (container != null && !container.isRunning()) {
-            container.start();     // 关键:真正启动监听
-            System.out.println("-- dlxContainer Listener 已手动启动 --");
-        }
-    }
-
-}

+ 30 - 0
src/main/java/com/backendsys/modules/common/config/rabbitmq/queue/DemoDlxQueueConfig.java

@@ -0,0 +1,30 @@
+package com.backendsys.modules.common.config.rabbitmq.queue;
+
+import org.springframework.amqp.core.*;
+import org.springframework.context.annotation.Bean;
+import org.springframework.context.annotation.Configuration;
+import org.springframework.context.annotation.Lazy;
+
+/**
+ * 死信队列
+ * 在项目启动时创建 (RabbitMQ) 队列、交换机、绑定关系
+ */
+@Configuration
+@Lazy(false)
+public class DemoDlxQueueConfig {
+
+    /* === 死信交换机/队列 === */
+    public static final String EXCHANGE = "demo.dlx";
+    public static final String QUEUE = "demo.dlx.queue";
+    public static final String ROUTING_KEY = "demo.dlx.routekey";
+
+    @Bean
+    public DirectExchange demoDlxExchange() { return ExchangeBuilder.directExchange(EXCHANGE).durable(true).build(); }
+
+    @Bean
+    public Queue demoDlxQueue() { return QueueBuilder.durable(QUEUE).build(); }
+
+    @Bean
+    public Binding demoDlxBinding() { return BindingBuilder.bind(demoDlxQueue()).to(demoDlxExchange()).with(ROUTING_KEY); }
+
+}

+ 8 - 8
src/main/java/com/backendsys/modules/common/config/rabbitmq/queue/QueueDemoConfig.java → src/main/java/com/backendsys/modules/common/config/rabbitmq/queue/DemoQueueConfig.java

@@ -11,14 +11,14 @@ import org.springframework.context.annotation.Lazy;
  */
 @Configuration
 @Lazy(false)
-public class QueueDemoConfig {
+public class DemoQueueConfig {
 
     @Autowired
-    private QueueDlxConfig queueDlxConfig;
+    private DemoDlxQueueConfig config;
 
-    public static final String EXCHANGE = "order.exchange";
-    public static final String QUEUE = "order.queue";
-    public static final String ROUTING_KEY = "order.create";
+    public static final String EXCHANGE = "demo.exchange";
+    public static final String QUEUE = "demo.queue";
+    public static final String ROUTING_KEY = "demo.create";
 
     // 交换机
     @Bean
@@ -30,10 +30,10 @@ public class QueueDemoConfig {
     @Bean
     public Queue demoQueue() {
         return QueueBuilder.durable(QUEUE)
-                .ttl(5000)                    // 5s
+                .ttl(5000)                    // 5s`
                 .maxLength(1000)        // 1000条
-                .deadLetterExchange(queueDlxConfig.EXCHANGE)
-                .deadLetterRoutingKey(queueDlxConfig.ROUTING_KEY)
+                .deadLetterExchange(config.EXCHANGE)
+                .deadLetterRoutingKey(config.ROUTING_KEY)
                 .build();
     }
 

+ 0 - 30
src/main/java/com/backendsys/modules/common/config/rabbitmq/queue/QueueDlxConfig.java

@@ -1,30 +0,0 @@
-package com.backendsys.modules.common.config.rabbitmq.queue;
-
-import org.springframework.amqp.core.*;
-import org.springframework.context.annotation.Bean;
-import org.springframework.context.annotation.Configuration;
-import org.springframework.context.annotation.Lazy;
-
-/**
- * 死信队列
- * 在项目启动时创建 (RabbitMQ) 队列、交换机、绑定关系
- */
-@Configuration
-@Lazy(false)
-public class QueueDlxConfig {
-
-    /* === 死信交换机/队列 === */
-    public static final String EXCHANGE = "dlx";
-    public static final String QUEUE = "dlx.queue";
-    public static final String ROUTING_KEY = "dlx.routekey";
-
-    @Bean
-    public DirectExchange dlxExchange() { return ExchangeBuilder.directExchange(EXCHANGE).durable(true).build(); }
-
-    @Bean
-    public Queue dlxQueue() { return QueueBuilder.durable(QUEUE).build(); }
-
-    @Bean
-    public Binding dlxBinding() { return BindingBuilder.bind(dlxQueue()).to(dlxExchange()).with(ROUTING_KEY); }
-
-}

+ 1 - 0
src/main/java/com/backendsys/modules/sdk/comfyui/entity/ComfyuiTask.java

@@ -16,6 +16,7 @@ public class ComfyuiTask {
     private String task_type;             // 任务类型 (Text2Image, ..)
     private Integer task_status;          // 任务状态 (-1:未开始, 1:进行中, 2:成功, 3:失败)
     private String generate_request;      // 任务请求原始参数 (JSONString)
+    private Integer in_master;            // 队列状态 (-1:未进主队列, 1:已进主队列)
     private String create_time;
     private String update_time;
 

+ 46 - 0
src/main/java/com/backendsys/modules/sdk/comfyui/rabbitmq/ComfyuiQueueConfig.java

@@ -0,0 +1,46 @@
+package com.backendsys.modules.sdk.comfyui.rabbitmq;
+
+import com.backendsys.modules.common.config.rabbitmq.queue.DemoDlxQueueConfig;
+import org.springframework.amqp.core.*;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.context.annotation.Bean;
+import org.springframework.context.annotation.Configuration;
+import org.springframework.context.annotation.Lazy;
+
+/**
+ * 非懒加载,在项目启动时创建 RabbitMQ 队列、交换机、绑定关系
+ */
+@Configuration
+@Lazy(false)
+public class ComfyuiQueueConfig {
+
+    @Autowired
+    private ComfyuiQueueDlxConfig config;
+
+    public static final String EXCHANGE = "comfyui.exchange";
+    public static final String QUEUE = "comfyui.queue";
+    public static final String ROUTING_KEY = "comfyui.create";
+
+    // 交换机
+    @Bean
+    public DirectExchange comfyuiExchange() {
+        return ExchangeBuilder.directExchange(EXCHANGE).durable(true).build();
+    }
+
+    // 队列
+    @Bean
+    public Queue comfyuiQueue() {
+        return QueueBuilder.durable(QUEUE)
+            .deadLetterExchange(config.EXCHANGE)
+            .deadLetterRoutingKey(config.ROUTING_KEY)
+            .build();
+    }
+
+    // 队列绑定交换机
+    @Bean
+    public Binding comfyuiBinding() {
+        return BindingBuilder.bind(comfyuiQueue()).to(comfyuiExchange()).with(ROUTING_KEY);
+    }
+
+
+}

+ 30 - 0
src/main/java/com/backendsys/modules/sdk/comfyui/rabbitmq/ComfyuiQueueDlxConfig.java

@@ -0,0 +1,30 @@
+package com.backendsys.modules.sdk.comfyui.rabbitmq;
+
+import org.springframework.amqp.core.*;
+import org.springframework.context.annotation.Bean;
+import org.springframework.context.annotation.Configuration;
+import org.springframework.context.annotation.Lazy;
+
+/**
+ * 死信队列
+ * 在项目启动时创建 (RabbitMQ) 队列、交换机、绑定关系
+ */
+@Configuration
+@Lazy(false)
+public class ComfyuiQueueDlxConfig {
+
+    /* === 死信交换机/队列 === */
+    public static final String EXCHANGE = "comfyui.dlx";
+    public static final String QUEUE = "comfyui.dlx.queue";
+    public static final String ROUTING_KEY = "comfyui.dlx.routekey";
+
+    @Bean
+    public DirectExchange comfyuiDlxExchange() { return ExchangeBuilder.directExchange(EXCHANGE).durable(true).build(); }
+
+    @Bean
+    public Queue comfyuiDlxQueue() { return QueueBuilder.durable(QUEUE).build(); }
+
+    @Bean
+    public Binding comfyuiDlxBinding() { return BindingBuilder.bind(comfyuiDlxQueue()).to(comfyuiDlxExchange()).with(ROUTING_KEY); }
+
+}

+ 36 - 0
src/main/java/com/backendsys/modules/sdk/comfyui/rabbitmq/ComfyuiRabbitListener.java

@@ -0,0 +1,36 @@
+package com.backendsys.modules.sdk.comfyui.rabbitmq;
+
+import com.rabbitmq.client.Channel;
+import org.springframework.amqp.core.Message;
+import org.springframework.amqp.rabbit.annotation.RabbitListener;
+import org.springframework.context.annotation.Lazy;
+import org.springframework.stereotype.Component;
+
+import java.io.IOException;
+
+/**
+ * 自定义监听器
+ */
+@Component
+@Lazy(false)
+public class ComfyuiRabbitListener {
+
+    // [监听] 死信队列
+    @RabbitListener(id = "ComfyuiDlxContainer", queues = "comfyui.dlx.queue", ackMode = "MANUAL")
+    public void handleComfyuiDlx(Message message, Channel channel) throws IOException {
+        System.out.println("[Comfyui][RabbitMQ-死信] 收到:" + new String(message.getBody()));
+
+        // 业务:记录日志 / 重发 / 报警 / 人工补偿
+        channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
+    }
+
+
+    // 4. 并发消费(一条队列多线程)
+    /*
+    @ComfyuiRabbitListener(queues = "demo.queue", concurrency = "5-10")
+    public void receive(String msg) { ... }
+     */
+
+
+
+}

+ 8 - 0
src/main/java/com/backendsys/modules/sdk/comfyui/service/ComfyuiTaskService.java

@@ -0,0 +1,8 @@
+package com.backendsys.modules.sdk.comfyui.service;
+
+public interface ComfyuiTaskService {
+
+    // 推送主队列任务
+    void tryPushNext(long user_id);
+
+}

+ 3 - 3
src/main/java/com/backendsys/modules/sdk/comfyui/service/impl/ComfyuiSocketServiceImpl.java

@@ -9,7 +9,7 @@ import com.backendsys.modules.crt.dao.CrtGenerateImageDao;
 import com.backendsys.modules.crt.entity.CrtGenerateImage;
 import com.backendsys.modules.sdk.comfyui.enums.TaskStatusEnums;
 import com.backendsys.modules.sdk.comfyui.service.ComfyuiSocketService;
-import com.backendsys.modules.sdk.comfyui.utils.ComfyUtil;
+import com.backendsys.modules.sdk.comfyui.utils.ComfyuiUtil;
 import com.backendsys.modules.sse.entity.SseResponse;
 import com.backendsys.modules.sse.entity.SseResponseEnum;
 import com.backendsys.modules.sse.utils.SseUtil;
@@ -41,7 +41,7 @@ public class ComfyuiSocketServiceImpl implements ComfyuiSocketService {
     private SseUtil sseUtil;
 
     @Autowired
-    private ComfyUtil comfyUtil;
+    private ComfyuiUtil comfyuiUtil;
     @Autowired
     private SysFileService sysFileService;
     @Autowired
@@ -188,7 +188,7 @@ public class ComfyuiSocketServiceImpl implements ComfyuiSocketService {
 
                                     // [DB] 执行任务
                                     CompletableFuture.runAsync(() -> {
-                                        comfyUtil.executeComfyuiTask(prompt_id, JSONUtil.toJsonStr(dataChildren), 2);
+                                        comfyuiUtil.executeComfyuiTask(prompt_id, JSONUtil.toJsonStr(dataChildren), 2);
                                     });
 
                                 }

+ 74 - 0
src/main/java/com/backendsys/modules/sdk/comfyui/service/impl/ComfyuiTaskServiceImpl.java

@@ -0,0 +1,74 @@
+package com.backendsys.modules.sdk.comfyui.service.impl;
+
+import com.backendsys.modules.common.config.redis.utils.RedisUtil;
+import com.backendsys.modules.sdk.comfyui.dao.ComfyuiTaskDao;
+import com.backendsys.modules.sdk.comfyui.entity.ComfyuiTask;
+import com.backendsys.modules.sdk.comfyui.service.ComfyuiTaskService;
+import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
+import org.springframework.amqp.rabbit.core.RabbitTemplate;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.data.redis.core.RedisTemplate;
+import org.springframework.stereotype.Service;
+import org.springframework.transaction.annotation.Transactional;
+
+import java.time.Duration;
+
+@Service
+public class ComfyuiTaskServiceImpl implements ComfyuiTaskService {
+
+    @Autowired
+    private RedisUtil redisUtil;
+    @Autowired
+    private RedisTemplate redisTemplate;
+    @Autowired
+    private RabbitTemplate rabbitTemplate;
+
+    private static final String MASTER_EX  = "master.job.exchange";
+    private static final String MASTER_RK  = "master.job";
+    private static final String TOKEN_KEY  = "user:%s:master_token";
+
+
+    @Autowired
+    private ComfyuiTaskDao comfyuiTaskDao;
+
+    /**
+     * 推送主队列任务
+     * 当用户提交新任务、或某任务执行结束后,都调用它
+     */
+    @Transactional
+    public void tryPushNext(long user_id) {
+        // 1. 如果该用户已在主队列里,直接返回
+        String token_key = String.format(TOKEN_KEY, user_id);
+        if (Boolean.TRUE.equals(redisTemplate.hasKey(token_key))) {
+            return;
+        }
+
+        // 2. 抢令牌(原子)
+        Boolean ok = redisTemplate.opsForValue().setIfAbsent(token_key, "1", Duration.ofMinutes(10));
+        if (!Boolean.TRUE.equals(ok)) {
+            return;          // 抢失败(并发极小概率)
+        }
+
+        // 3. 取该用户下一条未进主队列的任务
+        LambdaQueryWrapper<ComfyuiTask> wrapperTask = new LambdaQueryWrapper<>();
+        wrapperTask.eq(ComfyuiTask::getUser_id, user_id);
+        wrapperTask.eq(ComfyuiTask::getIn_master, 0);
+        ComfyuiTask comfyuiTask = comfyuiTaskDao.selectOne(wrapperTask);
+
+        if (comfyuiTask == null) {          // 用户已没任务
+            redisTemplate.delete(String.format(TOKEN_KEY, user_id));
+            return;
+        }
+
+        // 4. 打标 (进主队列)
+        ComfyuiTask entity = new ComfyuiTask();
+        entity.setIn_master(1);
+        comfyuiTaskDao.update(entity, wrapperTask);
+        comfyuiTask.setIn_master(1);
+
+        // 5. 发消息
+        rabbitTemplate.convertAndSend(MASTER_EX, MASTER_RK, comfyuiTask);
+
+    }
+
+}

+ 4 - 14
src/main/java/com/backendsys/modules/sdk/comfyui/service/impl/ComfyuiText2ImageServiceImpl.java

@@ -1,28 +1,18 @@
 package com.backendsys.modules.sdk.comfyui.service.impl;
 
-import cn.hutool.json.JSONObject;
-import cn.hutool.json.JSONUtil;
-import com.backendsys.modules.sdk.comfyui.dao.ComfyuiTaskDao;
-import com.backendsys.modules.sdk.comfyui.entity.ComfyuiResponse;
 import com.backendsys.modules.sdk.comfyui.entity.ComfyuiText2Image;
 import com.backendsys.modules.sdk.comfyui.enums.TaskTypeEnums;
 import com.backendsys.modules.sdk.comfyui.service.ComfyuiService;
 import com.backendsys.modules.sdk.comfyui.service.ComfyuiText2ImageService;
-import com.backendsys.modules.sdk.comfyui.utils.ComfyUtil;
+import com.backendsys.modules.sdk.comfyui.utils.ComfyuiUtil;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.stereotype.Service;
-import reactor.core.publisher.Mono;
-
-import java.util.concurrent.CompletableFuture;
 
 @Service
 public class ComfyuiText2ImageServiceImpl implements ComfyuiText2ImageService {
 
     @Autowired
-    private ComfyuiService comfyUIService;
-
-    @Autowired
-    private ComfyUtil comfyUtil;
+    private ComfyuiUtil comfyuiUtil;
 
     /**
      * [ComfyUI] 文生图 (7.16生图.json)
@@ -340,7 +330,7 @@ public class ComfyuiText2ImageServiceImpl implements ComfyuiText2ImageService {
 
 
         // [DB] 初始化任务
-        Long task_id = comfyUtil.initComfyuiTask(client_id, prompt, TaskTypeEnums.TEXT_2_IMAGE.getValue());
+        Long task_id = comfyuiUtil.initComfyuiTask(client_id, prompt, TaskTypeEnums.TEXT_2_IMAGE.getValue());
         return task_id;
 
 //        // [ComfyUI] 执行任务
@@ -431,7 +421,7 @@ public class ComfyuiText2ImageServiceImpl implements ComfyuiText2ImageService {
 
 
         // [DB] 初始化任务
-        Long task_id = comfyUtil.initComfyuiTask(client_id, prompt, TaskTypeEnums.TEXT_2_IMAGE.getValue());
+        Long task_id = comfyuiUtil.initComfyuiTask(client_id, prompt, TaskTypeEnums.TEXT_2_IMAGE.getValue());
         return task_id;
 
 //        // [ComfyUI] 执行任务

+ 1 - 1
src/main/java/com/backendsys/modules/sdk/comfyui/utils/ComfyUtil.java → src/main/java/com/backendsys/modules/sdk/comfyui/utils/ComfyuiUtil.java

@@ -10,7 +10,7 @@ import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.stereotype.Component;
 
 @Component
-public class ComfyUtil {
+public class ComfyuiUtil {
 
     @Autowired
     private ComfyuiTaskDao comfyuiTaskDao;