Ver código fonte

接入RabbitMQ到项目

tsurumure 1 mês atrás
pai
commit
6941b4862a

+ 57 - 43
src/main/java/com/backendsys/modules/TestController.java

@@ -22,6 +22,8 @@ import com.fasterxml.jackson.databind.ObjectMapper;
 import cn.afterturn.easypoi.word.WordExportUtil;
 import com.backendsys.utils.MD5Util;
 
+import com.rabbitmq.client.Channel;
+import com.rabbitmq.client.GetResponse;
 import com.tencentcloudapi.common.Credential;
 //import io.github.pigmesh.ai.deepseek.core.DeepSeekClientImpl;
 //import io.github.pigmesh.ai.deepseek.core.OpenAiHttpException;
@@ -32,11 +34,10 @@ import jakarta.annotation.PostConstruct;
 import jakarta.servlet.ServletContext;
 import org.apache.poi.xwpf.usermodel.XWPFDocument;
 import org.redisson.api.*;
-import org.springframework.amqp.core.AmqpAdmin;
-import org.springframework.amqp.core.BindingBuilder;
-import org.springframework.amqp.core.DirectExchange;
+import org.springframework.amqp.core.*;
 import org.springframework.amqp.core.Queue;
 import org.springframework.amqp.rabbit.annotation.RabbitListener;
+import org.springframework.amqp.rabbit.connection.Connection;
 import org.springframework.amqp.rabbit.core.RabbitAdmin;
 import org.springframework.amqp.rabbit.core.RabbitTemplate;
 import org.springframework.beans.factory.annotation.Autowired;
@@ -48,6 +49,7 @@ import org.springframework.web.bind.annotation.*;
 
 import java.awt.*;
 import java.io.*;
+import java.nio.charset.StandardCharsets;
 import java.nio.file.Path;
 import java.nio.file.Paths;
 import java.security.NoSuchAlgorithmException;
@@ -56,21 +58,6 @@ import java.util.List;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeUnit;
-
-//import com.tencentcloudapi.facefusion.v20181201.FacefusionClient;
-//import com.tencentcloudapi.facefusion.v20181201.models.*;
-
-//import com.tencentcloudapi.facefusion.v20220927.FacefusionClient;
-//import com.tencentcloudapi.facefusion.v20220927.models.*;
-
-//import com.tencentcloudapi.common.Credential;
-//import com.tencentcloudapi.common.profile.ClientProfile;
-//import com.tencentcloudapi.common.profile.HttpProfile;
-//import com.tencentcloudapi.common.exception.TencentCloudSDKException;
-//import com.tencentcloudapi.facefusion.v20181201.FacefusionClient;
-//import com.tencentcloudapi.facefusion.v20181201.models.*;
-
-
 import com.tencentcloudapi.common.CommonClient;
 import com.tencentcloudapi.common.exception.TencentCloudSDKException;
 
@@ -86,22 +73,24 @@ public class TestController {
     @Value("${tencent.facefusion.secret-key}")
     private String SECRET_KEY;
 
-    @Autowired
-    private AmqpAdmin amqpAdmin;
+//    @Autowired
+//    private AmqpAdmin amqpAdmin;
     @Autowired
     private RabbitTemplate rabbitTemplate;
 
-    private void initRabbitMQ() {
-        System.out.println("-- initRabbitMQ. --");
+//    private void initRabbitMQ() {
+//        System.out.println("-- initRabbitMQ. --");
+//
 //        // 交换机
 //        amqpAdmin.declareExchange(new DirectExchange("demo.exchange", true, false));
 //
-////        Map<String, Object> args = new HashMap<>();
-////        args.put("x-message-ttl", 5000);       // 消息 TTL:60 秒(单位毫秒)
-////        // 队列
-////        amqpAdmin.declareQueue(new Queue("demo.queue", true, false, false, args));
+//        // 队列 (TTL + 死信)
+//        Map<String, Object> args = new HashMap<>();
+//        args.put("x-message-ttl", 5000);                // 消息 TTL:60 秒(单位毫秒)
+//        amqpAdmin.declareQueue(new Queue("demo.queue", true, false, false, args));
 //
-//        amqpAdmin.declareQueue(new Queue("demo.queue", true));
+////        // 队列
+////        amqpAdmin.declareQueue(new Queue("demo.queue", true));
 //
 //        // 把队列 demo.queue 绑定到交换机 demo.exchange,路由键设置为 order.create
 //        amqpAdmin.declareBinding(
@@ -110,39 +99,64 @@ public class TestController {
 //                        .with("order.create")
 //        );
 
-        // 单队列 (无死信)
-        amqpAdmin.declareQueue(new Queue("demo.queue", true));
-    }
+//        // 单队列 (无死信)
+//        amqpAdmin.declareQueue(new Queue("demo.queue", true));
+//    }
 
     @GetMapping("/testRabbitMQ/send")
     public String send() {
-        initRabbitMQ();
+
+//        initRabbitMQ();
 //        rabbitTemplate.convertAndSend("demo.exchange", "order.create", "Hello RabbitMQ!");
 //        rabbitTemplate.convertAndSend("", "demo.queue", "Hello RabbitMQ!");
 
         SysUser sysUser = new SysUser();
-        sysUser.setId(1L);
+        sysUser.setUsername(UUID.randomUUID().toString());
+        System.out.println("【发送】:" + sysUser);
 //        rabbitTemplate.convertAndSend("", "demo.queue", sysUser);
-        rabbitTemplate.convertAndSend("demo.exchange", "order.create", sysUser);
+        rabbitTemplate.convertAndSend("order.exchange", "order.create", sysUser);
+
         return "ok";
     }
 
-    @PostConstruct
-    public void init() {
-        rabbitTemplate.setConfirmCallback((corData, ack, cause) -> {
-            if (!ack) {
-                System.out.println("消息发送失败:" + corData + ",原因:" + cause);
-            }
-        });
-        rabbitTemplate.setReturnsCallback(ret -> {
-            System.out.println("消息无法路由到队列:" + ret.getMessage());
-        });
+    @GetMapping("/testRabbitMQ/poll")
+    public String poll() throws Exception {
+
+        // 1. 拿到连接和 channel(不要每次 new)
+        Connection connection = rabbitTemplate.getConnectionFactory().createConnection();
+        Channel channel = connection.createChannel(false);
+
+        // 2. 手动拉消息,第二个参数传 false 表示“不要自动 ack”
+        GetResponse resp = channel.basicGet("order.queue", false);
+        if (resp == null) {
+            channel.close();
+            connection.close();
+            return "no message";
+        }
+
+        String body = new String(resp.getBody(), StandardCharsets.UTF_8);
+        System.out.println("手动处理完:" + body);
+
+        // 3. 用同一个 channel 去 ack
+        channel.basicAck(resp.getEnvelope().getDeliveryTag(), false);
+        channel.close();
+        connection.close();
+        return "acked: " + body;
+
+
+//        try {
+//            channel.basicAck(msg.getMessageProperties().getDeliveryTag(), false);
+//        } finally {
+//            channel.close();
+//        }
+//        return "acked: " + body;
     }
 
 
 
 
 
+
 //    @Autowired
 //    private DeepSeekClientImpl deepSeekClient;
 //

+ 21 - 46
src/main/java/com/backendsys/modules/common/config/rabbitmq/DemoListener.java

@@ -2,83 +2,58 @@ package com.backendsys.modules.common.config.rabbitmq;
 
 import com.backendsys.modules.system.entity.SysUser;
 import com.rabbitmq.client.Channel;
-import org.springframework.amqp.core.Queue;
-import org.springframework.amqp.core.QueueBuilder;
+import org.springframework.amqp.core.*;
 import org.springframework.amqp.rabbit.annotation.RabbitListener;
+import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.context.annotation.Bean;
 import org.springframework.context.annotation.Lazy;
 import org.springframework.stereotype.Component;
-import org.springframework.amqp.core.Message;
+
 import java.io.IOException;
 import java.nio.charset.StandardCharsets;
 
-@Component
-//@Lazy(false)
 /**
- * 当没有队列的时候 demo.queue 时, 就会加载报错
+ * 自定义监听器
  */
-
-
-
+@Component
+@Lazy(false)
 public class DemoListener {
 
-    // id 给容器一个名字,autoStartup=false 禁止自动启动
+//    // 自动 ACK
 //    @RabbitListener(id = "demoContainer", queues = "demo.queue", autoStartup = "false" )
-//    public void receive(String msg) {
-//        System.out.println("收到消息: " + msg);
+//    public void receive(SysUser sysUser) {
+//        System.out.println("收到消息: " + sysUser);
 //    }
 
 
-
-    // 自动 ACK
-    @RabbitListener(id = "demoContainer", queues = "demo.queue", autoStartup = "false" )
-    public void receive(SysUser sysUser) {
-        System.out.println("收到消息: " + sysUser);
-    }
-
 //    // 手动 ACK
 //    @RabbitListener(id = "demoContainer", queues = "demo.queue", ackMode = "MANUAL")
 //    public void receive(Message message, Channel channel) throws IOException, InterruptedException {
 //        try {
-//            // 1. 模拟耗时业务(2 秒)
-//            Thread.sleep(10000);
+//            // 1. 模拟耗时业务
+//            Thread.sleep(8000);
 //
 //            // 2. 业务处理
 //            String body = new String(message.getBody(), StandardCharsets.UTF_8);
-//            System.out.println("处理消息: " + body);
+//            System.out.println("处理消息 (8s): " + body);
 //
 //            // 3. 手动确认(deliveryTag + 是否批量)
 //            channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
 //        } catch (Exception e) {
-//            System.err.println("处理失败: " + e.getMessage());
+//            System.err.println("处理失败 (10s): " + e.getMessage());
 //            // 拒绝并重新入队
-//            channel.basicNack(
-//                    message.getMessageProperties().getDeliveryTag(),
-//                    false,   // 不批量
-//                    true     // 重新入队
-//            );
+//            channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, true);
 //        }
 //    }
 
-    // 死信队列
-//    @Bean
-//    public Queue demoQueue() {
-//        return QueueBuilder.durable("dlx.queue")
-//                .deadLetterExchange("dlx")
-//                .deadLetterRoutingKey("demo.dlx")
-//                .ttl(5000)                 // 可选:消息 10 秒过期
-//                .maxLength(1000)      // 可选:队列长度限制
-//                .build();
-//    }
-    // 和监听正常队列写法一模一样
-//    @RabbitListener(queues = "dlx.queue")
-//    public void handleDlx(Message message, Channel channel) throws IOException {
-//        System.out.println("【死信】收到:" + new String(message.getBody()));
-//
-//        // 业务:记录日志 / 重发 / 报警 / 人工补偿
-//        channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
-//    }
+    // 监听死信队列
+    @RabbitListener(id = "dlxContainer", queues = "dlx.queue", ackMode = "MANUAL")
+    public void handleDlx(Message message, Channel channel) throws IOException {
+        System.out.println("【死信】收到:" + new String(message.getBody()));
 
+        // 业务:记录日志 / 重发 / 报警 / 人工补偿
+        channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
+    }
 
 
     // 4. 并发消费(一条队列多线程)

+ 47 - 0
src/main/java/com/backendsys/modules/common/config/rabbitmq/DemoRabbitConfig.java

@@ -0,0 +1,47 @@
+package com.backendsys.modules.common.config.rabbitmq;
+
+import org.springframework.amqp.core.*;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.context.annotation.Bean;
+import org.springframework.context.annotation.Configuration;
+import org.springframework.context.annotation.Lazy;
+
+/**
+ * 非懒加载,在项目启动时创建 RabbitMQ 队列、交换机、绑定关系
+ */
+@Configuration
+@Lazy(false)
+public class DemoRabbitConfig {
+
+    @Autowired
+    private RabbitDlxConfig rabbitDlxConfig;
+
+    public static final String EXCHANGE   = "order.exchange";
+    public static final String QUEUE    = "order.queue";
+    public static final String ROUTING_KEY = "order.create";
+
+    // 交换机
+    @Bean
+    public DirectExchange demoExchange() {
+        return ExchangeBuilder.directExchange(EXCHANGE).durable(true).build();
+    }
+
+    // 队列 (5s后过期,1000条后过期)
+    @Bean
+    public Queue demoQueue() {
+        return QueueBuilder.durable(QUEUE)
+                .ttl(5000)                    // 5s
+                .maxLength(1000)        // 1000条
+                .deadLetterExchange(rabbitDlxConfig.DLX_EX)
+                .deadLetterRoutingKey(rabbitDlxConfig.DLX_RK)
+                .build();
+    }
+
+    // 队列绑定交换机
+    @Bean
+    public Binding demoBinding() {
+        return BindingBuilder.bind(demoQueue()).to(demoExchange()).with(ROUTING_KEY);
+    }
+
+
+}

+ 4 - 3
src/main/java/com/backendsys/modules/common/config/rabbitmq/RabbitConfig.java

@@ -5,12 +5,13 @@ import org.springframework.amqp.support.converter.MessageConverter;
 import org.springframework.context.annotation.Bean;
 import org.springframework.context.annotation.Configuration;
 
+/**
+ * RabbitMQ 通用配置
+ */
 @Configuration
 public class RabbitConfig {
 
-    /**
-     * 保证实体类可以通过 Jackson 序列化 / 反序列化 传参
-     */
+    // 允许 [实体类] 可以通过 Jackson 序列化 / 反序列化 传参
     @Bean
     public MessageConverter messageConverter() {
         return new Jackson2JsonMessageConverter();

+ 30 - 0
src/main/java/com/backendsys/modules/common/config/rabbitmq/RabbitDlxConfig.java

@@ -0,0 +1,30 @@
+package com.backendsys.modules.common.config.rabbitmq;
+
+import org.springframework.amqp.core.*;
+import org.springframework.context.annotation.Bean;
+import org.springframework.context.annotation.Configuration;
+import org.springframework.context.annotation.Lazy;
+
+/**
+ * 死信队列
+ * 在项目启动时创建 (RabbitMQ) 队列、交换机、绑定关系
+ */
+@Configuration
+@Lazy(false)
+public class RabbitDlxConfig {
+
+    /* === 死信交换机/队列 === */
+    public static final String DLX_EX = "dlx";
+    public static final String DLX_Q  = "dlx.queue";
+    public static final String DLX_RK = "dlx.routekey";
+
+    @Bean
+    public DirectExchange dlxExchange() { return ExchangeBuilder.directExchange(DLX_EX).durable(true).build(); }
+
+    @Bean
+    public Queue dlxQueue() { return QueueBuilder.durable(DLX_Q).build(); }
+
+    @Bean
+    public Binding dlxBinding() { return BindingBuilder.bind(dlxQueue()).to(dlxExchange()).with(DLX_RK); }
+
+}

+ 26 - 6
src/main/java/com/backendsys/modules/common/config/rabbitmq/RabbitStartupRunner.java

@@ -1,24 +1,44 @@
 package com.backendsys.modules.common.config.rabbitmq;
 
 import lombok.RequiredArgsConstructor;
+import lombok.extern.slf4j.Slf4j;
 import org.springframework.amqp.rabbit.listener.MessageListenerContainer;
 import org.springframework.amqp.rabbit.listener.RabbitListenerEndpointRegistry;
 import org.springframework.boot.CommandLineRunner;
 import org.springframework.stereotype.Component;
 
+import java.util.List;
+
+/**
+ * 手动启动监听器
+ * 由于 RabbitMQ 的监听器默认是懒加载,所以需要手动启动监听器
+ */
+@Slf4j
 @Component
 @RequiredArgsConstructor
 public class RabbitStartupRunner implements CommandLineRunner {
 
-    // 注入注册中心
     private final RabbitListenerEndpointRegistry registry;
 
     @Override
     public void run(String... args) {
-        MessageListenerContainer container = registry.getListenerContainer("demoContainer");
-        if (container != null && !container.isRunning()) {
-            container.start();     // 关键:真正启动监听
-            System.out.println("-- RabbitListener 已手动启动 --");
-        }
+        List<String> MANUAL_START_IDS = List.of("demoContainer", "dlxContainer");
+        MANUAL_START_IDS.forEach(id -> {
+            MessageListenerContainer c = registry.getListenerContainer(id);
+            if (c != null && !c.isRunning()) {
+                c.start();
+                log.info("-- RabbitListener '{}' 已手动启动 --", id);
+            }
+        });
     }
+
+
+//    public void run(String... args) {
+//        MessageListenerContainer container = registry.getListenerContainer("demoContainer");
+//        if (container != null && !container.isRunning()) {
+//            container.start();     // 关键:真正启动监听
+//            System.out.println("-- RabbitListener 已手动启动 --");
+//        }
+//    }
+
 }