Browse Source

Dev RabbitMQ queue

tsurumure 1 month ago
parent
commit
07ea305ace

+ 2 - 2
src/main/java/com/backendsys/modules/TestController.java

@@ -112,7 +112,7 @@ public class TestController {
 
         SysUser sysUser = new SysUser();
         sysUser.setUsername(UUID.randomUUID().toString());
-        System.out.println("【发送】:" + sysUser);
+        System.out.println("【RabbitMQ-发送】:" + sysUser);
 //        rabbitTemplate.convertAndSend("", "demo.queue", sysUser);
         rabbitTemplate.convertAndSend("order.exchange", "order.create", sysUser);
 
@@ -135,7 +135,7 @@ public class TestController {
         }
 
         String body = new String(resp.getBody(), StandardCharsets.UTF_8);
-        System.out.println("手动处理完:" + body);
+        System.out.println("【RabbitMQ-手动处理】:" + body);
 
         // 3. 用同一个 channel 去 ack
         channel.basicAck(resp.getEnvelope().getDeliveryTag(), false);

+ 6 - 11
src/main/java/com/backendsys/modules/common/config/rabbitmq/DemoListener.java → src/main/java/com/backendsys/modules/common/config/rabbitmq/RabbitListener.java

@@ -1,32 +1,27 @@
 package com.backendsys.modules.common.config.rabbitmq;
 
-import com.backendsys.modules.system.entity.SysUser;
 import com.rabbitmq.client.Channel;
 import org.springframework.amqp.core.*;
-import org.springframework.amqp.rabbit.annotation.RabbitListener;
-import org.springframework.beans.factory.annotation.Autowired;
-import org.springframework.context.annotation.Bean;
 import org.springframework.context.annotation.Lazy;
 import org.springframework.stereotype.Component;
 
 import java.io.IOException;
-import java.nio.charset.StandardCharsets;
 
 /**
  * 自定义监听器
  */
 @Component
 @Lazy(false)
-public class DemoListener {
+public class RabbitListener {
 
-//    // 自动 ACK
+//    // [监听] 自动 ACK
 //    @RabbitListener(id = "demoContainer", queues = "demo.queue", autoStartup = "false" )
 //    public void receive(SysUser sysUser) {
 //        System.out.println("收到消息: " + sysUser);
 //    }
 
 
-//    // 手动 ACK
+//    // [监听] 手动 ACK
 //    @RabbitListener(id = "demoContainer", queues = "demo.queue", ackMode = "MANUAL")
 //    public void receive(Message message, Channel channel) throws IOException, InterruptedException {
 //        try {
@@ -46,10 +41,10 @@ public class DemoListener {
 //        }
 //    }
 
-    // 监听死信队列
-    @RabbitListener(id = "dlxContainer", queues = "dlx.queue", ackMode = "MANUAL")
+    // [监听] 死信队列
+    @org.springframework.amqp.rabbit.annotation.RabbitListener(id = "dlxContainer", queues = "dlx.queue", ackMode = "MANUAL")
     public void handleDlx(Message message, Channel channel) throws IOException {
-        System.out.println("【死信】收到:" + new String(message.getBody()));
+        System.out.println("【RabbitMQ-死信】收到:" + new String(message.getBody()));
 
         // 业务:记录日志 / 重发 / 报警 / 人工补偿
         channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);

+ 41 - 0
src/main/java/com/backendsys/modules/common/config/rabbitmq/RabbitListenerRunner.java

@@ -0,0 +1,41 @@
+package com.backendsys.modules.common.config.rabbitmq;
+
+import lombok.RequiredArgsConstructor;
+import lombok.extern.slf4j.Slf4j;
+import org.springframework.amqp.rabbit.listener.MessageListenerContainer;
+import org.springframework.amqp.rabbit.listener.RabbitListenerEndpointRegistry;
+import org.springframework.boot.CommandLineRunner;
+import org.springframework.stereotype.Component;
+
+/**
+ * 手动启动监听器
+ * 由于 RabbitMQ 的监听器默认是懒加载,所以需要手动启动监听器
+ */
+@Slf4j
+@Component
+@RequiredArgsConstructor
+public class RabbitListenerRunner implements CommandLineRunner {
+
+    private final RabbitListenerEndpointRegistry registry;
+
+//    @Override
+//    public void run(String... args) {
+//        List<String> MANUAL_START_IDS = List.of("demoContainer", "dlxContainer");
+//        MANUAL_START_IDS.forEach(id -> {
+//            MessageListenerContainer c = registry.getListenerContainer(id);
+//            if (c != null && !c.isRunning()) {
+//                c.start();
+//                System.out.printf("-- RabbitListener '{}' 已手动启动 --", id);
+//            }
+//        });
+//    }
+
+    public void run(String... args) {
+        MessageListenerContainer container = registry.getListenerContainer("dlxContainer");
+        if (container != null && !container.isRunning()) {
+            container.start();     // 关键:真正启动监听
+            System.out.println("-- dlxContainer Listener 已手动启动 --");
+        }
+    }
+
+}

+ 0 - 44
src/main/java/com/backendsys/modules/common/config/rabbitmq/RabbitStartupRunner.java

@@ -1,44 +0,0 @@
-package com.backendsys.modules.common.config.rabbitmq;
-
-import lombok.RequiredArgsConstructor;
-import lombok.extern.slf4j.Slf4j;
-import org.springframework.amqp.rabbit.listener.MessageListenerContainer;
-import org.springframework.amqp.rabbit.listener.RabbitListenerEndpointRegistry;
-import org.springframework.boot.CommandLineRunner;
-import org.springframework.stereotype.Component;
-
-import java.util.List;
-
-/**
- * 手动启动监听器
- * 由于 RabbitMQ 的监听器默认是懒加载,所以需要手动启动监听器
- */
-@Slf4j
-@Component
-@RequiredArgsConstructor
-public class RabbitStartupRunner implements CommandLineRunner {
-
-    private final RabbitListenerEndpointRegistry registry;
-
-    @Override
-    public void run(String... args) {
-        List<String> MANUAL_START_IDS = List.of("demoContainer", "dlxContainer");
-        MANUAL_START_IDS.forEach(id -> {
-            MessageListenerContainer c = registry.getListenerContainer(id);
-            if (c != null && !c.isRunning()) {
-                c.start();
-                log.info("-- RabbitListener '{}' 已手动启动 --", id);
-            }
-        });
-    }
-
-
-//    public void run(String... args) {
-//        MessageListenerContainer container = registry.getListenerContainer("demoContainer");
-//        if (container != null && !container.isRunning()) {
-//            container.start();     // 关键:真正启动监听
-//            System.out.println("-- RabbitListener 已手动启动 --");
-//        }
-//    }
-
-}

+ 7 - 7
src/main/java/com/backendsys/modules/common/config/rabbitmq/DemoRabbitConfig.java → src/main/java/com/backendsys/modules/common/config/rabbitmq/queue/QueueDemoConfig.java

@@ -1,4 +1,4 @@
-package com.backendsys.modules.common.config.rabbitmq;
+package com.backendsys.modules.common.config.rabbitmq.queue;
 
 import org.springframework.amqp.core.*;
 import org.springframework.beans.factory.annotation.Autowired;
@@ -11,13 +11,13 @@ import org.springframework.context.annotation.Lazy;
  */
 @Configuration
 @Lazy(false)
-public class DemoRabbitConfig {
+public class QueueDemoConfig {
 
     @Autowired
-    private RabbitDlxConfig rabbitDlxConfig;
+    private QueueDlxConfig queueDlxConfig;
 
-    public static final String EXCHANGE   = "order.exchange";
-    public static final String QUEUE    = "order.queue";
+    public static final String EXCHANGE = "order.exchange";
+    public static final String QUEUE = "order.queue";
     public static final String ROUTING_KEY = "order.create";
 
     // 交换机
@@ -32,8 +32,8 @@ public class DemoRabbitConfig {
         return QueueBuilder.durable(QUEUE)
                 .ttl(5000)                    // 5s
                 .maxLength(1000)        // 1000条
-                .deadLetterExchange(rabbitDlxConfig.DLX_EX)
-                .deadLetterRoutingKey(rabbitDlxConfig.DLX_RK)
+                .deadLetterExchange(queueDlxConfig.EXCHANGE)
+                .deadLetterRoutingKey(queueDlxConfig.ROUTING_KEY)
                 .build();
     }
 

+ 8 - 8
src/main/java/com/backendsys/modules/common/config/rabbitmq/RabbitDlxConfig.java → src/main/java/com/backendsys/modules/common/config/rabbitmq/queue/QueueDlxConfig.java

@@ -1,4 +1,4 @@
-package com.backendsys.modules.common.config.rabbitmq;
+package com.backendsys.modules.common.config.rabbitmq.queue;
 
 import org.springframework.amqp.core.*;
 import org.springframework.context.annotation.Bean;
@@ -11,20 +11,20 @@ import org.springframework.context.annotation.Lazy;
  */
 @Configuration
 @Lazy(false)
-public class RabbitDlxConfig {
+public class QueueDlxConfig {
 
     /* === 死信交换机/队列 === */
-    public static final String DLX_EX = "dlx";
-    public static final String DLX_Q  = "dlx.queue";
-    public static final String DLX_RK = "dlx.routekey";
+    public static final String EXCHANGE = "dlx";
+    public static final String QUEUE = "dlx.queue";
+    public static final String ROUTING_KEY = "dlx.routekey";
 
     @Bean
-    public DirectExchange dlxExchange() { return ExchangeBuilder.directExchange(DLX_EX).durable(true).build(); }
+    public DirectExchange dlxExchange() { return ExchangeBuilder.directExchange(EXCHANGE).durable(true).build(); }
 
     @Bean
-    public Queue dlxQueue() { return QueueBuilder.durable(DLX_Q).build(); }
+    public Queue dlxQueue() { return QueueBuilder.durable(QUEUE).build(); }
 
     @Bean
-    public Binding dlxBinding() { return BindingBuilder.bind(dlxQueue()).to(dlxExchange()).with(DLX_RK); }
+    public Binding dlxBinding() { return BindingBuilder.bind(dlxQueue()).to(dlxExchange()).with(ROUTING_KEY); }
 
 }