Parcourir la source

Debug rabbitmq

tsurumure il y a 1 mois
Parent
commit
f4d7c99bdd

+ 37 - 3
src/main/java/com/backendsys/modules/TestController.java

@@ -13,6 +13,7 @@ import com.backendsys.modules.sdk.baidu.yunapp.entity.ExecuteScriptParams;
 import com.backendsys.modules.sdk.baidu.yunapp.service.YunappService;
 import com.backendsys.modules.sdk.volcengine.entity.VisualFaceSwapV2;
 import com.backendsys.modules.sdk.volcengine.service.VolcengineService;
+import com.backendsys.modules.system.entity.SysUser;
 import com.backendsys.service.TestService;
 import com.backendsys.utils.ResourceUtil;
 import com.fasterxml.jackson.core.JsonProcessingException;
@@ -31,6 +32,9 @@ import jakarta.annotation.PostConstruct;
 import jakarta.servlet.ServletContext;
 import org.apache.poi.xwpf.usermodel.XWPFDocument;
 import org.redisson.api.*;
+import org.springframework.amqp.core.AmqpAdmin;
+import org.springframework.amqp.core.BindingBuilder;
+import org.springframework.amqp.core.DirectExchange;
 import org.springframework.amqp.core.Queue;
 import org.springframework.amqp.rabbit.annotation.RabbitListener;
 import org.springframework.amqp.rabbit.core.RabbitAdmin;
@@ -82,14 +86,44 @@ public class TestController {
     @Value("${tencent.facefusion.secret-key}")
     private String SECRET_KEY;
 
-
-
+    @Autowired
+    private AmqpAdmin amqpAdmin;
     @Autowired
     private RabbitTemplate rabbitTemplate;
 
+    private void initRabbitMQ() {
+        System.out.println("-- initRabbitMQ. --");
+//        // 交换机
+//        amqpAdmin.declareExchange(new DirectExchange("demo.exchange", true, false));
+//
+////        Map<String, Object> args = new HashMap<>();
+////        args.put("x-message-ttl", 5000);       // 消息 TTL:60 秒(单位毫秒)
+////        // 队列
+////        amqpAdmin.declareQueue(new Queue("demo.queue", true, false, false, args));
+//
+//        amqpAdmin.declareQueue(new Queue("demo.queue", true));
+//
+//        // 把队列 demo.queue 绑定到交换机 demo.exchange,路由键设置为 order.create
+//        amqpAdmin.declareBinding(
+//                BindingBuilder.bind(new Queue("demo.queue"))
+//                        .to(new DirectExchange("demo.exchange"))
+//                        .with("order.create")
+//        );
+
+        // 单队列 (无死信)
+        amqpAdmin.declareQueue(new Queue("demo.queue", true));
+    }
+
     @GetMapping("/testRabbitMQ/send")
     public String send() {
-        rabbitTemplate.convertAndSend("demo.exchange", "order.create", "Hello RabbitMQ!");
+        initRabbitMQ();
+//        rabbitTemplate.convertAndSend("demo.exchange", "order.create", "Hello RabbitMQ!");
+//        rabbitTemplate.convertAndSend("", "demo.queue", "Hello RabbitMQ!");
+
+        SysUser sysUser = new SysUser();
+        sysUser.setId(1L);
+//        rabbitTemplate.convertAndSend("", "demo.queue", sysUser);
+        rabbitTemplate.convertAndSend("demo.exchange", "order.create", sysUser);
         return "ok";
     }
 

+ 92 - 0
src/main/java/com/backendsys/modules/common/config/rabbitmq/DemoListener.java

@@ -0,0 +1,92 @@
+package com.backendsys.modules.common.config.rabbitmq;
+
+import com.backendsys.modules.system.entity.SysUser;
+import com.rabbitmq.client.Channel;
+import org.springframework.amqp.core.Queue;
+import org.springframework.amqp.core.QueueBuilder;
+import org.springframework.amqp.rabbit.annotation.RabbitListener;
+import org.springframework.context.annotation.Bean;
+import org.springframework.context.annotation.Lazy;
+import org.springframework.stereotype.Component;
+import org.springframework.amqp.core.Message;
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+
+@Component
+//@Lazy(false)
+/**
+ * 当没有队列的时候 demo.queue 时, 就会加载报错
+ */
+
+
+
+public class DemoListener {
+
+    // id 给容器一个名字,autoStartup=false 禁止自动启动
+//    @RabbitListener(id = "demoContainer", queues = "demo.queue", autoStartup = "false" )
+//    public void receive(String msg) {
+//        System.out.println("收到消息: " + msg);
+//    }
+
+
+
+    // 自动 ACK
+    @RabbitListener(id = "demoContainer", queues = "demo.queue", autoStartup = "false" )
+    public void receive(SysUser sysUser) {
+        System.out.println("收到消息: " + sysUser);
+    }
+
+//    // 手动 ACK
+//    @RabbitListener(id = "demoContainer", queues = "demo.queue", ackMode = "MANUAL")
+//    public void receive(Message message, Channel channel) throws IOException, InterruptedException {
+//        try {
+//            // 1. 模拟耗时业务(2 秒)
+//            Thread.sleep(10000);
+//
+//            // 2. 业务处理
+//            String body = new String(message.getBody(), StandardCharsets.UTF_8);
+//            System.out.println("处理消息: " + body);
+//
+//            // 3. 手动确认(deliveryTag + 是否批量)
+//            channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
+//        } catch (Exception e) {
+//            System.err.println("处理失败: " + e.getMessage());
+//            // 拒绝并重新入队
+//            channel.basicNack(
+//                    message.getMessageProperties().getDeliveryTag(),
+//                    false,   // 不批量
+//                    true     // 重新入队
+//            );
+//        }
+//    }
+
+    // 死信队列
+//    @Bean
+//    public Queue demoQueue() {
+//        return QueueBuilder.durable("dlx.queue")
+//                .deadLetterExchange("dlx")
+//                .deadLetterRoutingKey("demo.dlx")
+//                .ttl(5000)                 // 可选:消息 10 秒过期
+//                .maxLength(1000)      // 可选:队列长度限制
+//                .build();
+//    }
+    // 和监听正常队列写法一模一样
+//    @RabbitListener(queues = "dlx.queue")
+//    public void handleDlx(Message message, Channel channel) throws IOException {
+//        System.out.println("【死信】收到:" + new String(message.getBody()));
+//
+//        // 业务:记录日志 / 重发 / 报警 / 人工补偿
+//        channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
+//    }
+
+
+
+    // 4. 并发消费(一条队列多线程)
+    /*
+    @RabbitListener(queues = "demo.queue", concurrency = "5-10")
+    public void receive(String msg) { ... }
+     */
+
+
+
+}

+ 18 - 0
src/main/java/com/backendsys/modules/common/config/rabbitmq/RabbitConfig.java

@@ -0,0 +1,18 @@
+package com.backendsys.modules.common.config.rabbitmq;
+
+import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
+import org.springframework.amqp.support.converter.MessageConverter;
+import org.springframework.context.annotation.Bean;
+import org.springframework.context.annotation.Configuration;
+
+@Configuration
+public class RabbitConfig {
+
+    /**
+     * 保证实体类可以通过 Jackson 序列化 / 反序列化 传参
+     */
+    @Bean
+    public MessageConverter messageConverter() {
+        return new Jackson2JsonMessageConverter();
+    }
+}

+ 0 - 34
src/main/java/com/backendsys/modules/common/config/rabbitmq/RabbitInitializer.java

@@ -1,34 +0,0 @@
-package com.backendsys.modules.common.config.rabbitmq;
-
-import org.springframework.amqp.core.AmqpAdmin;
-import org.springframework.amqp.core.BindingBuilder;
-import org.springframework.amqp.core.DirectExchange;
-import org.springframework.amqp.core.Queue;
-import org.springframework.beans.factory.annotation.Autowired;
-import org.springframework.boot.ApplicationArguments;
-import org.springframework.boot.ApplicationRunner;
-import org.springframework.stereotype.Component;
-
-@Component
-public class RabbitInitializer implements ApplicationRunner {
-
-    @Autowired
-    private AmqpAdmin amqpAdmin;
-
-    @Override
-    public void run(ApplicationArguments args) {
-
-        System.out.println("-- RabbitInitializer.run --");
-        // 交换机
-        amqpAdmin.declareExchange(new DirectExchange("demo.exchange", true, false));
-        // 队列
-        amqpAdmin.declareQueue(new Queue("demo.queue", true));
-        // 把队列 demo.queue 绑定到交换机 demo.exchange,路由键设置为 order.create
-        amqpAdmin.declareBinding(
-                BindingBuilder.bind(new Queue("demo.queue"))
-                        .to(new DirectExchange("demo.exchange"))
-                        .with("order.create")
-        );
-
-    }
-}

+ 24 - 0
src/main/java/com/backendsys/modules/common/config/rabbitmq/RabbitStartupRunner.java

@@ -0,0 +1,24 @@
+package com.backendsys.modules.common.config.rabbitmq;
+
+import lombok.RequiredArgsConstructor;
+import org.springframework.amqp.rabbit.listener.MessageListenerContainer;
+import org.springframework.amqp.rabbit.listener.RabbitListenerEndpointRegistry;
+import org.springframework.boot.CommandLineRunner;
+import org.springframework.stereotype.Component;
+
+@Component
+@RequiredArgsConstructor
+public class RabbitStartupRunner implements CommandLineRunner {
+
+    // 注入注册中心
+    private final RabbitListenerEndpointRegistry registry;
+
+    @Override
+    public void run(String... args) {
+        MessageListenerContainer container = registry.getListenerContainer("demoContainer");
+        if (container != null && !container.isRunning()) {
+            container.start();     // 关键:真正启动监听
+            System.out.println("-- RabbitListener 已手动启动 --");
+        }
+    }
+}