tsurumure пре 1 месец
родитељ
комит
2628b964e2

+ 43 - 46
src/main/java/com/backendsys/modules/queue/service/impl/TaskServiceImpl.java → src/main/java/com/backendsys/modules/queue/service/impl/TaskServiceImpl_bak.java

@@ -1,38 +1,22 @@
 package com.backendsys.modules.queue.service.impl;
 
-import cn.hutool.core.util.ObjectUtil;
-import cn.hutool.core.util.StrUtil;
-import com.backendsys.modules.queue.entity.Entire;
 import com.backendsys.modules.queue.service.TaskService;
-import org.springframework.data.domain.Range;
-import jakarta.annotation.PostConstruct;
 import org.springframework.beans.factory.annotation.Autowired;
-import org.springframework.beans.factory.annotation.Value;
+import org.springframework.data.domain.Range;
 import org.springframework.data.redis.RedisSystemException;
 import org.springframework.data.redis.connection.Limit;
 import org.springframework.data.redis.connection.stream.*;
 import org.springframework.data.redis.core.RedisTemplate;
-import org.springframework.data.redis.core.StreamOperations;
-import org.springframework.data.redis.stream.StreamListener;
-import org.springframework.data.redis.stream.StreamMessageListenerContainer;
 import org.springframework.stereotype.Service;
 
-import java.time.Duration;
-import java.util.Collections;
 import java.util.List;
 import java.util.Map;
-import java.util.stream.Collectors;
-
-import static com.volcengine.model.tls.Const.STREAM;
 
 /**
  * 任务队列服务
  */
 @Service
-public class TaskServiceImpl implements TaskService, StreamListener<String, MapRecord<String,String,String>> {
-
-    @Autowired
-    private StreamMessageListenerContainer<String, MapRecord<String,String,String>> container;
+public class TaskServiceImpl_bak implements TaskService {
 
     @Autowired
     private RedisTemplate redisTemplate;
@@ -49,8 +33,6 @@ public class TaskServiceImpl implements TaskService, StreamListener<String, MapR
                 throw e; // 忽略"组已存在"错误
             }
         }
-        container.receive(Consumer.from(group_name, "my-consumer"),
-                StreamOffset.create(STREAM, ReadOffset.lastConsumed()), this);
     }
 
     /**
@@ -73,29 +55,47 @@ public class TaskServiceImpl implements TaskService, StreamListener<String, MapR
     public List<MapRecord<String, String, String>> getTaskPaddingList(String stream_key) {
         String group_name = stream_key + ":group";
 
-        // 1. 先确保流和消费者组存在,只需建一次
-        String stream = "my-stream";
-        String group = "my-group";
-        try {   // 如果组已存在会抛异常,忽略即可
-            redisTemplate.opsForStream().createGroup(stream, ReadOffset.latest(), group);
-        } catch (Exception ignored) {}
-
-        // 2. 轮询消费
-        StreamOperations<String, String, String> ops = redisTemplate.opsForStream();
-        while (true) {
-            List<MapRecord<String, String, String>> list = ops.read(
-                    Consumer.from(group, "sync-consumer"),
-                    StreamReadOptions.empty().count(5).block(Duration.ofSeconds(3)),
-                    StreamOffset.create(stream, ReadOffset.lastConsumed())
-            );
-            if (list.isEmpty()) continue;
-
-            for (MapRecord<String, String, String> r : list) {
-                System.out.println("收到消息 id=" + r.getId() + " body=" + r.getValue());
-                // 3. ack
-                redisTemplate.opsForStream().acknowledge(stream, group, r.getId());
-            }
-        }
+//        /* 1. 已读未 ACK 的 Pending 消息 */
+//        List<MapRecord<String, String, String>> pendingRecords = Collections.emptyList();
+//        PendingMessagesSummary summary = redisTemplate.opsForStream().pending(stream_key, group_name);
+//        System.out.println("total = " + summary.getTotalPendingMessages());
+//        if (summary.getTotalPendingMessages() > 0) {
+//            Range<String> idRange = summary.getIdRange();
+//            System.out.println("idRange = " + idRange);
+//            pendingRecords = redisTemplate.opsForStream().range(stream_key, idRange);
+//            System.out.println("pendingRecords = " + pendingRecords);
+//        }
+//        return pendingRecords;
+
+        StreamInfo.XInfoGroups groups = redisTemplate.opsForStream().groups(stream_key);
+        System.out.println("groups = " + groups);
+
+        String lastDelivered = groups.stream()
+                .filter(g -> group_name.equals(g.groupName()))
+                .findFirst()
+                .map(StreamInfo.XInfoGroup::lastDeliveredId)
+                .orElse("0-0");
+        System.out.println("lastDelivered = " + lastDelivered);
+
+        List<MapRecord<String, String, String>> unread =
+                redisTemplate.opsForStream()
+                        .read(Consumer.from(group_name, "my-consumer"),
+                                StreamOffset.create(stream_key,
+                                        ReadOffset.from(lastDelivered)));
+        System.out.println("unread = " + unread);
+        return unread;
+
+//        PendingMessagesSummary summary = redisTemplate.opsForStream().pending(stream_key, group_name);
+//        Range<String> idRange = summary.getIdRange();
+//
+//        long total = summary.getTotalPendingMessages();
+//        String minId = idRange.getLowerBound().getValue().orElse(null);
+//        String maxId = idRange.getUpperBound().getValue().orElse(null);
+//        System.out.println("total = " + total + ", minId: " + minId + ", maxId: " + maxId);
+//
+//        // 用得到的范围再去查询完整消息
+//        List<MapRecord<String, String, String>> records = redisTemplate.opsForStream().range(stream_key, idRange);
+//        return records;
 
     }
 
@@ -141,9 +141,6 @@ public class TaskServiceImpl implements TaskService, StreamListener<String, MapR
         redisTemplate.opsForStream().delete(stream_key, task_id);
     }
 
-    @Override
-    public void onMessage(MapRecord<String, String, String> message) {
-    }
 
 
 //        // 1. 安全获取消息

+ 0 - 199
src/main/java/com/backendsys/modules/queue/service/impl/__TaskServiceImpl_bak.java

@@ -1,199 +0,0 @@
-//package com.backendsys.modules.queue.service.impl;
-//
-//import com.backendsys.modules.queue.service.TaskService;
-//import org.springframework.beans.factory.annotation.Autowired;
-//import org.springframework.data.domain.Range;
-//import org.springframework.data.redis.RedisSystemException;
-//import org.springframework.data.redis.connection.Limit;
-//import org.springframework.data.redis.connection.stream.*;
-//import org.springframework.data.redis.core.RedisTemplate;
-//import org.springframework.stereotype.Service;
-//
-//import java.util.List;
-//import java.util.Map;
-//
-///**
-// * 任务队列服务
-// */
-//@Service
-//public class __TaskServiceImpl_bak implements TaskService {
-//
-//    @Autowired
-//    private RedisTemplate redisTemplate;
-//
-//    /**
-//     * 初始化消费者组
-//     */
-//    public void initConsumerGroup(String stream_key, String group_name) {
-//        System.out.println("initConsumerGroup: [stream_key = " + stream_key + ", group_name = " + group_name + "]");
-//        try {
-//            redisTemplate.opsForStream().createGroup(stream_key, group_name);
-//        } catch (RedisSystemException e) {
-//            if (!e.getCause().getMessage().contains("BUSYGROUP")) {
-//                throw e; // 忽略"组已存在"错误
-//            }
-//        }
-//    }
-//
-//    /**
-//     * 获取任务队列 (显示前5条)
-//     */
-//    @Override
-//    public List<MapRecord<String, String, String>> getTaskList(String stream_key) {
-//        return redisTemplate.opsForStream()
-//                .reverseRange(stream_key, Range.unbounded(), Limit.limit().count(5));
-//    }
-//
-//
-//
-//
-//
-//    /**
-//     * 获取任务队列 (未消费)
-//     */
-//    @Override
-//    public List<MapRecord<String, String, String>> getTaskPaddingList(String stream_key) {
-//        String group_name = stream_key + ":group";
-//
-////        /* 1. 已读未 ACK 的 Pending 消息 */
-////        List<MapRecord<String, String, String>> pendingRecords = Collections.emptyList();
-////        PendingMessagesSummary summary = redisTemplate.opsForStream().pending(stream_key, group_name);
-////        System.out.println("total = " + summary.getTotalPendingMessages());
-////        if (summary.getTotalPendingMessages() > 0) {
-////            Range<String> idRange = summary.getIdRange();
-////            System.out.println("idRange = " + idRange);
-////            pendingRecords = redisTemplate.opsForStream().range(stream_key, idRange);
-////            System.out.println("pendingRecords = " + pendingRecords);
-////        }
-////        return pendingRecords;
-//
-//        StreamInfo.XInfoGroups groups = redisTemplate.opsForStream().groups(stream_key);
-//        System.out.println("groups = " + groups);
-//
-//        String lastDelivered = groups.stream()
-//                .filter(g -> group_name.equals(g.groupName()))
-//                .findFirst()
-//                .map(StreamInfo.XInfoGroup::lastDeliveredId)
-//                .orElse("0-0");
-//        System.out.println("lastDelivered = " + lastDelivered);
-//
-//        List<MapRecord<String, String, String>> unread =
-//                redisTemplate.opsForStream()
-//                        .read(Consumer.from(group_name, "my-consumer"),
-//                                StreamOffset.create(stream_key,
-//                                        ReadOffset.from(lastDelivered)));
-//        System.out.println("unread = " + unread);
-//        return unread;
-//
-////        PendingMessagesSummary summary = redisTemplate.opsForStream().pending(stream_key, group_name);
-////        Range<String> idRange = summary.getIdRange();
-////
-////        long total = summary.getTotalPendingMessages();
-////        String minId = idRange.getLowerBound().getValue().orElse(null);
-////        String maxId = idRange.getUpperBound().getValue().orElse(null);
-////        System.out.println("total = " + total + ", minId: " + minId + ", maxId: " + maxId);
-////
-////        // 用得到的范围再去查询完整消息
-////        List<MapRecord<String, String, String>> records = redisTemplate.opsForStream().range(stream_key, idRange);
-////        return records;
-//
-//    }
-//
-//    /**
-//     * 消费任务 (不移除)
-//     */
-//    @Override
-//    public void consumeTask(String stream_key, String task_id) {
-//        String group_name = stream_key + ":group";
-//
-//        // 用组里任意消费者读一条(不 ACK)
-//        List<MapRecord<String, String, String>> records = redisTemplate.opsForStream()
-//                .read(Consumer.from(group_name, "my-consumer"), StreamOffset.create(stream_key, ReadOffset.from(">")));
-//        System.out.println("records = " + records);
-//
-//
-////        Long acknowledged = redisTemplate.opsForStream().acknowledge(stream_key, stream_key + ":group", task_id);
-////        System.out.println("acknowledged = " + acknowledged);
-//    }
-//
-//
-//
-//
-//
-//
-//    /**
-//     * 添加任务
-//     */
-//    @Override
-//    public RecordId addTask(String stream_key, Map<String, Object> task_data) {
-//        ObjectRecord<String, Map<String, Object>> task = StreamRecords.newRecord()
-//            .ofObject(task_data)
-//            .withStreamKey(stream_key);
-//        // 添加任务
-//        return redisTemplate.opsForStream().add(task);
-//    }
-//
-//    /**
-//     * 移除任务
-//     */
-//    @Override
-//    public void removeTask(String stream_key, String task_id) {
-//        redisTemplate.opsForStream().delete(stream_key, task_id);
-//    }
-//
-//
-//
-////        // 1. 安全获取消息
-////        List<MapRecord<String, String, String>> task_list = redisTemplate.opsForStream()
-////                .read(Consumer.from(stream_key + ":group", "consumer-1"),
-////                        StreamReadOptions.empty().count(1),
-////                        StreamOffset.create(stream_key, ReadOffset.lastConsumed()));
-//
-////        System.out.println("(consumeTask) task_list = " + task_list);
-//
-////        if (task_list != null && !task_list.isEmpty()) {
-////            // 2. 处理消息
-////            MapRecord<String, String, String> record = task_list.get(0);
-////            try {
-//                // 实际业务处理(如调用文生图API)
-//                // processTask(record.getValue());
-//
-////                // 3. 确认消息(ACK)
-////                redisTemplate.opsForStream().acknowledge(stream_key, stream_key + ":group", record.getId());
-//
-//                // 在ACK后追加删除操作
-////                redisTemplate.opsForStream().delete(stream_key, record.getId());
-//
-////            } catch (Exception e) {
-////                System.out.println("任务处理失败: " + record.getId() + ", " + e.getMessage());
-////            }
-////        }
-//
-////    }
-//
-//    /**
-//     * 获取任务队列
-//     */
-////    @Override
-////    public List<MapRecord<String, String, String>> getTaskList(String stream_key) {
-////        return getTaskList(stream_key, null);
-////    }
-////    @Override
-////    public List<MapRecord<String, String, String>> getTaskList(String stream_key, Entire entire) {
-////
-////        // 1. 读取全部数据
-////        List<MapRecord<String, String, String>> task_list = redisTemplate.opsForStream().read(StreamOffset.fromStart(stream_key));
-////
-////        // 2. 根据 entire 过滤
-////        if (entire != null) {
-////            if (StrUtil.isNotEmpty(entire.getKey()) && ObjectUtil.isNotEmpty(entire.getValue())) {
-////                task_list.stream().filter(task -> {
-////                    return entire.getValue().equals(task.getValue().get(entire.getKey()));
-////                }).collect(Collectors.toList());
-////            }
-////        }
-////        return task_list;
-////    }
-//
-//
-//}