Browse Source

Test rabbitmq

tsurumure 1 month ago
parent
commit
e9ce061fec

+ 6 - 0
pom.xml

@@ -39,6 +39,12 @@
             </exclusions>
         </dependency>
 
+        <!-- RabbitMQ -->
+        <dependency>
+            <groupId>org.springframework.boot</groupId>
+            <artifactId>spring-boot-starter-amqp</artifactId>
+        </dependency>
+
         <!-- Nacos -->
 
         <!-- logback / log4j / slf4j -->

+ 31 - 0
src/main/java/com/backendsys/modules/TestController.java

@@ -27,9 +27,14 @@ import com.tencentcloudapi.common.Credential;
 //import io.github.pigmesh.ai.deepseek.core.chat.ChatCompletionResponse;
 import com.volcengine.service.visual.IVisualService;
 import com.volcengine.service.visual.impl.VisualServiceImpl;
+import jakarta.annotation.PostConstruct;
 import jakarta.servlet.ServletContext;
 import org.apache.poi.xwpf.usermodel.XWPFDocument;
 import org.redisson.api.*;
+import org.springframework.amqp.core.Queue;
+import org.springframework.amqp.rabbit.annotation.RabbitListener;
+import org.springframework.amqp.rabbit.core.RabbitAdmin;
+import org.springframework.amqp.rabbit.core.RabbitTemplate;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.beans.factory.annotation.Value;
 import org.springframework.context.annotation.Lazy;
@@ -78,6 +83,32 @@ public class TestController {
     private String SECRET_KEY;
 
 
+
+    @Autowired
+    private RabbitTemplate rabbitTemplate;
+
+    @GetMapping("/testRabbitMQ/send")
+    public String send() {
+        rabbitTemplate.convertAndSend("demo.exchange", "order.create", "Hello RabbitMQ!");
+        return "ok";
+    }
+
+    @PostConstruct
+    public void init() {
+        rabbitTemplate.setConfirmCallback((corData, ack, cause) -> {
+            if (!ack) {
+                System.out.println("消息发送失败:" + corData + ",原因:" + cause);
+            }
+        });
+        rabbitTemplate.setReturnsCallback(ret -> {
+            System.out.println("消息无法路由到队列:" + ret.getMessage());
+        });
+    }
+
+
+
+
+
 //    @Autowired
 //    private DeepSeekClientImpl deepSeekClient;
 //

+ 34 - 0
src/main/java/com/backendsys/modules/common/config/rabbitmq/RabbitInitializer.java

@@ -0,0 +1,34 @@
+package com.backendsys.modules.common.config.rabbitmq;
+
+import org.springframework.amqp.core.AmqpAdmin;
+import org.springframework.amqp.core.BindingBuilder;
+import org.springframework.amqp.core.DirectExchange;
+import org.springframework.amqp.core.Queue;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.boot.ApplicationArguments;
+import org.springframework.boot.ApplicationRunner;
+import org.springframework.stereotype.Component;
+
+@Component
+public class RabbitInitializer implements ApplicationRunner {
+
+    @Autowired
+    private AmqpAdmin amqpAdmin;
+
+    @Override
+    public void run(ApplicationArguments args) {
+
+        System.out.println("-- RabbitInitializer.run --");
+        // 交换机
+        amqpAdmin.declareExchange(new DirectExchange("demo.exchange", true, false));
+        // 队列
+        amqpAdmin.declareQueue(new Queue("demo.queue", true));
+        // 把队列 demo.queue 绑定到交换机 demo.exchange,路由键设置为 order.create
+        amqpAdmin.declareBinding(
+                BindingBuilder.bind(new Queue("demo.queue"))
+                        .to(new DirectExchange("demo.exchange"))
+                        .with("order.create")
+        );
+
+    }
+}

+ 95 - 95
src/main/java/com/backendsys/modules/queue/controller/TaskStatusController.java

@@ -1,95 +1,95 @@
-package com.backendsys.modules.queue.controller;
-
-import cn.hutool.core.convert.Convert;
-import com.backendsys.modules.common.config.security.annotations.Anonymous;
-import com.backendsys.modules.common.utils.Result;
-import com.backendsys.modules.queue.entity.Entire;
-import com.backendsys.modules.queue.service.TaskService;
-import com.backendsys.modules.sdk.comfyui.enums.TaskStatusEnums;
-import io.swagger.v3.oas.annotations.Operation;
-import jakarta.annotation.PostConstruct;
-import org.springframework.beans.factory.annotation.Autowired;
-import org.springframework.beans.factory.annotation.Value;
-import org.springframework.data.redis.connection.stream.MapRecord;
-import org.springframework.data.redis.connection.stream.PendingMessages;
-import org.springframework.data.redis.connection.stream.RecordId;
-import org.springframework.web.bind.annotation.GetMapping;
-import org.springframework.web.bind.annotation.RestController;
-
-import java.util.LinkedHashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.UUID;
-
-@RestController
-public class TaskStatusController {
-
-    @Autowired
-    private TaskService taskService;
-
-    @Value("${comfyui.queue-key}")
-    private String QUEUE_KEY;
-    @Value("${spring.application.name}")
-    private String APPLICATION_NAME;
-
-
-    @PostConstruct
-    @Operation(summary = "初始化消费者组")
-    public void initConsumerGroup() {
-        String stream_key = APPLICATION_NAME + ":" + QUEUE_KEY;
-        String group_name = stream_key + ":group";
-        taskService.initConsumerGroup(stream_key, group_name);
-    }
-
-    @Operation(summary = "添加任务")
-    @GetMapping("/api/tasks/addTask")
-    public Result addTask() {
-        Map<String, Object> data = new LinkedHashMap<>();
-        data.put("progress", 0);
-        data.put("status", TaskStatusEnums.EXECUTION_START.getValue());
-
-        // 加入队列
-        String stream_key = APPLICATION_NAME + ":" + QUEUE_KEY;
-        RecordId record_id = taskService.addTask(stream_key, data);
-        data.put("task_id", record_id.getValue());
-
-        return Result.success().put("data", data);
-    }
-
-    @Anonymous
-    @Operation(summary = "消费任务")
-    @GetMapping("/api/tasks/consumeTask")
-    public Result consumeTask(String task_id) {
-        String stream_key = APPLICATION_NAME + ":" + QUEUE_KEY;
-        taskService.consumeTask(stream_key, task_id);
-        return Result.success();
-    }
-
-    @Anonymous
-    @Operation(summary = "移除任务")
-    @GetMapping("/api/tasks/removeTask")
-    public Result removeTask(String task_id) {
-        String stream_key = APPLICATION_NAME + ":" + QUEUE_KEY;
-        taskService.removeTask(stream_key, task_id);
-        return Result.success();
-    }
-
-    @Anonymous
-    @Operation(summary = "获取任务列表")
-    @GetMapping("/api/tasks/getTaskList")
-    public Result getTaskList() {
-        String stream_key = APPLICATION_NAME + ":" + QUEUE_KEY;
-        List<MapRecord<String, String, String>> task_list = taskService.getTaskList(stream_key);
-        return Result.success().put("data", task_list);
-    }
-
-    @Anonymous
-    @Operation(summary = "获取任务列表(未消费)")
-    @GetMapping("/api/tasks/getTaskPaddingList")
-    public Result getTaskPaddingList() {
-        String stream_key = APPLICATION_NAME + ":" + QUEUE_KEY;
-        List<MapRecord<String, String, String>> task_list = taskService.getTaskPaddingList(stream_key);
-        return Result.success().put("data", task_list);
-    }
-
-}
+//package com.backendsys.modules.queue.controller;
+//
+//import cn.hutool.core.convert.Convert;
+//import com.backendsys.modules.common.config.security.annotations.Anonymous;
+//import com.backendsys.modules.common.utils.Result;
+//import com.backendsys.modules.queue.entity.Entire;
+//import com.backendsys.modules.queue.service.TaskService;
+//import com.backendsys.modules.sdk.comfyui.enums.TaskStatusEnums;
+//import io.swagger.v3.oas.annotations.Operation;
+//import jakarta.annotation.PostConstruct;
+//import org.springframework.beans.factory.annotation.Autowired;
+//import org.springframework.beans.factory.annotation.Value;
+//import org.springframework.data.redis.connection.stream.MapRecord;
+//import org.springframework.data.redis.connection.stream.PendingMessages;
+//import org.springframework.data.redis.connection.stream.RecordId;
+//import org.springframework.web.bind.annotation.GetMapping;
+//import org.springframework.web.bind.annotation.RestController;
+//
+//import java.util.LinkedHashMap;
+//import java.util.List;
+//import java.util.Map;
+//import java.util.UUID;
+//
+//@RestController
+//public class TaskStatusController {
+//
+//    @Autowired
+//    private TaskService taskService;
+//
+//    @Value("${comfyui.queue-key}")
+//    private String QUEUE_KEY;
+//    @Value("${spring.application.name}")
+//    private String APPLICATION_NAME;
+//
+//
+//    @PostConstruct
+//    @Operation(summary = "初始化消费者组")
+//    public void initConsumerGroup() {
+//        String stream_key = APPLICATION_NAME + ":" + QUEUE_KEY;
+//        String group_name = stream_key + ":group";
+//        taskService.initConsumerGroup(stream_key, group_name);
+//    }
+//
+//    @Operation(summary = "添加任务")
+//    @GetMapping("/api/tasks/addTask")
+//    public Result addTask() {
+//        Map<String, Object> data = new LinkedHashMap<>();
+//        data.put("progress", 0);
+//        data.put("status", TaskStatusEnums.EXECUTION_START.getValue());
+//
+//        // 加入队列
+//        String stream_key = APPLICATION_NAME + ":" + QUEUE_KEY;
+//        RecordId record_id = taskService.addTask(stream_key, data);
+//        data.put("task_id", record_id.getValue());
+//
+//        return Result.success().put("data", data);
+//    }
+//
+//    @Anonymous
+//    @Operation(summary = "消费任务")
+//    @GetMapping("/api/tasks/consumeTask")
+//    public Result consumeTask(String task_id) {
+//        String stream_key = APPLICATION_NAME + ":" + QUEUE_KEY;
+//        taskService.consumeTask(stream_key, task_id);
+//        return Result.success();
+//    }
+//
+//    @Anonymous
+//    @Operation(summary = "移除任务")
+//    @GetMapping("/api/tasks/removeTask")
+//    public Result removeTask(String task_id) {
+//        String stream_key = APPLICATION_NAME + ":" + QUEUE_KEY;
+//        taskService.removeTask(stream_key, task_id);
+//        return Result.success();
+//    }
+//
+//    @Anonymous
+//    @Operation(summary = "获取任务列表")
+//    @GetMapping("/api/tasks/getTaskList")
+//    public Result getTaskList() {
+//        String stream_key = APPLICATION_NAME + ":" + QUEUE_KEY;
+//        List<MapRecord<String, String, String>> task_list = taskService.getTaskList(stream_key);
+//        return Result.success().put("data", task_list);
+//    }
+//
+//    @Anonymous
+//    @Operation(summary = "获取任务列表(未消费)")
+//    @GetMapping("/api/tasks/getTaskPaddingList")
+//    public Result getTaskPaddingList() {
+//        String stream_key = APPLICATION_NAME + ":" + QUEUE_KEY;
+//        List<MapRecord<String, String, String>> task_list = taskService.getTaskPaddingList(stream_key);
+//        return Result.success().put("data", task_list);
+//    }
+//
+//}

+ 14 - 14
src/main/java/com/backendsys/modules/queue/entity/QueuePosition.java

@@ -1,14 +1,14 @@
-//package com.backendsys.modules.queue.entity;
-//
-//import lombok.Data;
-//
-//@Data
-//public class QueuePosition {
-//    private int position; // 当前排队位置
-//    private int total; // 队列总数
-//
-//    public QueuePosition(int position, int total) {
-//        this.position = position;
-//        this.total = total;
-//    }
-//}
+package com.backendsys.modules.queue.entity;
+
+import lombok.Data;
+
+@Data
+public class QueuePosition {
+    private int position; // 当前排队位置
+    private int total; // 队列总数
+
+    public QueuePosition(int position, int total) {
+        this.position = position;
+        this.total = total;
+    }
+}

+ 15 - 15
src/main/java/com/backendsys/modules/queue/entity/QueueRequest.java

@@ -1,15 +1,15 @@
-//package com.backendsys.modules.queue.entity;
-//
-//import lombok.Data;
-//
-//import java.util.UUID;
-//
-//@Data
-//public class QueueRequest {
-//    private String id;
-//    private int position;
-//
-//    public QueueRequest() {
-//        this.id = UUID.randomUUID().toString(); // 自动生成唯一标识符
-//    }
-//}
+package com.backendsys.modules.queue.entity;
+
+import lombok.Data;
+
+import java.util.UUID;
+
+@Data
+public class QueueRequest {
+    private String id;
+    private int position;
+
+    public QueueRequest() {
+        this.id = UUID.randomUUID().toString(); // 自动生成唯一标识符
+    }
+}

+ 107 - 106
src/main/java/com/backendsys/modules/queue/service/QueueService.java

@@ -1,106 +1,107 @@
-//package com.backendsys.modules.queue.service;
-//
-//import cn.hutool.core.convert.Convert;
-//import com.backendsys.modules.queue.entity.QueuePosition;
-//import com.backendsys.modules.queue.entity.QueueRequest;
-//import org.springframework.beans.factory.annotation.Autowired;
-//import org.springframework.data.redis.core.StringRedisTemplate;
-//import org.springframework.stereotype.Service;
-//
-//import java.util.concurrent.atomic.AtomicInteger;
-//
-//@Service
-//public class QueueService {
-//    private final StringRedisTemplate redisTemplate;
-//    private final AtomicInteger counter = new AtomicInteger(0);
-//
-//    @Autowired
-//    public QueueService(StringRedisTemplate redisTemplate) {
-//        this.redisTemplate = redisTemplate;
-//    }
-//
-//    /**
-//     * 分配排队编号
-//     */
-//    public int enqueue(String queueKey, QueueRequest queueRequest) {
-//        // 为每个请求分配一个排队编号
-//        Long size = redisTemplate.opsForZSet().size(queueKey);
-//        int position = (size == null) ? 1 : size.intValue() + 1;
-//        //int position = counter.incrementAndGet();
-//        System.out.println("排号: " + position + ", request_id: " + queueRequest.getId());
-//
-//        queueRequest.setPosition(position);
-//        redisTemplate.opsForZSet().add(queueKey, queueRequest.getId(), position);
-//
-//        return position;
-//    }
-//
-//    /**
-//     * 开始排队
-//     */
-//    public void startProcessing(String queueKey) {
-////        new Thread(() -> {
-////            while (true) {
-////                // 从有序集合中取出第一个请求
-////                String requestId = Convert.toStr(redisTemplate.opsForZSet().popMin(queueKey));
-////                if (requestId != null) {
-////                    QueueRequest queueRequest = getRequestById(requestId);
-////                    int position = queueRequest.getPosition();
-////                    // 处理请求
-////                    processRequest(queueRequest);
-////                    // 可以通知用户处理完成
-////                    notifyUser(queueRequest, position);
-////                }
-////            }
-////        }).start();
-//        String requestId = Convert.toStr(redisTemplate.opsForZSet().popMin(queueKey));
-//        System.out.println("requestId = " + requestId);
-//    }
-//
-//    private void processRequest(QueueRequest queueRequest) {
-//        // 模拟耗时操作
-//        try {
-//            Thread.sleep(10 * 1000);
-//        } catch (InterruptedException e) {
-//            Thread.currentThread().interrupt();
-//        }
-//    }
-//
-//    /**
-//     * 通知
-//     */
-//    private void notifyUser(QueueRequest queueRequest, int position) {
-//        // 通知用户处理完成
-//        System.out.println("Request " + position + " processed.");
-//    }
-//
-//    private QueueRequest getRequestById(String requestId) {
-//        // 根据请求ID获取请求对象
-//        return new QueueRequest();
-//    }
-//
-//    /**
-//     * 获取请求的排队位置和队列总数
-//     * @param requestId 请求ID
-//     * @return 一个包含排队位置和队列总数的对象
-//     */
-//    public QueuePosition getPosition(String queueKey, String requestId) {
-//        // 获取请求的排队位置
-//        Long rank = redisTemplate.opsForZSet().rank(queueKey, requestId);
-//        if (rank == null) {
-//            return new QueuePosition(-1, 0); // 请求不存在
-//        }
-//
-//        // 获取队列的总大小(未处理的请求数量)
-//        Long size = redisTemplate.opsForZSet().size(queueKey);
-//        int total = (size == null) ? 0 : size.intValue();
-//
-//        // 当前排队位置从0开始,加1表示实际排队位置
-//        int position = rank.intValue() + 1;
-//
-//        return new QueuePosition(position, total);
-//    }
-//
-//
-//}
-//
+package com.backendsys.modules.queue.service;
+
+import cn.hutool.core.convert.Convert;
+import com.backendsys.modules.queue.entity.QueuePosition;
+import com.backendsys.modules.queue.entity.QueueRequest;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.data.redis.core.StringRedisTemplate;
+import org.springframework.stereotype.Service;
+
+import java.util.concurrent.atomic.AtomicInteger;
+
+@Service
+public class QueueService {
+    private final StringRedisTemplate redisTemplate;
+    private final AtomicInteger counter = new AtomicInteger(0);
+
+
+    @Autowired
+    public QueueService(StringRedisTemplate redisTemplate) {
+        this.redisTemplate = redisTemplate;
+    }
+
+    /**
+     * 分配排队编号
+     */
+    public int enqueue(String queueKey, QueueRequest queueRequest) {
+        // 为每个请求分配一个排队编号
+        Long size = redisTemplate.opsForZSet().size(queueKey);
+        int position = (size == null) ? 1 : size.intValue() + 1;
+        //int position = counter.incrementAndGet();
+        System.out.println("排号: " + position + ", request_id: " + queueRequest.getId());
+
+        queueRequest.setPosition(position);
+        redisTemplate.opsForZSet().add(queueKey, queueRequest.getId(), position);
+
+        return position;
+    }
+
+    /**
+     * 开始排队
+     */
+    public void startProcessing(String queueKey) {
+//        new Thread(() -> {
+//            while (true) {
+//                // 从有序集合中取出第一个请求
+//                String requestId = Convert.toStr(redisTemplate.opsForZSet().popMin(queueKey));
+//                if (requestId != null) {
+//                    QueueRequest queueRequest = getRequestById(requestId);
+//                    int position = queueRequest.getPosition();
+//                    // 处理请求
+//                    processRequest(queueRequest);
+//                    // 可以通知用户处理完成
+//                    notifyUser(queueRequest, position);
+//                }
+//            }
+//        }).start();
+        String requestId = Convert.toStr(redisTemplate.opsForZSet().popMin(queueKey));
+        System.out.println("requestId = " + requestId);
+    }
+
+    private void processRequest(QueueRequest queueRequest) {
+        // 模拟耗时操作
+        try {
+            Thread.sleep(10 * 1000);
+        } catch (InterruptedException e) {
+            Thread.currentThread().interrupt();
+        }
+    }
+
+    /**
+     * 通知
+     */
+    private void notifyUser(QueueRequest queueRequest, int position) {
+        // 通知用户处理完成
+        System.out.println("Request " + position + " processed.");
+    }
+
+    private QueueRequest getRequestById(String requestId) {
+        // 根据请求ID获取请求对象
+        return new QueueRequest();
+    }
+
+    /**
+     * 获取请求的排队位置和队列总数
+     * @param requestId 请求ID
+     * @return 一个包含排队位置和队列总数的对象
+     */
+    public QueuePosition getPosition(String queueKey, String requestId) {
+        // 获取请求的排队位置
+        Long rank = redisTemplate.opsForZSet().rank(queueKey, requestId);
+        if (rank == null) {
+            return new QueuePosition(-1, 0); // 请求不存在
+        }
+
+        // 获取队列的总大小(未处理的请求数量)
+        Long size = redisTemplate.opsForZSet().size(queueKey);
+        int total = (size == null) ? 0 : size.intValue();
+
+        // 当前排队位置从0开始,加1表示实际排队位置
+        int position = rank.intValue() + 1;
+
+        return new QueuePosition(position, total);
+    }
+
+
+}
+

+ 13 - 1
src/main/resources/application-dev.yml

@@ -47,6 +47,18 @@ spring:
       host: 172.19.0.7
       port: 6388
       password: p1FM!fkfPdBQ%@5o
+  rabbitmq:
+    host: localhost
+    port: 5672
+    username: guest
+    password: guest
+    publisher-confirm-type: correlated   # 开启发布确认
+    publisher-returns: true              # 开启发布返回
+    listener:
+      simple:
+        concurrency: 8
+        max-concurrency: 16
+        prefetch: 32
 
 #    cache:
 #      type: redis
@@ -201,4 +213,4 @@ comfyui:
   host: 127.0.0.1
   token: $2b$12$.MR4qGaFetN1FPQzbfyIrehsyjnPJ12xAZhR/l7KZpLkUPQTCG4gy
   is-save: true
-  queue-key: comfyui:queue
+  # queue-key: comfyui:queue

+ 15 - 1
src/main/resources/application-local.yml

@@ -47,6 +47,20 @@ spring:
       host: 127.0.0.1
       port: 6388
       password: 123456
+  rabbitmq:
+    host: localhost
+    port: 5672
+    username: guest
+    password: guest
+    publisher-confirm-type: correlated   # 开启发布确认
+    publisher-returns: true              # 开启发布返回
+    template:
+      mandatory: true         # 强制检查消息是否路由成功(配合publisher-returns
+    listener:
+      simple:
+        concurrency: 8
+        max-concurrency: 16
+        prefetch: 32
 
 #    cache:
 #      type: redis
@@ -214,4 +228,4 @@ comfyui:
   host: 43.128.1.201
   token: $2b$12$.MR4qGaFetN1FPQzbfyIrehsyjnPJ12xAZhR/l7KZpLkUPQTCG4gy
   is-save: true
-  queue-key: comfyui:queue
+  # queue-key: comfyui:queue

+ 13 - 1
src/main/resources/application-prod.yml

@@ -47,6 +47,18 @@ spring:
       host: 172.19.0.6
       port: 6388
       password: stI2gmsq$Y9z3vdT
+  rabbitmq:
+    host: localhost
+    port: 5672
+    username: guest
+    password: guest
+    publisher-confirm-type: correlated   # 开启发布确认
+    publisher-returns: true              # 开启发布返回
+    listener:
+      simple:
+        concurrency: 8
+        max-concurrency: 16
+        prefetch: 32
 
 #    cache:
 #      type: redis
@@ -202,4 +214,4 @@ comfyui:
   host: 127.0.0.1
   token: $2b$12$.MR4qGaFetN1FPQzbfyIrehsyjnPJ12xAZhR/l7KZpLkUPQTCG4gy
   is-save: true
-  queue-key: comfyui:queue
+  # queue-key: comfyui:queue